ATLAS Offline Software
trigbs_extractStream.py
Go to the documentation of this file.
1 #!/usr/bin/env tdaq_python
2 
3 # Copyright (C) 2002-2022 CERN for the benefit of the ATLAS collaboration
4 
5 # Select the events of the given stream name(s) from the input file(s) and write them to an output file.
6 # The output file follows the naming and format conventions used by the SFO at P1.
7 
8 import sys
9 import os
10 
def peb_writer():
    """Write the events of the selected stream(s) from the input file(s) to one output file.

    Options are read from ``sys.argv``; the remaining positional arguments are
    the input files. Run number, trigger type, detector mask, beam type/energy,
    project tag and lumi block of the output file are taken from the metadata
    of the first input file (project tag and lumi block can be overridden).

    For every event, the first stream tag whose name is one of the requested
    stream names decides what is written:

    * a tag without ROB and detector lists -> the complete event, unless the
      ``hlt-only`` option is given;
    * otherwise -> only the ROBs listed in the tag plus all ROBs of the
      sub-detectors listed in the tag (or, with ``hlt-only``, only the HLT
      result ROB(s)).

    Events whose run number (or, for a lumi block > 0, lumi block number)
    differs from that of the output file are skipped with an error message.

    The function never returns: it ends the process with ``sys.exit(0)`` when
    at least one event was written and ``sys.exit(1)`` otherwise (also for
    usage and argument errors).
    """

    import eformat
    import logging
    import EventApps.myopt as myopt
    from libpyevent_storage import CompressionType
    from libpyeformat_helper import SourceIdentifier, SubDetector

    option = {}

    # run mode options
    option['start-event'] = {'short': 'a', 'arg': True,
                             'default': 0,
                             'group': 'Run mode',
                             'description': 'Number of events which should be skipped from the begin'}

    option['max-events'] = {'short': 'n', 'arg': True,
                            'default': 0,
                            'group': 'Run mode',
                            'description': 'Maximum number of events in the output file. 0 means, all useful events from the input.'}

    option['verbosity'] = {'short': 'v', 'arg': True,
                           'default': logging.INFO,
                           'group': 'Run mode',
                           'description': 'Log verbosity'}

    option['progress-bar'] = {'short': 'P', 'arg': False,
                              'default': None,
                              'group': 'Run mode',
                              'description': 'Show progress bar when running interactively'}

    option['output-dir'] = {'short': 'd', 'arg': True,
                            'default': '.',
                            'group': 'Run mode',
                            'description': 'Directory in which the output file should be written'}

    option['uncompressed'] = {'short': 'u', 'arg': False,
                              'default': None,
                              'group': 'Run mode',
                              'description': 'Write out uncompressed data (default without this option is compressed)'}

    # stream tag options
    option['stream-name'] = {'short': 's', 'arg': True,
                             'default': None,
                             'group': 'Stream Tag',
                             'description': 'Name(s) of stream(s) which should be written out, e.g. "stream1,stream2,stream3"'}

    option['project-tag'] = {'short': 'p', 'arg': True,
                             'default': None,
                             'group': 'Stream Tag',
                             'description': 'Project tag which should be used for the output file'}

    option['lumi-block'] = {'short': 'l', 'arg': True,
                            'default': -1,
                            'group': 'Stream Tag',
                            'description': 'Lumiblock number used for the output file. Use 0 if multiple LB in file.'}

    # HLT result options
    option['hlt-only'] = {'short': 'm', 'arg': True,
                          'default': None,
                          'group': 'HLT Result',
                          'description': 'Drop all detector data and write out only HLT data for the given module ID.' +
                                         ' Module ID <0 is a wildcard for all HLT module IDs.'}

    parser = myopt.Parser(extra_args=True)
    for (k, v) in option.items():
        parser.add_option(k, v['short'], v['description'], v['arg'], v['default'], v['group'])

    if len(sys.argv) == 1:
        print(parser.usage('global "%s" options:' % sys.argv[0]))
        sys.exit(1)

    # process the global options
    (kwargs, extra) = parser.parse(sys.argv[1:], prefix='global "%s" options:' % sys.argv[0])

    # global defaults
    logging.getLogger('').name = os.path.splitext(os.path.basename(sys.argv[0]))[0]
    logging.getLogger('').setLevel(kwargs['verbosity'])

    # --- validate the arguments before touching any file ---------------------
    # Without these checks a missing input file / stream name surfaces later
    # as an unhelpful IndexError / AttributeError.
    if not extra:
        logging.fatal(' No input file given ')
        sys.exit(1)

    # A comma-separated list writes several streams into the same output file
    # (used in debug recovery). Blanks around names and empty entries are ignored.
    streamNames_out = [s.strip() for s in (kwargs['stream-name'] or '').split(',') if s.strip()]
    if not streamNames_out:
        logging.fatal(' No stream name given, use option -s ')
        sys.exit(1)
    streamName = streamNames_out[0] if len(streamNames_out) == 1 else 'accepted'

    # check that the output directory exists (isdir() is False for missing paths)
    outputDirectory = kwargs['output-dir']
    if not os.path.isdir(outputDirectory):
        logging.fatal(' Output directory %s does not exist ' % outputDirectory)
        sys.exit(1)

    # --- input and output file metadata -------------------------------------
    # input data stream
    stream = eformat.istream(extra)

    # metadata and file-name interpretation come from the first input file
    dr = eformat.EventStorage.pickDataReader(extra[0])
    df = eformat.EventStorage.RawFileName(extra[0])

    projectTag = dr.projectTag()
    lumiBlockNumber = dr.lumiblockNumber()
    runNumber = dr.runNumber()
    applicationName = 'athenaHLT'
    streamType = 'unknown'  # the real stream type is taken from the matching stream tag
    productionStep = df.productionStep() if df.hasValidCore() else 'unknown'

    if kwargs['project-tag'] is not None:
        projectTag = kwargs['project-tag']
    if kwargs['lumi-block'] != -1:
        lumiBlockNumber = kwargs['lumi-block']  # if output file can have multiple lumi blocks, use 0

    if lumiBlockNumber == 0:
        productionStep = 'merge'

    # --- loop-invariant settings ---------------------------------------------
    # Note: EventStorage and eformat compression enums have different values
    if kwargs['uncompressed']:
        compressionTypeES = CompressionType.NONE
        compressionType = eformat.helper.Compression.UNCOMPRESSED
        compressionLevel = 0
    else:
        compressionTypeES = CompressionType.ZLIB
        compressionType = eformat.helper.Compression.ZLIB
        compressionLevel = 1

    # HLT-only mode: None -> keep detector data, module ID < 0 -> all HLT
    # results, module ID >= 0 -> only that HLT result. Compare with None
    # explicitly so that module ID 0 is honoured even if the option parser
    # returns an integer.
    hltModuleId = None if kwargs['hlt-only'] is None else int(kwargs['hlt-only'])
    requested_rob_id = None
    if hltModuleId is not None and hltModuleId >= 0:
        requested_rob_id = int(SourceIdentifier(SubDetector.TDAQ_HLT, hltModuleId))

    showProgress = bool(kwargs['progress-bar']) and \
        logging.getLogger('').getEffectiveLevel() > logging.DEBUG

    maxEvents = kwargs['max-events']      # limit on the number of OUTPUT events
    eventsToSkip = kwargs['start-event']

    totalEvents_in = 0       # events read (including skipped ones)
    totalEvents_out = 0      # events written
    totalEvents_skipped = 0  # events skipped at the beginning
    ostream = None

    # --- event loop ------------------------------------------------------------
    for e in stream:
        totalEvents_in += 1

        if totalEvents_skipped < eventsToSkip:
            totalEvents_skipped += 1
            continue

        logging.debug(' === New Event nr = %s (Run,Global ID) = (%d,%d) === ',
                      totalEvents_in, e.run_no(), e.global_id())

        # Only the first matching stream tag is used, so an event carried by
        # several of the requested streams is written out only once.
        tag = next((t for t in e.stream_tag() if t.name in streamNames_out), None)
        if tag is None:
            continue

        logging.debug(' Matching event found for stream tag = %s', tag)
        logging.debug(' Stream Tag:Robs = %s', [hex(r) for r in tag.robs])
        logging.debug(' Stream Tag:Dets = %s', [hex(d) for d in tag.dets])

        # the lumi block is only checked if the output file has a lumi block > 0
        if lumiBlockNumber > 0 and e.lumi_block() != lumiBlockNumber:
            logging.error(' Event (Run,Global ID) = (%d,%d) has a lumi block number %d,'
                          ' which is different from LB = %d for the output file. Event skipped.',
                          e.run_no(), e.global_id(), e.lumi_block(), lumiBlockNumber)
            continue

        # all events must have the run number of the output file
        if e.run_no() != runNumber:
            logging.error(' Event (Run,Global ID) = (%d,%d) has a run number,'
                          ' which is different from the run number = %d for the output file. Event skipped.',
                          e.run_no(), e.global_id(), runNumber)
            continue

        # (Re)create the output stream for the first accepted event and whenever
        # the stream type changes. The explicit None check also covers a tag
        # whose type equals the initial placeholder 'unknown'.
        if ostream is None or streamType != tag.type:
            streamType = tag.type
            logging.debug(' streamType set to = %s', streamType)
            outRawFile = eformat.EventStorage.RawFileName(projectTag,
                                                          runNumber,
                                                          streamType,
                                                          streamName,
                                                          lumiBlockNumber,
                                                          applicationName,
                                                          productionStep)
            logging.debug(' set output file name = %s', outRawFile.fileNameCore())

            ostream = eformat.ostream(directory=outputDirectory,
                                      core_name=outRawFile.fileNameCore(),
                                      run_number=runNumber,
                                      trigger_type=dr.triggerType(),
                                      detector_mask=dr.detectorMask(),
                                      beam_type=dr.beamType(),
                                      beam_energy=dr.beamEnergy(),
                                      compression=compressionTypeES,
                                      complevel=compressionLevel)

        # decide what to write out
        is_feb_tag = (len(tag.robs) == 0 and len(tag.dets) == 0)
        if is_feb_tag and hltModuleId is None:
            # write out the full event fragment
            pbev = eformat.write.FullEventFragment(e)
            logging.debug(' Write full event fragment ')
        else:
            if hltModuleId is None:
                dets = list(tag.dets)
                robs = list(tag.robs)
            elif hltModuleId < 0:
                # all HLT results requested by the tag (a full-event tag requests all)
                dets = [SubDetector.TDAQ_HLT] if SubDetector.TDAQ_HLT in tag.dets or is_feb_tag else []
                robs = [robid for robid in tag.robs
                        if SourceIdentifier(robid).subdetector_id() == SubDetector.TDAQ_HLT]
            else:
                # one specific HLT result, if the tag requests it
                dets = []
                requested = (SubDetector.TDAQ_HLT in tag.dets or
                             requested_rob_id in tag.robs or is_feb_tag)
                robs = [requested_rob_id] if requested else []

            # write out the partial event fragment
            logging.debug(' Write partial event fragment ')
            pbev = eformat.write.FullEventFragment()
            pbev.copy_header(e)
            for rob in e:
                # A ROB selected both by its ID and by its sub-detector must be
                # added only once, otherwise it is duplicated in the output event.
                if rob.source_id().code() in robs or rob.source_id().subdetector_id() in dets:
                    pbev.append_unchecked(rob)

        # put the event onto the output stream
        pbev.compression_type(compressionType)
        pbev.compression_level(compressionLevel)
        ostream.write(pbev)
        if showProgress:
            sys.stdout.write('.')
            sys.stdout.flush()

        totalEvents_out += 1

        # stop as soon as the output file holds the requested number of events
        if maxEvents > 0 and totalEvents_out >= maxEvents:
            logging.info(' Maximum number of events reached : %d', maxEvents)
            break

    # print final statistics
    logging.info('Total number of events processed = %d ', totalEvents_in)
    logging.info('Number of events skipped at the beginning = %d ', totalEvents_skipped)
    logging.info('Number of events written to output file = %d ', totalEvents_out)
    if totalEvents_out > 0:
        logging.info('Output file = %s ', ostream.last_filename())
    else:
        logging.error('No events selected so no output file created')
        sys.exit(1)

    sys.exit(0)
274 
if __name__ == '__main__':
    # Script entry point: peb_writer() always terminates the process itself via sys.exit().
    peb_writer()
CaloCellPos2Ntuple.int
int
Definition: CaloCellPos2Ntuple.py:24
pool::DbPrintLvl::setLevel
void setLevel(MsgLevel l)
Definition: DbPrint.h:32
histSizes.list
def list(name, path='/')
Definition: histSizes.py:38
pmontree.code
code
Definition: pmontree.py:443
trigbs_extractStream.peb_writer
def peb_writer()
Definition: trigbs_extractStream.py:11