ATLAS Offline Software
POOL2EI_Lib.py
Go to the documentation of this file.
1 # Copyright (C) 2002-2022 CERN for the benefit of the ATLAS collaboration
2 
3 # @file POOL2EI_Lib.py
4 # @purpose provide components to get EventIndex data from pool files
5 # @author Javier Sanchez
6 # @date February 2014
7 # @update January 2022
8 #
9 # Some code borrowed from PyAthena.FilePeekerLib
10 # credits to Sebastien Binet
11 
12 
13 __version__ = "$Revision: 3.0 $"
14 __author__ = "Javier Sanchez"
15 __doc__ = "provide components to POOL2EI"
16 
17 
18 # imports -----------------------------------------------------------------
19 import os
20 import AthenaPython.PyAthena as PyAthena
21 from .compressB64 import compressB64
22 from .EI_Lib import IOV
23 import EventIndexProducer.EIconstants as eic
24 
25 import struct
26 from EventIndexProducer.EIPBof import EIPBof
27 import gzip
28 
29 import time
30 StatusCode = PyAthena.StatusCode
31 
32 
34  import ROOT
35  ROOT.gROOT.SetBatch(True)
36  return ROOT
37 
38 
39 def toiter(beg, end):
40  while beg != end:
41  yield beg.__deref__()
42  beg.__preinc__()
43  return
44 
45 
47  """Algorithm
48  """
49  _iov = IOV()
50  _eif = None
51  _eif_entries = 0
52  _eif_totentries = 0
53  _eif_nfiles = 0
54  _eifname = None
55  _eifname_spb = None
56  _initialized = False
57 
58  _eiversion = 0x00000001
59  _eimagic = 0x6e56c8c7
60 
61  def __init__(self, name='POOL2EI', **kw):
62  # init base class
63  super(POOL2EI, self).__init__(name, **kw)
64 
65  _info = self.msg.info
66  _info("POOL2EI::__init__")
67 
68  # ----------------------------------------
69  # initialize
70  # ----------------------------------------
71 
72  def initialize(self):
73 
74  import AthenaPython.PyAthena as PyAthena
75  _info = self.msg.info
76  _info("POOL2EI::initialize")
77 
78  # Am I being called for every input file ?
79  if self._initialized:
80  return
81 
82  _info("## DoProvenanceRef: {}".format(self.DoProvenanceRef))
83  _info("## DoTriggerInfo: {}".format(self.DoTriggerInfo))
84  _info("## SendToBroker: {}".format(self.SendToBroker))
85 
86  self.eipbof = EIPBof(self._eiversion) # initialize factory
87 
88  self._dsname = "Unknown.Input.Dataset.Name" # default fake value
89  if self.EiDsName is not None:
90  _info("## EiDsName: {}".format(self.EiDsName))
91  self._dsname = self.EiDsName
92  else:
93  # try to get dataset name from pathena INDS environment variable
94  inds = os.getenv('INDS')
95  if inds is not None:
96  _info("## INDS: {}".format(inds))
97  self._dsname = inds
98  else:
99  # else, try to use job definition
100  try:
101  import newJobDef
102  processingType = newJobDef.job['processingType']
103  transformation = newJobDef.job['transformation']
104  dsSource = 'realDatasetsIn' # dataset name from input
105  if (processingType == 'merge' and
106  transformation != 'POOLtoEI_tf.py'):
107  dsSource = 'realDatasets' # dataset name from output
108  datasets = newJobDef.job[dsSource].split(',')
109  _info("## {}[0]: {}".format(dsSource, datasets[0]))
110  self._dsname = datasets[0]
111  except Exception:
112  _info('## Unable to get dataset name from realDatasetsIn '
113  'or realDatasets')
114 
115  # remove _tid and _sub parts from dsname
116  import re
117  self._dsname_orig = self._dsname
118  self._dsname = re.sub('_tid[0-9]{8}_[0-9]{2}', '', self._dsname)
119  self._dsname = re.sub('_sub[0-9]{10}', '', self._dsname)
120  self._dsname = re.sub('/$', '', self._dsname)
121 
122  # load our pythonizations:
123  for cls_name in ('EventStreamInfo', 'EventType', 'PyEventType'):
124  cls = getattr(PyAthena, cls_name) # noqa: F841
125 
126  _info("retrieving various stores...")
127  for store_name in ('evtStore', 'inputStore'):
128  _info("retrieving [{}]...".format(store_name))
129  o = getattr(self, store_name) # noqa: F841
130  _info("retrieving [{}]... [done]".format(store_name))
131  _info("retrieving various stores... [done]")
132 
133  # Build output file names
134  # self.EiFmt ignored
135  # keep self.EiFmt format flag for compatibility and future usage
136  if self.Out is not None:
137  oname_spb = self.Out
138  else:
139  oname_spb = "output.ei.spb"
140 
141  # open EI output file
142  oname_spb = os.path.expanduser(os.path.expandvars(oname_spb))
143  self._eifname_spb = oname_spb
144  _info('Opening EI SPB file [{}]...'.format(oname_spb))
145 
146  if os.path.exists(oname_spb):
147  os.remove(oname_spb)
148 
149  try:
150  self._eif_spb = gzip.open(oname_spb, 'wb')
151  except Exception:
152  self._eif_spb = None
153  self.msg.fatal("Unable to open EI SPB output file {} "
154  "exapnded as {}".format(self.Out, oname_spb))
155  raise RuntimeError("Unable to open EI SPB output file")
156 
157  self._eif_spb.write(struct.pack('<I', self._eimagic)) # EI SPB magic
158  self._eif_spb.write(struct.pack('<I', self._eiversion)) # EI SPB ver
159 
160  # get taskid and jobid
161  if (hasattr(self, 'TaskID') and hasattr(self, 'JobID') and
162  self.TaskID is not None and self.JobID is not None):
163  taskID = "{}.T".format(self.TaskID)
164  if (hasattr(self, 'AttemptNumber') and
165  self.AttemptNumber is not None):
166  jobID = "{}.{}".format(self.JobID, self.AttemptNumber)
167  else:
168  jobID = "{}.0".format(self.JobID)
169  else:
170  taskID = "{}.G".format(os.getenv('PanDA_TaskID', 0))
171  jobID = "{}.{}".format(os.getenv('PandaID', 0),
172  os.getenv('PanDA_AttemptNr', 0))
173 
174  # initial information SPB
175  # write Header
176  if self._eif_spb is not None:
177  header = self.eipbof.Header()
178  header.startProcTime = int(time.time() * 1000)
179  header.taskID = taskID
180  header.jobID = jobID
181  header.inputDsName = self._dsname_orig
182  header.provenanceRef = self.DoProvenanceRef
183  header.triggerInfo = self.DoTriggerInfo
184 
185  spb = header.SerializeToString()
186  self._eif_spb.write(struct.pack('<I', eic.EI_PROTO_HEADER << 8 |
187  eic.EI_PROTO_MSGVER))
188  self._eif_spb.write(struct.pack('<I', len(spb)))
189  self._eif_spb.write(spb)
190 
191  self._initialized = True
192 
193  return StatusCode.Success
194 
195  @property
196  def evtStore(self):
197  import AthenaPython.PyAthena as PyAthena
198  return PyAthena.py_svc('StoreGateSvc/StoreGateSvc')
199 
200  @property
201  def inputStore(self):
202  import AthenaPython.PyAthena as PyAthena
203  return PyAthena.py_svc('StoreGateSvc/InputMetaDataStore')
204 
205  def process_metadata(self, store, metadata_name):
206  msg = self.msg
207  try:
208  obj = store[metadata_name]
209  except KeyError:
210  msg.warning('could not retrieve [{}]'.format(metadata_name))
211  return ([], [])
212  try:
213  msg.info('processing container [{}]'.format(obj.folderName()))
214  except Exception:
215  msg.info('container does not have folderName')
216  data = []
217  iovs = []
218  payloads = obj.payloadContainer()
219  payloads_sz = payloads.size()
220  if hasattr(payloads, 'at'):
221  # HACK for bug #77976
222  _tmp = payloads
223  payloads = []
224  for ii in range(payloads_sz):
225  payloads.append(_tmp.at(ii))
226  pass
227  for ii, payload in zip(range(payloads_sz), payloads):
228  if not payload:
229  msg.info("**error** null-pointer ?")
230  continue
231  # names
232  chan_names = []
233  sz = payload.name_size()
234  for idx in range(sz):
235  chan = payload.chanNum(idx)
236  chan_name = payload.chanName(chan)
237  chan_names.append(chan_name)
238 
239  # iovs
240  sz = payload.iov_size()
241  for idx in range(sz):
242  chan = payload.chanNum(idx)
243  iov_range = payload.iovRange(chan)
244  iov_start = iov_range.start()
245  iov_stop = iov_range.stop()
246  if (idx < 10):
247  msg.info('({}, {}) => ({}, {}) valid={} runEvt={}'.format(
248  iov_start.run(),
249  iov_start.event(),
250  iov_stop.run(),
251  iov_stop.event(),
252  iov_start.isValid(),
253  iov_start.isRunEvent()))
254  elif (idx == 10):
255  msg.info("... {} more".format(sz-9))
256  iovs.append((iov_start.run(), iov_start.event(),
257  iov_stop.run(), iov_stop.event(),
258  iov_start.isValid(), iov_start.isRunEvent(),
259  chan))
260 
261  # attrs
262  attrs = [] # can't use a dict as spec.name() isn't unique
263  sz = payload.size()
264  for idx in range(sz):
265  chan = payload.chanNum(idx)
266  attr_list = payload.attributeList(chan)
267  attr_data = []
268  for a in list(toiter(attr_list.begin(), attr_list.end())):
269  spec = a.specification()
270  a_type = spec.typeName()
271  if a_type.find('string') >= 0:
272  a_data = a.data['string']()
273  try:
274  a_data = eval(a_data, {}, {})
275  except Exception:
276  # swallow and keep as a string
277  pass
278  else:
279  a_data = a.data[a_type]()
280  attr_data.append((spec.name(), a_data))
281  attrs.append(dict(attr_data))
282  if len(attrs) == len(chan_names):
283  data.append(dict(zip(chan_names, attrs)))
284  else:
285  if len(attrs):
286  if len(attrs) == 1:
287  data.append(attrs[0])
288  else:
289  data.append(attrs)
290  else:
291  data.append(chan_names)
292  pass # loop over payloads...
293 
294  return (data, iovs)
295 
296  # ----------------------------------------
297  # execute at begin of file
298  # ----------------------------------------
299  def beginFile(self):
300 
301  _info = self.msg.info
302  _info("POOL2EI::beginFile")
303 
304  # entries for this input file
305  self._eif_entries = 0
306 
307  # define a new IOV storage
308  self._iov = IOV()
309 
310  # ======================================================
311  # retrieve the GUID
312  # ======================================================
313 
314  def _get_guid():
315  ROOT = _import_ROOT()
316  root_files = list(ROOT.gROOT.GetListOfFiles())
317  if len(root_files) == 0:
318  _info('could not find correct ROOT file')
319  return
320 
321  root_file = root_files[-1] # get the last file in the list
322  pool = root_file.Get("##Params")
323  import re
324  # Pool parameters are of the form:
325  # '[NAME=somevalue][VALUE=thevalue]'
326  pool_token = re.compile(r'\[NAME=(?P<name>.*?)\]'
327  r'\[VALUE=(?P<value>.*?)\]').match
328  params = []
329  guids = []
330  for i in range(pool.GetEntries()):
331  if pool.GetEntry(i) > 0:
332  pool_string = pool.db_string
333  # take string until \0 is found
334  n = pool_string.find('\0')
335  if n != -1:
336  pool_string = pool_string[:n]
337  match = pool_token(pool_string)
338  if not match:
339  continue
340  d = match.groupdict()
341  params.append((d['name'], d['value']))
342  if d['name'].lower() == 'fid':
343  guids.append(d['value'])
344  return guids
345 
346  self.guids = _get_guid()
347  if len(self.guids) > 0:
348  self.guid = self.guids.pop()
349  else:
350  self.guid = None
351 
352  # ======================================================
353  # trigger menu
354  # ======================================================
355  if self.DoTriggerInfo:
356  l1_menu = None
357  l2_menu = None
358  ef_menu = None
359  hlt_menu = None
360  if self.meta_triggermenu:
361  _info("Load trigger menu from TriggerMenu")
362  for e in self.inputStore['TriggerMenu']:
363  smk = e.smk()
364  l1psk = e.l1psk()
365  hltpsk = e.hltpsk()
366  chainIds = e.chainIds()
367  chainNames = e.chainNames()
368  itemCtpIds = e.itemCtpIds()
369  itemNames = e.itemNames()
370  l1_menu = {name: id for id, name in
371  zip(itemCtpIds, itemNames)}
372  hlt_menu = {name: id for id, name in
373  zip(chainIds, chainNames)}
374  break # first element is enough
375 
376  elif self.meta_triggermenujson_l1 or self.meta_triggermenujson_hlt:
377  _info("Load trigger menu from TriggerMenuJson_L1 and "
378  "TriggerMenuJson_HLT")
379  import json
380  for e in self.inputStore['TriggerMenuJson_L1']:
381  l1_menu_raw = json.loads(e.payload())
382  l1_items = l1_menu_raw['items']
383  l1_menu = {l1_key: l1_items[l1_key]['ctpid']
384  for l1_key in l1_items}
385  break # first element is enough
386  for e in self.inputStore['TriggerMenuJson_HLT']:
387  hlt_menu_raw = json.loads(e.payload())
388  hlt_chains = hlt_menu_raw['chains']
389  hlt_menu = {chain: hlt_chains[chain]['counter']
390  for chain in hlt_chains}
391  break # first element is enough
392  elif self.meta_hlt_menu or self.meta_lvl1_menu:
393  _info("Load trigger menu from /TRIGGER/LVL1/Menu and "
394  "/TRIGGER/HLT/Menu")
395  # /TRIGGER/LVL1/Menu
396  (l1menu_info, l1menu_iovs) = self.process_metadata(
397  self.inputStore, '/TRIGGER/LVL1/Menu')
398  # bit mask number is in channel IOV
399  channels = [iov[6] for iov in l1menu_iovs]
400  names = [e['ItemName'] for e in l1menu_info[0]]
401  l1_menu = {name: id for name, id in zip(names, channels)}
402 
403  # /TRIGGER/HLT/Menu
404  (hltmenu_info, hltmenu_iovs) = self.process_metadata(
405  self.inputStore, '/TRIGGER/HLT/Menu')
406  l2_menu = {entry['ChainName']: entry['ChainCounter']
407  for entry in hltmenu_info[0]
408  if entry['ChainName'].startswith("L2_")}
409  ef_menu = {entry['ChainName']: entry['ChainCounter']
410  for entry in hltmenu_info[0]
411  if entry['ChainName'].startswith("EF_")}
412  hlt_menu = {entry['ChainName']: entry['ChainCounter']
413  for entry in hltmenu_info[0]
414  if entry['ChainName'].startswith("HLT_")}
415  else:
416  _info("Trigger Menu not found")
417 
418  # ======================================================
419  # trigger config keys
420  # ======================================================
421  smk = 0
422  l1psk = 0
423  hltpsk = 0
424  if self.meta_triggermenu:
425  _info("Load trigger config keys from TriggerMenu")
426  for e in self.inputStore['TriggerMenu']:
427  smk = e.smk()
428  l1psk = e.l1psk()
429  hltpsk = e.hltpsk()
430  break # first element is enough
431  if self.meta_hlt_hltconfigkeys:
432  _info("Load trigger config keys from /TRIGGER/HLT/HltConfigKeys")
433  # /TRIGGER/HLT/HltConfigKeys
434  (hltck_info, hltck_iovs) = self.process_metadata(
435  self.inputStore, '/TRIGGER/HLT/HltConfigKeys')
436  smk_l = [x['MasterConfigurationKey'] for x in hltck_info]
437  for val, iov in zip(smk_l, hltck_iovs):
438  self._iov.add('SMK', val, iov[:4])
439  smk = smk_l[0]
440  if self.meta_hlt_prescalekey:
441  _info("Load trigger config keys from /TRIGGER/HLT/PrescaleKey")
442  # /TRIGGER/HLT/PrescaleKey
443  (hltpk_info, hltpk_iovs) = self.process_metadata(
444  self.inputStore, '/TRIGGER/HLT/PrescaleKey')
445  hltpk_l = [x['HltPrescaleKey'] for x in hltpk_info]
446  for val, iov in zip(hltpk_l, hltpk_iovs):
447  self._iov.add('HLTPSK', val, iov[:4])
448  hltpsk = hltpk_l[0]
449  if self.meta_lvl1_lvl1configkey:
450  _info("Load trigger config keys from /TRIGGER/LVL1/Lvl1ConfigKey")
451  # /TRIGGER/LVL1/Lvl1ConfigKey
452  (l1pk_info, l1pk_iovs) = self.process_metadata(
453  self.inputStore, '/TRIGGER/LVL1/Lvl1ConfigKey')
454  l1pk_l = [x['Lvl1PrescaleConfigurationKey'] for x in l1pk_info]
455  for val, iov in zip(l1pk_l, l1pk_iovs):
456  self._iov.add('L1PSK', val, iov[:4])
457  l1psk = l1pk_l[0]
458 
459  # ======================================================
460  # AMITag, triggerStreamOfFile and project_name
461  # ======================================================
462  _info("Load AMITag, triggerStreamOfFile and project_name "
463  "from /TagInfo")
464 
465  (tginfo, tgiovs) = self.process_metadata(self.inputStore, '/TagInfo')
466  amitag = "Unknown"
467  trigStream = "Unknown"
468  projName = "Unknown"
469  if len(tginfo) > 0:
470  for tgi in tginfo:
471  if 'AMITag' in tgi:
472  amitag = tgi['AMITag']
473  _info("## AMITag: {}".format(amitag))
474  if 'triggerStreamOfFile' in tgi:
475  trigStream = tgi['triggerStreamOfFile']
476  _info("## triggerStreamOfFile: {}".format(trigStream))
477  if 'project_name' in tgi:
478  projName = tgi['project_name']
479  _info("## project_name: {}".format(projName))
480 
481  # ======================================================
482  # write BeginGUID
483  # ======================================================
484 
485  if self._eif_spb is not None:
486  beginGUID = self.eipbof.BeginGUID()
487  beginGUID.startProcTime = int(time.time() * 1000)
488  beginGUID.AMITag = str(amitag)
489  beginGUID.trigStream = str(trigStream)
490  beginGUID.projName = str(projName)
491  beginGUID.guid = self.guid
492 
493  spb = beginGUID.SerializeToString()
494  self._eif_spb.write(struct.pack('<I', eic.EI_PROTO_BEGINGUID << 8 |
495  eic.EI_PROTO_MSGVER))
496  self._eif_spb.write(struct.pack('<I', len(spb)))
497  self._eif_spb.write(spb)
498 
499  self._eif_nfiles += 1
500 
501  # ======================================================
502  # write TriggerMenu
503  # ======================================================
504 
505  if (self.DoTriggerInfo and self._eif_spb is not None
506  and l1_menu is not None):
507  _info("Write trigger menu to output SPB")
508  tMenu = self.eipbof.TriggerMenu()
509  tMenu.SMK = smk
510  tMenu.L1PSK = l1psk
511  tMenu.HLTPSK = hltpsk
512 
513  def d2l(d):
514  menu = ["{}:{}".format(d[k], k) for k in d]
515  return ";".join(menu)
516 
517  if l1_menu is not None and len(l1_menu) > 0:
518  tMenu.L1Menu = d2l(l1_menu)
519  if l2_menu is not None and len(l2_menu) > 0:
520  tMenu.L2Menu = d2l(l2_menu)
521  if ef_menu is not None and len(ef_menu) > 0:
522  tMenu.EFMenu = d2l(ef_menu)
523  if hlt_menu is not None and len(hlt_menu) > 0:
524  tMenu.HLTMenu = d2l(hlt_menu)
525 
526  # tMenu.L1Menu is required
527  if len(tMenu.L1Menu) > 0:
528  spb = tMenu.SerializeToString()
529  self._eif_spb.write(struct.pack('<I',
530  eic.EI_PROTO_TRIGGERMENU << 8 |
531  eic.EI_PROTO_MSGVER))
532  self._eif_spb.write(struct.pack('<I', len(spb)))
533  self._eif_spb.write(spb)
534  else:
535  _info("Unable to write trigger menu to output SPB. "
536  "tMenu.L1Menu is empty")
537 
538  self.inputStore.clearStore()
539 
540  return
541 
542  # ----------------------------------------
543  # execute at end of file
544  # ----------------------------------------
545  def endFile(self):
546 
547  _info = self.msg.info
548  _info("POOL2EI::endFile")
549 
550  # write EndGUID
551  if self._eif_spb is not None:
552  endGUID = self.eipbof.EndGUID()
553  endGUID.nentries = self._eif_entries
554  endGUID.endProcTime = int(time.time() * 1000)
555 
556  spb = endGUID.SerializeToString()
557  self._eif_spb.write(struct.pack('<I', eic.EI_PROTO_ENDGUID << 8 |
558  eic.EI_PROTO_MSGVER))
559  self._eif_spb.write(struct.pack('<I', len(spb)))
560  self._eif_spb.write(spb)
561 
562  self.inputStore.clearStore()
563 
564  return
565 
566  # ----------------------------------------
567  # execute at start of run
568  # ----------------------------------------
569  def start(self):
570 
571  _info = self.msg.info
572  _info("POOL2EI::start")
573 
574  return StatusCode.Success
575 
576  # ----------------------------------------
577  # execute event by event
578  # ----------------------------------------
579  def execute(self):
580 
581  if self._eif_totentries < 100:
582  _info = self.msg.info
583  else:
584  _info = lambda *x: None # noqa: E731
585  _warning = self.msg.warning
586 
587  _info("POOL2EI::execute")
588 
589  if self._eif_spb is not None:
590  eventPB = self.eipbof.EIEvent()
591 
592  # ======================================================
593  # Get EventInfo data
594  # ======================================================
595 
596  store = self.evtStore
597 
598  if self.item_xaod_eventinfo:
599 
600  _info('=== [xAOD::EventInfo] ===')
601  xei = store.retrieve('xAOD::EventInfo', 'EventInfo')
602  run_number = xei.runNumber()
603  event_number = xei.eventNumber()
604  lumi_block = xei.lumiBlock()
605  bunch_crossing_id = xei.bcid()
606  time_stamp = xei.timeStamp()
607  try:
608  time_stamp_ns = xei.timeStampNSOffset()
609  except Exception:
610  _info('## Event does not have xAOD::EventInfo::'
611  'timeStampNSOffset()')
612  time_stamp_ns = 0
613  evt_type_bit_mask = xei.eventTypeBitmask()
614  isSimulation = (evt_type_bit_mask & xei.IS_SIMULATION != 0)
615  isTestBeam = (evt_type_bit_mask & xei.IS_TESTBEAM != 0)
616  isCalibration = (evt_type_bit_mask & xei.IS_CALIBRATION != 0)
617  mc_channel_number = 0
618  mc_event_weight = 0.
619  if isSimulation:
620  try:
621  mc_channel_number = xei.mcChannelNumber()
622  mc_event_weight = xei.mcEventWeight()
623  except Exception:
624  pass
625  try:
626  extendedLevel1ID = xei.extendedLevel1ID()
627  except Exception:
628  _info('## Event does not have xAOD::EventInfo::'
629  'extendedLevel1ID()')
630  extendedLevel1ID = 0
631  # there isn't eInfoTrigger in xAOD::EventInfo
632  eInfoTrigger = None
633 
634  del xei
635 
636  elif self.item_eventinfo:
637 
638  # evt_info_keys = store.keys('EventInfo') # fails. bug ?
639  evt_info_keys = [
640  x for x in store.keys() if x.endswith("EventInfo")]
641  if len(evt_info_keys) != 1:
642  _info('more than one EventInfo: {}'.format(evt_info_keys))
643  _info(' ==> we\'ll use [{}]'.format(evt_info_keys[0]))
644  sg_key = evt_info_keys[0]
645  ei = store.retrieve('EventInfo', sg_key)
646  _info('=== [EventInfo#{}] ==='.format(sg_key))
647  eid = ei.event_ID()
648  run_number = eid.run_number()
649  event_number = eid.event_number()
650  lumi_block = eid.lumi_block()
651  bunch_crossing_id = eid.bunch_crossing_id()
652  time_stamp = eid.time_stamp()
653  time_stamp_ns = eid.time_stamp_ns_offset()
654  eitype = ei.event_type()
655  mc_channel_number = eitype.mc_channel_number()
656  mc_event_weight = eitype.mc_event_weight()
657  bm = list(eitype.bit_mask)
658  # IS_SIMULATION / IS_DATA
659  isSimulation = True if 'IS_SIMULATION' in bm else False
660  # IS_TESTBEAM / IS_FROM_ATLAS_DET
661  isTestBeam = True if 'IS_TESTBEAM' in bm else False
662  # IS_CALIBRATION / IS_PHYSICS
663  isCalibration = True if 'IS_CALIBRATION' in bm else False
664  extendedLevel1ID = 0
665  eInfoTrigger = ei.trigger_info()
666  extendedLevel1ID = eInfoTrigger.extendedLevel1ID()
667 
668  del ei
669 
670  else:
671  # FAIL
672  raise RuntimeError('Unable to find neither '
673  'xAOD::EventInfo nor EventInfo')
674 
675  _info('## run_number: {:d}'.format(run_number))
676  _info('## event_number: {:d}'.format(event_number))
677  _info('## bunch_crossing_id: {:d}'.format(bunch_crossing_id))
678  _info('## extendedLevel1ID: {:d}'.format(extendedLevel1ID))
679  _info('## lumi_block: {:d}'.format(lumi_block))
680  _info('## time_stamp: {:d}'.format(time_stamp))
681  _info('## time_stamp_ns_offset: {:d}'.format(time_stamp_ns))
682  _info('## EventWeight: {:f}'.format(mc_event_weight))
683  _info('## McChannelNumber: {:d}'.format(mc_channel_number))
684  _info('## isSimulation: {}'.format(isSimulation))
685  _info('## isTestBeam: {}'.format(isTestBeam))
686  _info('## isCalibration: {}'.format(isCalibration))
687 
688  if self._eif_spb is not None:
689  eventPB.runNumber = run_number
690  eventPB.eventNumber = event_number
691  eventPB.lumiBlock = lumi_block
692  eventPB.bcid = bunch_crossing_id
693  eventPB.timeStamp = time_stamp
694  eventPB.timeStampNSOffset = time_stamp_ns
695  eventPB.mcEventWeight = mc_event_weight
696  eventPB.mcChannelNumber = mc_channel_number
697  eventPB.isSimulation = isSimulation
698  eventPB.isCalibration = isCalibration
699  eventPB.isTestBeam = isTestBeam
700  eventPB.extendedLevel1ID = extendedLevel1ID
701 
702  # ======================================================
703  # Trigger conf keys
704  # ======================================================
705 
706  if self.item_xaod_TrigConfKeys:
707  _info("Retrieve TrigConfKeys from xAOD::TrigConfKeys")
708  tck = store.retrieve('xAOD::TrigConfKeys', 'TrigConfKeys')
709  evt_smk = tck.smk()
710  evt_l1psk = tck.l1psk()
711  evt_hltpsk = tck.hltpsk()
712  del tck
713  elif (self.meta_hlt_hltconfigkeys or self.meta_hlt_prescalekey
714  or self.meta_lvl1_lvl1configkey):
715  _info("Retrieve TrigConfKeys from /TRIGGER/**")
716  evt_smk = self._iov.get('SMK', (run_number, event_number))
717  evt_l1psk = self._iov.get('L1PSK', (run_number, lumi_block))
718  evt_hltpsk = self._iov.get('HLTPSK', (run_number, lumi_block))
719  else:
720  _info("Unable to retrieve TrigConfKeys")
721  evt_smk = 0
722  evt_l1psk = 0
723  evt_hltpsk = 0
724 
725  _info('## smk: {}'.format(evt_smk))
726  _info('## hltpsk: {}'.format(evt_hltpsk))
727  _info('## l1psk: {}'.format(evt_l1psk))
728 
729  if self._eif_spb is not None:
730  eventPB.SMK = evt_smk
731  eventPB.HLTPSK = evt_hltpsk
732  eventPB.L1PSK = evt_l1psk
733 
734  # ======================================================
735  # Trigger bit masks
736  # ======================================================
737 
738  if self.DoTriggerInfo:
739 
740  def v2b(v): # vector elements to binary string
741  res = ""
742  for e in v:
743  res += "{0:032b}".format(e)[::-1]
744  return res
745 
746  trigL1 = ""
747  trigL2 = ""
748  trigEF = ""
749 
750  if self.item_xaod_TrigDecision:
751 
752  _info("Get trigger bit masks form xAOD::TrigDecision")
753 
754  # retrieve xTrigDecision
755  xtd = store.retrieve('xAOD::TrigDecision', 'xTrigDecision')
756 
757  # L1
758  tbp = xtd.tbp() # vector: 16 elements of 32 bits = 512 bits
759  tap = xtd.tap()
760  tav = xtd.tav()
761  trigL1 = compressB64(v2b(tbp) + v2b(tap) + v2b(tav))
762  del tbp
763  del tap
764  del tav
765 
766  # L2
767  trigL2_PH = xtd.lvl2PassedPhysics() # 256*32 bits = 8192 bits
768  trigL2_PT = xtd.lvl2PassedThrough()
769  trigL2_RS = xtd.lvl2Resurrected()
770  trigL2 = "{};{};{}".format(
771  compressB64(v2b(trigL2_PH)),
772  compressB64(v2b(trigL2_PT)),
773  compressB64(v2b(trigL2_RS)))
774  del trigL2_PH
775  del trigL2_PT
776  del trigL2_RS
777 
778  # EF
779  trigEF_PH = xtd.efPassedPhysics() # 256*32 bits = 8192 bits
780  trigEF_PT = xtd.efPassedThrough()
781  trigEF_RS = xtd.efResurrected()
782  trigEF = "{};{};{}".format(
783  compressB64(v2b(trigEF_PH)),
784  compressB64(v2b(trigEF_PT)),
785  compressB64(v2b(trigEF_RS)))
786  del trigEF_PH
787  del trigEF_PT
788  del trigEF_RS
789 
790  del xtd
791 
792  else:
793 
794  _info("Get trigger bit masks form eventInfo.trigger_info()")
795 
796  # get trigger from eventInfo.trigger_info()
797  if eInfoTrigger is not None:
798  trigL1 = compressB64(v2b(eInfoTrigger.level1TriggerInfo()))
799  trigL2 = compressB64(v2b(eInfoTrigger.level2TriggerInfo()))
800  trigEF = compressB64(v2b(eInfoTrigger.eventFilterInfo()))
801 
802  _info("## trigL1: {}".format(trigL1))
803  _info("## trigL2: {}".format(trigL2))
804  _info("## trigEF: {}".format(trigEF))
805 
806  if self._eif_spb is not None:
807 
808  eventPB.L1PassedTrigMask = trigL1
809  eventPB.L2PassedTrigMask = trigL2
810  eventPB.EFPassedTrigMask = trigEF
811 
812  if eInfoTrigger is not None:
813  del eInfoTrigger
814 
815  # ======================================================
816  # Self reference and Provenance
817  # ======================================================
818 
819  def guid2string(guid):
820  # get guid as string
821  # native method toString() seems to produce a memory leak
822  # use getters for class Guid
823  # data1(), data2(), data3() are translated to integers in python
824  # data4(i) is translated from char in C++ to string in python
825  s="{:08X}-{:04X}-{:04X}-{:02X}{:02X}-{:02X}{:02X}{:02X}{:02X}{:02X}{:02X}".format(
826  guid.data1(), guid.data2(), guid.data3(),
827  ord(guid.data4(0)[0]), ord(guid.data4(1)[0]),
828  ord(guid.data4(2)[0]), ord(guid.data4(3)[0]),
829  ord(guid.data4(4)[0]), ord(guid.data4(5)[0]),
830  ord(guid.data4(6)[0]), ord(guid.data4(7)[0]))
831  return s
832 
833  def token2string (tk, replace_empty_cntID=False):
834  # get token as string
835  # native method toString() seems to produce a memory leak
836  # use getter for class Token
837  cntID = tk.contID()
838  if replace_empty_cntID and cntID == "":
839  cntID = "POOLContainer(DataHeader)"
840  stk = "[DB={}][CNT={}][CLID={}][TECH={:08X}][OID={:016X}-{:016X}]".format(
841  guid2string(tk.dbID()), cntID, guid2string(tk.classID()),
842  tk.technology(), tk.oid().first, tk.oid().second)
843  return stk
844 
845  Pstream_refs = {} # provenance references
846  procTag = None
847 
848  # -- Stream references
849  dh = store.retrieve('DataHeader', 'EventSelector')
850  procTag = dh.getProcessTag()
851 
852  if procTag == "":
853  # pacth procTag ATEAM-782
854  for el in dh.elements():
855  if el.getPrimaryClassID() == 222376821:
856  procTag = el.getKey()
857  break
858  _info("## ProcessTag: " + procTag)
859 
860  if self.DoProvenanceRef:
861 
862  # get provenance references
863  if dh.sizeProvenance() > 0:
864  prv = dh.beginProvenance()
865  for i in range(dh.sizeProvenance()):
866  key = prv.getKey()
867  tk = prv.getToken()
868  if key.startswith("Output"):
869  _warning('Provenance token starts with Output: {}'
870  .format(key))
871  key = key[6:]
872  if key.startswith("Input"):
873  _warning('Provenance token starts with Input: {}'
874  .format(key))
875  key = key[5:]
876  # CNT might be empty. Complete information
877  if key == "StreamRAW":
878  stk = token2string(tk, replace_empty_cntID=False)
879  elif key in ("StreamAOD", "StreamESD", "StreamRDO",
880  "StreamHITS", "StreamEVGEN",
881  "EmbeddingStream"):
882  stk = token2string(tk, replace_empty_cntID=True)
883  else:
884  stk = token2string(tk, replace_empty_cntID=False)
885  _info("provenance {}={}".format(key, stk))
886  _info('Unknown provenance stream: {}'.format(key))
887  # do not raise error, just continue. mar-2024
888  # _error('Unknown provenance stream: {}'.format(key))
889  # raise RuntimeError('Unknown provenance stream')
890  del tk
891  del key
892  prv += 1
893  continue
894  _info("## P" + key + "_ref: " + stk)
895  if key not in Pstream_refs:
896  # keep only the first provenance found for each straam
897  Pstream_refs[key] = stk
898  del tk
899  del key
900  prv += 1
901  del prv
902 
903 
904  # get self reference.
905  # look for the processing tag key in the Data Object vector
906  if self._eif_spb is not None:
907  tokenPB0 = eventPB.eitoken.add()
908  if dh.size() > 0:
909  dhe = dh.begin()
910  for i in range(dh.size()):
911  key = dhe.getKey()
912  if key.startswith('Stream'):
913  _info("## Stream: " + key)
914  if key in [procTag, 'StreamAOD']:
915  tk = dhe.getToken()
916  stk = token2string(tk, replace_empty_cntID=True)
917  _info("## " + key + "_ref: " + stk)
918  if self._eif_spb is not None:
919  if key == tokenPB0.name:
920  _info("Already inserted key {0} in tokenPB0 "
921  "with value {1}".format(key, stk))
922  tokenPB0.name = key
923  tokenPB0.token = stk
924  del tk
925  del key
926  dhe += 1
927  dh.end()
928  del dhe
929 
930  # Update self reference token to handle fast merged files.
931  try:
932  stk = store.proxy(dh).address().par().c_str()
933  if self._eif_spb is not None:
934  tokenPB0.token = stk
935  _info("Updated ref token " + stk)
936  del stk
937  except Exception:
938  pass
939 
940  # write provenance to protbuf message
941  if self._eif_spb is not None:
942  for sr in Pstream_refs:
943  try:
944  tokenPB = eventPB.eitoken.add()
945  tokenPB.name = sr
946  tokenPB.token = Pstream_refs[sr]
947  except Exception:
948  _info("Unable to insert {} in provenance stream "
949  "references with value {}".format(
950  sr, Pstream_refs[sr]))
951  pass
952 
953  del dh
954 
955  # ======================================================
956  # Write event EI
957  # ======================================================
958 
959  if self._eif_spb is not None:
960  spb = eventPB.SerializeToString()
961  self._eif_spb.write(struct.pack('<I', eic.EI_PROTO_EIEVENT << 8 |
962  eic.EI_PROTO_MSGVER))
963  self._eif_spb.write(struct.pack('<I', len(spb)))
964  self._eif_spb.write(spb)
965  del eventPB
966  if (self._eif_entries % 1000 == 0):
967  self._eif_spb.flush()
968 
969  self._eif_entries += 1 # for this input file
970  self._eif_totentries += 1 # for all input files
971 
972  store.clearStore()
973 
974  return StatusCode.Success
975 
976  # ----------------------------------------
977  # execute at the end of processing
978  # ----------------------------------------
979  def finalize(self):
980 
981  _info = self.msg.info
982  _info("POOL2EI::finalize")
983 
984  if self._eif_spb is not None:
985  trailer = self.eipbof.Trailer()
986  trailer.nfiles = self._eif_nfiles
987  trailer.nentries = self._eif_totentries
988  trailer.endProcTime = int(time.time() * 1000)
989 
990  spb = trailer.SerializeToString()
991  self._eif_spb.write(struct.pack('<I', eic.EI_PROTO_TRAILER << 8 |
992  eic.EI_PROTO_MSGVER))
993  self._eif_spb.write(struct.pack('<I', len(spb)))
994  self._eif_spb.write(spb)
995 
996  self._eif_spb.close()
997 
998  _info("Total number of events processed: {}".format(self._eif_totentries))
999 
1000  return StatusCode.Success
1001 
1002  pass # class POOL2EI
1003 
1004 
1006  """
1007  POOL2EI Service
1008  Registers with the Incident Service so we can deal with:
1009  - Begin of new input file processing
1010  - End of input file processing
1011  - End of the event processing loop
1012  and notify the POOL2EI algorithm accordingly
1013  """
1014 
1015  def __init__(self, name='POOL2EISvc', **kw):
1016  super(POOL2EISvc, self).__init__(name, **kw)
1017  _info = self.msg.info
1018  _info("POOL2EISvc::__init__")
1019 
1020  # whether we are inside beginFile ... endFile
1021  self.insideInputFile = False
1022 
1023  # save algorithm to call on incident
1024  if 'algo' in kw:
1025  _info("POOL2EISvc::__init__ algo: {}".format(kw['algo']))
1026  self.algo = kw['algo']
1027 
1028  def initialize(self):
1029  # register with the incident svc
1030  _info = self.msg.info
1031  _info("POOL2EISvc::initialize")
1032  incsvc = PyAthena.py_svc('IncidentSvc', iface='IIncidentSvc')
1033  if not incsvc:
1034  self.msg.error('unable to get the incident svc')
1035  return StatusCode.Failure
1036 
1037  incsvc.addListener(self, 'BeginInputFile')
1038  incsvc.addListener(self, 'EndInputFile')
1039  incsvc.addListener(self, 'EndEvtLoop')
1040  incsvc.release()
1041 
1042  return StatusCode.Success
1043 
1044  def finalize(self):
1045  _info = self.msg.info
1046  _info("POOL2EISvc::finalize")
1047  return StatusCode.Success
1048 
1049  def handle(self, incident):
1050  # process registered incidents
1051 
1052  _info = self.msg.info
1053  tp = incident.type()
1054  if tp == 'EndEvent':
1055  pass
1056  elif tp == 'BeginInputFile':
1057  _info('POOL2EISvc::handle BeginInputFile')
1058  self.insideInputFile = True
1059  self.algo.beginFile()
1060  elif tp == 'EndInputFile':
1061  _info('POOL2EISvc::handle EndInputFile')
1062  self.insideInputFile = False
1063  self.algo.endFile()
1064  elif tp == 'EndEvtLoop':
1065  _info('POOL2EISvc::handle EndEvtLoop')
1066  # when maxEvents is reached, we are still insideInputFile
1067  if self.insideInputFile:
1068  self.algo.endFile()
1069  else:
1070  _info('POOL2EISvc::handle {}. Unknown for POOL2EI'.format(tp))
1071  return
1072 
1073  pass # class POOL2EISvc
python.POOL2EI_Lib.POOL2EI.process_metadata
def process_metadata(self, store, metadata_name)
Definition: POOL2EI_Lib.py:205
vtune_athena.format
format
Definition: vtune_athena.py:14
FullCPAlgorithmsTest_eljob.flush
flush
Definition: FullCPAlgorithmsTest_eljob.py:168
CaloCellPos2Ntuple.int
int
Definition: CaloCellPos2Ntuple.py:24
python.POOL2EI_Lib.POOL2EI.guid
guid
Definition: POOL2EI_Lib.py:348
PyAthena::Alg::start
virtual StatusCode start() override
Definition: PyAthenaAlg.cxx:74
python.POOL2EI_Lib.POOL2EISvc.algo
algo
Definition: POOL2EI_Lib.py:1026
python.POOL2EI_Lib.POOL2EI.evtStore
def evtStore(self)
Definition: POOL2EI_Lib.py:196
PyAthena::Alg::initialize
virtual StatusCode initialize() override
Definition: PyAthenaAlg.cxx:60
python.EI_Lib.IOV
Definition: EI_Lib.py:10
python.POOL2EI_Lib.POOL2EI._eif_nfiles
int _eif_nfiles
Definition: POOL2EI_Lib.py:53
PyAthena::Alg::execute
virtual StatusCode execute() override
Definition: PyAthenaAlg.cxx:93
python.POOL2EI_Lib.POOL2EI
Definition: POOL2EI_Lib.py:46
python.POOL2EI_Lib.POOL2EI._eifname_spb
_eifname_spb
Definition: POOL2EI_Lib.py:55
python.POOL2EI_Lib.POOL2EI._eimagic
int _eimagic
Definition: POOL2EI_Lib.py:59
python.POOL2EI_Lib.POOL2EI.endFile
def endFile(self)
Definition: POOL2EI_Lib.py:545
PyAthena::Alg::finalize
virtual StatusCode finalize() override
Definition: PyAthenaAlg.cxx:86
python.POOL2EI_Lib.POOL2EI._dsname_orig
_dsname_orig
Definition: POOL2EI_Lib.py:117
AthCommonDataStore< AthCommonMsg< Algorithm > >::evtStore
ServiceHandle< StoreGateSvc > & evtStore()
The standard StoreGateSvc (event store) Returns (kind of) a pointer to the StoreGateSvc.
Definition: AthCommonDataStore.h:85
python.POOL2EI_Lib.POOL2EISvc.__init__
def __init__(self, name='POOL2EISvc', **kw)
Definition: POOL2EI_Lib.py:1015
python.POOL2EI_Lib.POOL2EI.inputStore
def inputStore(self)
Definition: POOL2EI_Lib.py:201
PyAthena::Svc
Definition: PyAthenaSvc.h:33
python.POOL2EI_Lib._import_ROOT
def _import_ROOT()
Definition: POOL2EI_Lib.py:33
python.POOL2EI_Lib.POOL2EI.__init__
def __init__(self, name='POOL2EI', **kw)
Definition: POOL2EI_Lib.py:61
python.ByteStreamConfig.write
def write
Definition: Event/ByteStreamCnvSvc/python/ByteStreamConfig.py:248
plotBeamSpotVxVal.range
range
Definition: plotBeamSpotVxVal.py:195
python.POOL2EI_Lib.POOL2EI._eif_totentries
int _eif_totentries
Definition: POOL2EI_Lib.py:52
add
bool add(const std::string &hname, TKey *tobj)
Definition: fastadd.cxx:55
histSizes.list
def list(name, path='/')
Definition: histSizes.py:38
python.POOL2EI_Lib.POOL2EI._eif_entries
int _eif_entries
Definition: POOL2EI_Lib.py:51
PyAthena::Svc::finalize
virtual StatusCode finalize() override
Definition: PyAthenaSvc.cxx:71
TCS::join
std::string join(const std::vector< std::string > &v, const char c=',')
Definition: Trigger/TrigT1/L1Topo/L1TopoCommon/Root/StringUtils.cxx:10
xAOD::TriggerMenu
TriggerMenu_v1 TriggerMenu
Define the latest version of the trigger menu class.
Definition: TriggerMenu.h:16
createCoolChannelIdFile.par
par
Definition: createCoolChannelIdFile.py:29
python.POOL2EI_Lib.POOL2EI.beginFile
def beginFile(self)
Definition: POOL2EI_Lib.py:299
python.POOL2EI_Lib.POOL2EI.eipbof
eipbof
Definition: POOL2EI_Lib.py:86
python.POOL2EI_Lib.toiter
def toiter(beg, end)
Definition: POOL2EI_Lib.py:39
RTTAlgmain.address
address
Definition: RTTAlgmain.py:55
python.POOL2EI_Lib.POOL2EISvc.handle
def handle(self, incident)
Definition: POOL2EI_Lib.py:1049
python.compressB64.compressB64
def compressB64(s)
Definition: compressB64.py:29
python.POOL2EI_Lib.POOL2EI.guids
guids
Definition: POOL2EI_Lib.py:346
python.POOL2EI_Lib.POOL2EI._dsname
_dsname
Definition: POOL2EI_Lib.py:88
PyAthena::Svc::initialize
virtual StatusCode initialize() override
Gaudi Service Implementation.
Definition: PyAthenaSvc.cxx:57
python.POOL2EI_Lib.POOL2EI._eiversion
int _eiversion
Definition: POOL2EI_Lib.py:58
AthCommonMsg< Algorithm >::msg
MsgStream & msg() const
Definition: AthCommonMsg.h:24
get
T * get(TKey *tobj)
get a TObject* from a TKey* (why can't a TObject be a TKey?)
Definition: hcg.cxx:127
python.POOL2EI_Lib.POOL2EISvc
Definition: POOL2EI_Lib.py:1005
python.POOL2EI_Lib.POOL2EISvc.insideInputFile
insideInputFile
Definition: POOL2EI_Lib.py:1021
str
Definition: BTagTrackIpAccessor.cxx:11
PyAthena::Alg
Definition: PyAthenaAlg.h:33
python.POOL2EI_Lib.POOL2EI._initialized
bool _initialized
Definition: POOL2EI_Lib.py:56
python.POOL2EI_Lib.POOL2EI._eif_spb
_eif_spb
Definition: POOL2EI_Lib.py:150
error
Definition: IImpactPoint3dEstimator.h:70
python.POOL2EI_Lib.POOL2EI._iov
_iov
Definition: POOL2EI_Lib.py:49
Trk::split
@ split
Definition: LayerMaterialProperties.h:38
PixelByteStreamErrors::Trailer
@ Trailer
Definition: PixelByteStreamErrors.h:13