ATLAS Offline Software
trigbs_extractStream.py
#!/usr/bin/env tdaq_python

# Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration

# select events for a given stream name from an input file and write them to an output file
# the output file obeys the naming conventions used by the SFO at P1
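#
# Example (illustrative only): select the stream 'DebugStream' (-s) from input_file.data,
# writing at most 100 events (-n) into a new raw file in /tmp (-d):
#   trigbs_extractStream.py -s DebugStream -n 100 -d /tmp input_file.data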

import sys
import os

def peb_writer():
    """Runs the splitting routines"""

    import eformat, logging
    import EventApps.myopt as myopt
    from libpyevent_storage import CompressionType
    from libpyeformat_helper import SourceIdentifier, SubDetector
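    # eformat, EventApps and the libpy* modules are Python bindings provided by the
    # ATLAS TDAQ / EventStorage software; the script is expected to run in an
    # environment where these are set up (hence the tdaq_python interpreter above)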

    option = {}

    # run mode options
    option['start-event'] = {'short': 'a', 'arg': True,
                             'default': 0,
                             'group': 'Run mode',
                             'description': 'Number of events which should be skipped from the beginning'}

    option['max-events'] = {'short': 'n', 'arg': True,
                            'default': 0,
                            'group': 'Run mode',
                            'description': 'Maximum number of events in the output file. 0 means all useful events from the input.'}

    option['verbosity'] = {'short': 'v', 'arg': True,
                           'default': logging.INFO,
                           'group': 'Run mode',
                           'description': 'Log verbosity'}

    option['progress-bar'] = {'short': 'P', 'arg': False,
                              'default': None,
                              'group': 'Run mode',
                              'description': 'Show progress bar when running interactively'}

    option['output-dir'] = {'short': 'd', 'arg': True,
                            'default': '.',
                            'group': 'Run mode',
                            'description': 'Directory in which the output file should be written'}

    option['uncompressed'] = {'short': 'u', 'arg': False,
                              'default': None,
                              'group': 'Run mode',
                              'description': 'Write out uncompressed data (default without this option is compressed)'}

    # stream tag options
    option['stream-name'] = {'short': 's', 'arg': True,
                             'default': None,
                             'group': 'Stream Tag',
                             'description': 'Name(s) of stream(s) which should be written out, e.g. "stream1,stream2,stream3"'}

    option['project-tag'] = {'short': 'p', 'arg': True,
                             'default': None,
                             'group': 'Stream Tag',
                             'description': 'Project tag which should be used for the output file'}

    option['lumi-block'] = {'short': 'l', 'arg': True,
                            'default': -1,
                            'group': 'Stream Tag',
                            'description': 'Lumiblock number used for the output file. Use 0 if the file contains multiple lumiblocks.'}

    # HLT result options
    option['hlt-only'] = {'short': 'm', 'arg': True,
                          'default': None,
                          'group': 'HLT Result',
                          'description': 'Drop all detector data and write out only HLT data for the given module ID.'
                                         ' Module ID <0 is a wildcard for all HLT module IDs.'}

    parser = myopt.Parser(extra_args=True)
    for (k, v) in option.items():
        parser.add_option(k, v['short'], v['description'], v['arg'], v['default'], v['group'])

    if len(sys.argv) == 1:
        print(parser.usage('global "%s" options:' % sys.argv[0]))
        sys.exit(1)

    # process the global options
    (kwargs, extra) = parser.parse(sys.argv[1:], prefix='global "%s" options:' % sys.argv[0])

    # global defaults
    logging.getLogger('').name = os.path.splitext(os.path.basename(sys.argv[0]))[0]
    logging.getLogger('').setLevel(kwargs['verbosity'])
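    # name the root logger after the script file (without extension) and apply the
    # requested verbosity to it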

    # input data stream
    stream = eformat.istream(extra)
    # input event counter
    totalEvents_in = 0

    # get metadata from the input file
    dr = eformat.EventStorage.pickDataReader(extra[0])

    # interpret the input file name
    df = eformat.EventStorage.RawFileName(extra[0])
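    # Note: RawFileName both builds and parses names following the P1 SFO convention;
    # here it is used to recover fields encoded in the input file name (e.g. the
    # production step), while further below it builds the output file name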

    # extract some parameters from the meta-data
    projectTag = dr.projectTag()
    lumiBlockNumber = dr.lumiblockNumber()
    applicationName = 'athenaHLT'
    streamType = 'unknown'  # the real stream type will be extracted from the matching stream tag
    if df.hasValidCore():
        productionStep = df.productionStep()
    else:
        productionStep = 'unknown'

    # input parameters for building the output file name
    runNumber = dr.runNumber()
    outputDirectory = kwargs['output-dir']
    streamName = kwargs['stream-name']
    # check if multiple streams should be written to the same output file (used in debug recovery)
    streamNames_out = streamName.split(',')
    if len(streamNames_out) > 1:
        streamName = 'accepted'

    if kwargs['project-tag'] is not None:
        projectTag = kwargs['project-tag']
    if kwargs['lumi-block'] != -1:
        lumiBlockNumber = kwargs['lumi-block']  # if the output file can have multiple lumi blocks, use 0

    if lumiBlockNumber == 0:
        productionStep = 'merge'

    # check that the output directory exists
    if (not os.path.exists(outputDirectory)) or (not os.path.isdir(outputDirectory)):
        logging.fatal(' Output directory %s does not exist ' % outputDirectory)
        sys.exit(1)

    # output event counter
    totalEvents_out = 0

    # counter of skipped events
    totalEvents_skipped = 0

    # Loop over events
    for e in stream:

        if kwargs['max-events'] > 0 and totalEvents_in >= kwargs['max-events']:
            logging.info(' Maximum number of events reached : %d', kwargs['max-events'])
            break

        totalEvents_in += 1

        # select events
        if kwargs['start-event'] > 0:
            kwargs['start-event'] -= 1
            totalEvents_skipped += 1
            continue

        # find StreamTags and see if there is a match
        streamTags = e.stream_tag()
        logging.debug(' === New Event nr = %s (Run,Global ID) = (%d,%d) === ', totalEvents_in, e.run_no(), e.global_id())
        # count accepted streams for each event
        streamAccepted = 0
        for tag in streamTags:
            if tag.name in streamNames_out:
                # avoid duplication of events that carry more than one of the requested stream names
                if streamAccepted:
                    continue
                streamAccepted += 1

                # the event should be written out
                logging.debug(' Matching event found for stream tag = %s', tag)
                logging.debug(' Stream Tag:Robs = %s', [hex(r) for r in tag.robs])
                logging.debug(' Stream Tag:Dets = %s', [hex(d) for d in tag.dets])

                # check the lumi block number of the event against the lumi block number defined for the file
                # this check is only done if the lumi block number for the file is different from 0
                if lumiBlockNumber > 0:
                    if e.lumi_block() != lumiBlockNumber:
                        logging.error(' Event (Run,Global ID) = (%d,%d) has a lumi block number %d,'
                                      ' which is different from LB = %d for the output file. Event skipped.',
                                      e.run_no(), e.global_id(), e.lumi_block(), lumiBlockNumber)
                        continue

                # check that all events have the same run number as the output file indicates, otherwise skip the event
                if e.run_no() != runNumber:
                    logging.error(' Event (Run,Global ID) = (%d,%d) has a run number,'
                                  ' which is different from the run number = %d for the output file. Event skipped.',
                                  e.run_no(), e.global_id(), runNumber)
                    continue

                # set the overall tag type for the first match
                if streamType != tag.type:
                    streamType = tag.type
                    logging.debug(' streamType set to = %s', streamType)
                    # create the RAW output file name
                    outRawFile = eformat.EventStorage.RawFileName(projectTag,
                                                                  runNumber,
                                                                  streamType,
                                                                  streamName,
                                                                  lumiBlockNumber,
                                                                  applicationName,
                                                                  productionStep)
                    logging.debug(' set output file name = %s', outRawFile.fileNameCore())
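                    # the core name roughly follows the P1 SFO convention,
                    # <project>.<run>.<streamType>_<streamName>.<prodStep>.RAW._lb<NNNN>._<application>
                    # (illustrative; the exact layout is defined by EventStorage.RawFileName)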

                    # Note: EventStorage and eformat compression enums have different values
                    compressionTypeES = CompressionType.NONE if kwargs['uncompressed'] else CompressionType.ZLIB
                    compressionType = eformat.helper.Compression.UNCOMPRESSED if kwargs['uncompressed'] \
                        else eformat.helper.Compression.ZLIB
                    compressionLevel = 0 if kwargs['uncompressed'] else 1

                    # create the output stream
                    ostream = eformat.ostream(directory=outputDirectory,
                                              core_name=outRawFile.fileNameCore(),
                                              run_number=dr.runNumber(),
                                              trigger_type=dr.triggerType(),
                                              detector_mask=dr.detectorMask(),
                                              beam_type=dr.beamType(),
                                              beam_energy=dr.beamEnergy(),
                                              compression=compressionTypeES,
                                              complevel=compressionLevel)
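                    # run number, trigger type, detector mask and beam parameters are copied from
                    # the input file's DataReader so that the extracted file keeps the original
                    # P1 metadata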

                # decide what to write out
                is_feb_tag = (len(tag.robs) == 0 and len(tag.dets) == 0)
                if is_feb_tag and not kwargs['hlt-only']:
                    # write out the full event fragment
                    pbev = eformat.write.FullEventFragment(e)
                    logging.debug(' Write full event fragment ')
                else:
                    # filter stream tag robs and dets for the hlt-only option
                    dets = []
                    robs = []
                    if kwargs['hlt-only']:
                        if int(kwargs['hlt-only']) < 0:
                            dets = [SubDetector.TDAQ_HLT] if SubDetector.TDAQ_HLT in tag.dets or is_feb_tag else []
                            robs = [robid for robid in tag.robs if SourceIdentifier(robid).subdetector_id() == SubDetector.TDAQ_HLT]
                        else:
                            requested_rob_id = int(SourceIdentifier(SubDetector.TDAQ_HLT, int(kwargs['hlt-only'])))
                            if SubDetector.TDAQ_HLT in tag.dets or requested_rob_id in tag.robs or is_feb_tag:
                                robs = [requested_rob_id]
                    else:
                        dets = list(tag.dets)
                        robs = list(tag.robs)

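                    # at this point 'robs'/'dets' define the partial event: either only the HLT
                    # result ROB(s) selected via hlt-only, or the ROB and sub-detector lists
                    # recorded in the stream tag (partial event building)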
                    # select ROBs to write out
                    rob_output_list = []
                    logging.debug(' Write partial event fragment ')
                    for rob in e:
                        if rob.source_id().code() in robs:
                            rob_output_list.append(rob)
                        if rob.source_id().subdetector_id() in dets:
                            rob_output_list.append(rob)
                    # write out the partial event fragment
                    pbev = eformat.write.FullEventFragment()
                    pbev.copy_header(e)
                    for out_rob in rob_output_list:
                        pbev.append_unchecked(out_rob)

                # put the event onto the output stream
                pbev.compression_type(compressionType)
                pbev.compression_level(compressionLevel)
                ostream.write(pbev)
                if (logging.getLogger('').getEffectiveLevel() > logging.DEBUG) and kwargs['progress-bar']:
                    sys.stdout.write('.')
                    sys.stdout.flush()

                # increase output event counter
                totalEvents_out += 1

    # print final statistics
    logging.info('Total number of events processed = %d ', totalEvents_in)
    logging.info('Number of events skipped at the beginning = %d ', totalEvents_skipped)
    logging.info('Number of events written to output file = %d ', totalEvents_out)
    if totalEvents_out > 0:
        logging.info('Output file = %s ', ostream.last_filename())
    else:
        logging.error('No events selected, so no output file was created')
        sys.exit(1)

    sys.exit(0)


if __name__ == "__main__":
    peb_writer()