ATLAS Offline Software
Loading...
Searching...
No Matches
trigbs_extractStream Namespace Reference

Functions

 peb_writer ()

Detailed Description

Select events for one or more given stream names (comma-separated) from an input file and write them to an output file.
The output file obeys the conventions used by the SFO at P1.

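Multiple input files can be processed, but all events need to be from the same run; events whose run number differs from that of the first input file are skipped with an error message.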

Function Documentation

◆ peb_writer()

trigbs_extractStream.peb_writer ( )
Runs the splitting routines

Definition at line 21 of file trigbs_extractStream.py.

def peb_writer():
    """Extract the events of the requested stream(s) into a new RAW file.

    Parses the command line, reads all input files as one event stream and
    writes each event that carries a stream tag whose name is in the
    ``--stream-name`` list to the output stream.  Run number, project tag,
    lumi block and the other output metadata are taken from the first input
    file unless overridden on the command line.

    For each selected event either the full event fragment is written or,
    for partial-event-building tags or when ``--hlt-only`` is given, only the
    ROBs selected by the stream tag / the HLT module ID.

    The function never returns normally: it calls ``sys.exit(0)`` when at
    least one event was written and ``sys.exit(1)`` when the output directory
    does not exist or no event was selected.
    """

    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)

    parser.add_argument("files", metavar="FILE", nargs='+',
                        help="RAW file to inspect")

    parser.add_argument('-s', '--stream-name', metavar='NAME', type=str, required=True,
                        help='Name(s) of stream(s) which should be written out, e.g. "stream1,stream2,stream3"')

    parser.add_argument('-o', '--output-name', metavar='NAME', type=str,
                        help='Core output file name (by default derived from input)')

    parser.add_argument('-d', '--output-dir', metavar='DIR', type=str, default='.',
                        help='Directory in which the output file should be written')

    parser.add_argument('-a', '--start-event', metavar='N', type=int, default=0,
                        help='Number of events which should be skipped from the begin')

    parser.add_argument('-n', '--max-events', metavar='N', type=int,
                        help='Maximum number of events in the output file')

    parser.add_argument('-u', '--uncompressed', action='store_true',
                        help='Write out uncompressed data')

    parser.add_argument('-p', '--project-tag', metavar='TAG', type=str,
                        help='Project tag which should be used for the output file')

    parser.add_argument('-l', '--lumi-block', metavar='N', type=int,
                        help='Lumiblock number used for the output file. Use 0 if multiple LBs in file.')

    parser.add_argument('-m', '--hlt-only', metavar='ID', type=int,
                        help='Drop all detector data and write out only HLT data for the given module ID. '
                             'Module ID <0 is a wildcard for all HLT module IDs.')

    parser.add_argument('-v', '--verbosity', metavar='N', type=int, default=logging.INFO,
                        help='Log verbosity')

    parser.add_argument('-P', '--progress-bar', action='store_true',
                        help='Show progress bar when running interactively')

    args = parser.parse_args()

    # global defaults
    logging.getLogger('').name = 'trigbs_extractStream'
    logging.getLogger('').setLevel(args.verbosity)

    # input data stream
    stream = eformat.istream(args.files)

    # get metadata from inputfile
    dr = eformat.EventStorage.pickDataReader(args.files[0])

    # interpret input file name
    df = eformat.EventStorage.RawFileName(args.files[0])

    # extract some parameters from meta-data
    projectTag = args.project_tag or dr.projectTag()
    lumiBlockNumber = args.lumi_block if args.lumi_block is not None else dr.lumiblockNumber()
    applicationName = dr.appName()
    streamType = 'unknown'  # the real stream type will be extracted from the matching stream tag
    productionStep = df.productionStep() if df.hasValidCore() else 'unknown'

    # input parameters for building the output file name
    runNumber = dr.runNumber()
    outputDirectory = args.output_dir
    streamName = args.stream_name

    # check if multiple streams should be written to the same output file (used in debug recovery)
    streamNames_out = streamName.split(',')
    if len(streamNames_out) > 1:
        streamName = 'accepted'

    if lumiBlockNumber == 0:
        productionStep = 'merge'

    # check the output directory if it exists (isdir() is False for a missing path, too)
    if not os.path.isdir(outputDirectory):
        logging.fatal(' Output directory %s does not exist ', outputDirectory)
        sys.exit(1)

    # Use "is not None" rather than truthiness so that module ID 0 is honoured
    # as a real request instead of being treated like an absent option.
    hltOnly = args.hlt_only is not None

    # Note: EventStorage and eformat compression enums have different values
    compressionTypeES = CompressionType.NONE if args.uncompressed else CompressionType.ZLIB
    compressionType = eformat.helper.Compression.UNCOMPRESSED if args.uncompressed \
        else eformat.helper.Compression.ZLIB
    compressionLevel = 0 if args.uncompressed else 1

    # only print progress dots when they would not be interleaved with debug output
    showProgress = args.progress_bar and \
        logging.getLogger('').getEffectiveLevel() > logging.DEBUG

    # event counters
    totalEvents_in = 0
    totalEvents_out = 0
    totalEvents_skipped = 0

    # local countdown so that the parsed arguments are left untouched
    eventsToSkip = args.start_event

    # output stream, created when the first matching stream tag is seen
    ostream = None

    # Loop over events
    for e in stream:

        # NB: the limit is applied to the number of events read (including skipped ones)
        if args.max_events and totalEvents_in >= args.max_events:
            logging.info(' Maximum number of events reached : %d', args.max_events)
            break

        totalEvents_in += 1

        # select events
        if eventsToSkip > 0:
            eventsToSkip -= 1
            totalEvents_skipped += 1
            continue

        # find StreamTags and see if there is a match
        streamTags = e.stream_tag()
        logging.debug(' === New Event nr = %s (Run,Global ID) = (%d,%d) === ',
                      totalEvents_in, e.run_no(), e.global_id())

        # Only the first matching tag is used, so an event that belongs to
        # several of the requested streams is written exactly once.
        tag = next((t for t in streamTags if t.name in streamNames_out), None)
        if tag is None:
            continue

        # the event should be written out
        logging.debug(' Matching event found for stream tag = %s', tag)
        logging.debug(' Stream Tag:Robs = %s', [hex(r) for r in tag.robs])
        logging.debug(' Stream Tag:Dets = %s', [hex(d) for d in tag.dets])

        # check the lumi block number from the event against the lumi block number defined for the file
        # this check is only done if the lumi block number for the file is different from 0
        if lumiBlockNumber > 0 and e.lumi_block() != lumiBlockNumber:
            logging.error(' Event (Run,Global ID) = (%d,%d) has a lumi block number %d,'
                          ' which is different from LB = %d for the output file. Event skipped.',
                          e.run_no(), e.global_id(), e.lumi_block(), lumiBlockNumber)
            continue

        # check that all events have the same run number as the output file indicates otherwise skip event
        if e.run_no() != runNumber:
            logging.error(' Event (Run,Global ID) = (%d,%d) has a run number,'
                          ' which is different from the run number = %d for the output file. Event skipped.',
                          e.run_no(), e.global_id(), runNumber)
            continue

        # set the overall tag type for the first match; the output file name
        # contains the stream type, so the output stream is (re)created here
        if streamType != tag.type:
            streamType = tag.type
            logging.debug(' streamType set to = %s', streamType)

            # create the RAW output file name
            outRawFile = args.output_name or \
                eformat.EventStorage.RawFileName(projectTag,
                                                 runNumber,
                                                 streamType,
                                                 streamName,
                                                 lumiBlockNumber,
                                                 applicationName,
                                                 productionStep).fileNameCore()
            logging.debug(' set output file name = %s', outRawFile)

            # create the output stream
            ostream = eformat.ostream(directory=outputDirectory,
                                      core_name=outRawFile,
                                      run_number=dr.runNumber(),
                                      trigger_type=dr.triggerType(),
                                      detector_mask=dr.detectorMask(),
                                      beam_type=dr.beamType(),
                                      beam_energy=dr.beamEnergy(),
                                      compression=compressionTypeES,
                                      complevel=compressionLevel)

        # decide what to write out: a tag without ROB and detector lists
        # requests the full event
        is_feb_tag = (len(tag.robs) == 0 and len(tag.dets) == 0)
        if is_feb_tag and not hltOnly:
            # write out the full event fragment
            pbev = eformat.write.FullEventFragment(e)
            logging.debug(' Write full event fragment ')
        else:
            # filter stream tag robs and dets for the hlt-only option
            dets = []
            robs = []
            if hltOnly:
                if args.hlt_only < 0:
                    # wildcard: keep every HLT ROB which the tag selects
                    if SubDetector.TDAQ_HLT in tag.dets or is_feb_tag:
                        dets = [SubDetector.TDAQ_HLT]
                    robs = [robid for robid in tag.robs
                            if SourceIdentifier(robid).subdetector_id() == SubDetector.TDAQ_HLT]
                else:
                    # keep only the ROB of the requested HLT module, if the tag selects it
                    requested_rob_id = int(SourceIdentifier(SubDetector.TDAQ_HLT, args.hlt_only))
                    if SubDetector.TDAQ_HLT in tag.dets or requested_rob_id in tag.robs or is_feb_tag:
                        robs = [requested_rob_id]
            else:
                dets = list(tag.dets)
                robs = list(tag.robs)

            # select ROBs to write out; a single "or" condition makes sure a ROB
            # matched both by its ID and by its sub-detector is written only once
            logging.debug(' Write partial event fragment ')
            rob_output_list = [rob for rob in e
                               if rob.source_id().code() in robs
                               or rob.source_id().subdetector_id() in dets]

            # write out the partial event fragment
            pbev = eformat.write.FullEventFragment()
            pbev.copy_header(e)
            for out_rob in rob_output_list:
                pbev.append_unchecked(out_rob)

        # put the event onto the output stream
        pbev.compression_type(compressionType)
        pbev.compression_level(compressionLevel)
        ostream.write(pbev)
        if showProgress:
            sys.stdout.write('.')
            sys.stdout.flush()

        # increase output event counter
        totalEvents_out += 1

    # print final statistics
    logging.info('Input file(s) = %s ', args.files)
    logging.info('Total number of events processed = %d ', totalEvents_in)
    logging.info('Number of events skipped at the beginning = %d ', totalEvents_skipped)
    logging.info('Number of events written to output file = %d ', totalEvents_out)
    if totalEvents_out > 0:
        logging.info('Output file = %s ', ostream.last_filename())
    else:
        logging.error('No events selected so no output file created')
        sys.exit(1)

    sys.exit(0)
250
251