ATLAS Offline Software
Loading...
Searching...
No Matches
POOL2EI_Lib.py
Go to the documentation of this file.
1# Copyright (C) 2002-2022 CERN for the benefit of the ATLAS collaboration
2
3# @file POOL2EI_Lib.py
4# @purpose provide components to get EventIndex data from pool files
5# @author Javier Sanchez
6# @date February 2014
7# @update January 2022
8#
9# Some code borrowed from PyAthena.FilePeekerLib
10# credits to Sebastien Binet
11
12
13__version__ = "$Revision: 3.0 $"
14__author__ = "Javier Sanchez"
15__doc__ = "provide components to POOL2EI"
16
17
18# imports -----------------------------------------------------------------
19import os
20import AthenaPython.PyAthena as PyAthena
21from .compressB64 import compressB64
22from .EI_Lib import IOV
23import EventIndexProducer.EIconstants as eic
24
25import struct
26from EventIndexProducer.EIPBof import EIPBof
27import gzip
28
29import time
30StatusCode = PyAthena.StatusCode
31
32
def _import_ROOT():
    """Import ROOT, switch it to batch mode and return the module.

    NOTE(review): the ``def`` line (source line 33) is absent from this
    listing; the name and the empty parameter list are taken from the
    ``ROOT = _import_ROOT()`` call in ``beginFile`` -- confirm against the
    repository.
    """
    import ROOT
    ROOT.gROOT.SetBatch(True)
    return ROOT
37
38
def toiter(beg, end):
    """Yield the elements of an iterator range ``[beg, end)``.

    Args:
        beg: iterator-like object exposing ``__deref__()`` (current element)
            and ``__preinc__()`` (advance in place), comparable to ``end``
            with ``!=``.
        end: the past-the-end marker of the same range.

    Yields:
        ``beg.__deref__()`` for every position until ``beg`` equals ``end``.
    """
    while beg != end:
        yield beg.__deref__()
        beg.__preinc__()
44
45
47 """Algorithm
48 """
49 _iov = IOV()
50 _eif = None
51 _eif_entries = 0
52 _eif_totentries = 0
53 _eif_nfiles = 0
54 _eifname = None
55 _eifname_spb = None
56 _initialized = False
57
58 _eiversion = 0x00000001
59 _eimagic = 0x6e56c8c7
60
def __init__(self, name='POOL2EI', **kw):
    """Initialise the base class and log the construction.

    Args:
        name: instance name forwarded to the base class.
        **kw: extra keyword arguments forwarded unchanged to the base class.
    """
    # init base class
    super(POOL2EI, self).__init__(name, **kw)

    self.msg.info("POOL2EI::__init__")
67
68 # ----------------------------------------
69 # initialize
70 # ----------------------------------------
71
def initialize(self):
    """Resolve the dataset name, open the SPB output and write its header.

    The set-up runs only once: when ``self._initialized`` is already set
    the method returns immediately.

    Steps, in order:
      * pick the dataset name from ``self.EiDsName``, else from the ``INDS``
        environment variable, else from the ``newJobDef`` job definition;
        a placeholder name is kept when none of them is available;
      * strip the ``_tid...`` / ``_sub...`` parts and a trailing ``/``;
      * touch the PyAthena classes and the two stores used later on;
      * (re)create the gzip-compressed output file, then write the magic
        number, the format version and a ``Header`` record.

    Returns:
        ``StatusCode.Success`` after a full set-up, ``None`` on a repeated
        call.

    Raises:
        RuntimeError: if the output file cannot be opened.
    """
    import AthenaPython.PyAthena as PyAthena
    import re

    _info = self.msg.info
    _info("POOL2EI::initialize")

    # Am I being called for every input file ?
    if self._initialized:
        # NOTE(review): this early exit returns None rather than a
        # StatusCode -- confirm the framework accepts that.
        return

    _info("## DoProvenanceRef: {}".format(self.DoProvenanceRef))
    _info("## DoTriggerInfo: {}".format(self.DoTriggerInfo))
    _info("## SendToBroker: {}".format(self.SendToBroker))

    self.eipbof = EIPBof(self._eiversion)  # initialize factory

    self._dsname = "Unknown.Input.Dataset.Name"  # default fake value
    if self.EiDsName is not None:
        _info("## EiDsName: {}".format(self.EiDsName))
        self._dsname = self.EiDsName
    else:
        # try to get dataset name from pathena INDS environment variable
        inds = os.getenv('INDS')
        if inds is not None:
            _info("## INDS: {}".format(inds))
            self._dsname = inds
        else:
            # else, try to use job definition
            try:
                import newJobDef
                processingType = newJobDef.job['processingType']
                transformation = newJobDef.job['transformation']
                dsSource = 'realDatasetsIn'  # dataset name from input
                if (processingType == 'merge' and
                        transformation != 'POOLtoEI_tf.py'):
                    dsSource = 'realDatasets'  # dataset name from output
                datasets = newJobDef.job[dsSource].split(',')
                _info("## {}[0]: {}".format(dsSource, datasets[0]))
                self._dsname = datasets[0]
            except Exception:
                # best effort: keep the placeholder name
                _info('## Unable to get dataset name from realDatasetsIn '
                      'or realDatasets')

    # The header written below reads self._dsname_orig, but no assignment
    # to it is visible in this listing (source line 117 is missing), so the
    # unstripped name is captured here, before the suffixes are removed.
    # TODO(review): confirm against the repository.
    self._dsname_orig = self._dsname

    # remove _tid and _sub parts from dsname
    self._dsname = re.sub('_tid[0-9]{8}_[0-9]{2}', '', self._dsname)
    self._dsname = re.sub('_sub[0-9]{10}', '', self._dsname)
    self._dsname = re.sub('/$', '', self._dsname)

    # load our pythonizations:
    for cls_name in ('EventStreamInfo', 'EventType', 'PyEventType'):
        cls = getattr(PyAthena, cls_name)  # noqa: F841

    _info("retrieving various stores...")
    for store_name in ('evtStore', 'inputStore'):
        _info("retrieving [{}]...".format(store_name))
        o = getattr(self, store_name)  # noqa: F841
        _info("retrieving [{}]... [done]".format(store_name))
    _info("retrieving various stores... [done]")

    # Build output file names
    # self.EiFmt ignored
    # keep self.EiFmt format flag for compatibility and future usage
    if self.Out is not None:
        oname_spb = self.Out
    else:
        oname_spb = "output.ei.spb"

    # open EI output file
    oname_spb = os.path.expanduser(os.path.expandvars(oname_spb))
    self._eifname_spb = oname_spb
    _info('Opening EI SPB file [{}]...'.format(oname_spb))

    # start from a fresh file
    if os.path.exists(oname_spb):
        os.remove(oname_spb)

    try:
        self._eif_spb = gzip.open(oname_spb, 'wb')
    except Exception:
        self._eif_spb = None
        self.msg.fatal("Unable to open EI SPB output file {} "
                       "exapnded as {}".format(self.Out, oname_spb))
        raise RuntimeError("Unable to open EI SPB output file")

    self._eif_spb.write(struct.pack('<I', self._eimagic))    # EI SPB magic
    self._eif_spb.write(struct.pack('<I', self._eiversion))  # EI SPB ver

    # get taskid and jobid
    if (hasattr(self, 'TaskID') and hasattr(self, 'JobID') and
            self.TaskID is not None and self.JobID is not None):
        taskID = "{}.T".format(self.TaskID)
        if (hasattr(self, 'AttemptNumber') and
                self.AttemptNumber is not None):
            jobID = "{}.{}".format(self.JobID, self.AttemptNumber)
        else:
            jobID = "{}.0".format(self.JobID)
    else:
        # fall back to the environment, with 0 for anything unset
        taskID = "{}.G".format(os.getenv('PanDA_TaskID', 0))
        jobID = "{}.{}".format(os.getenv('PandaID', 0),
                               os.getenv('PanDA_AttemptNr', 0))

    # initial information SPB
    # write Header
    if self._eif_spb is not None:
        header = self.eipbof.Header()
        header.startProcTime = int(time.time() * 1000)  # milliseconds
        header.taskID = taskID
        header.jobID = jobID
        header.inputDsName = self._dsname_orig
        header.provenanceRef = self.DoProvenanceRef
        header.triggerInfo = self.DoTriggerInfo

        # record framing: (type << 8 | message version), length, payload
        spb = header.SerializeToString()
        self._eif_spb.write(struct.pack('<I', eic.EI_PROTO_HEADER << 8 |
                                        eic.EI_PROTO_MSGVER))
        self._eif_spb.write(struct.pack('<I', len(spb)))
        self._eif_spb.write(spb)

    self._initialized = True

    return StatusCode.Success
194
@property
def evtStore(self):
    """Return the ``StoreGateSvc/StoreGateSvc`` service via ``py_svc``."""
    import AthenaPython.PyAthena as PyAthena
    return PyAthena.py_svc('StoreGateSvc/StoreGateSvc')
199
@property
def inputStore(self):
    """Return the ``StoreGateSvc/InputMetaDataStore`` service via ``py_svc``."""
    import AthenaPython.PyAthena as PyAthena
    return PyAthena.py_svc('StoreGateSvc/InputMetaDataStore')
204
def process_metadata(self, store, metadata_name):
    """Extract channel data and IOVs from one metadata container.

    Args:
        store: mapping-like store; ``store[metadata_name]`` must raise
            ``KeyError`` when the container is absent.
        metadata_name: key of the container to read.

    Returns:
        A ``(data, iovs)`` tuple, ``([], [])`` when the container is missing.

        ``data`` holds one entry per non-null payload:
          * a dict ``channel name -> attribute dict`` when there are as
            many attribute lists as channel names;
          * otherwise the single attribute dict, the list of attribute
            dicts, or (with no attributes at all) the list of channel names.

        ``iovs`` holds one tuple per IOV:
        ``(start run, start event, stop run, stop event, start.isValid(),
        start.isRunEvent(), channel)``.
    """
    msg = self.msg
    try:
        obj = store[metadata_name]
    except KeyError:
        msg.warning('could not retrieve [{}]'.format(metadata_name))
        return ([], [])
    try:
        msg.info('processing container [{}]'.format(obj.folderName()))
    except Exception:
        msg.info('container does not have folderName')

    data = []
    iovs = []
    payloads = obj.payloadContainer()
    payloads_sz = payloads.size()
    if hasattr(payloads, 'at'):
        # HACK for bug #77976: index the container explicitly
        payloads = [payloads.at(ii) for ii in range(payloads_sz)]

    # zip() keeps the iteration bounded by the reported container size
    for _, payload in zip(range(payloads_sz), payloads):
        if not payload:
            msg.info("**error** null-pointer ?")
            continue

        # names
        chan_names = [payload.chanName(payload.chanNum(idx))
                      for idx in range(payload.name_size())]

        # iovs
        sz = payload.iov_size()
        for idx in range(sz):
            chan = payload.chanNum(idx)
            iov_range = payload.iovRange(chan)
            iov_start = iov_range.start()
            iov_stop = iov_range.stop()
            if idx < 10:
                msg.info('({}, {}) => ({}, {}) valid={} runEvt={}'.format(
                    iov_start.run(),
                    iov_start.event(),
                    iov_stop.run(),
                    iov_stop.event(),
                    iov_start.isValid(),
                    iov_start.isRunEvent()))
            elif idx == 10:
                # ten entries (idx 0..9) were logged, so sz - 10 remain
                msg.info("... {} more".format(sz - 10))
            iovs.append((iov_start.run(), iov_start.event(),
                         iov_stop.run(), iov_stop.event(),
                         iov_start.isValid(), iov_start.isRunEvent(),
                         chan))

        # attrs
        attrs = []  # can't use a dict as spec.name() isn't unique
        for idx in range(payload.size()):
            chan = payload.chanNum(idx)
            attr_list = payload.attributeList(chan)
            attr_data = []
            for a in toiter(attr_list.begin(), attr_list.end()):
                spec = a.specification()
                a_type = spec.typeName()
                if a_type.find('string') >= 0:
                    a_data = a.data['string']()
                    # NOTE(security): eval() executes text read from the
                    # file metadata; it is only as trustworthy as the
                    # input file. Flagged for review, behaviour unchanged.
                    try:
                        a_data = eval(a_data, {}, {})
                    except Exception:
                        # swallow and keep as a string
                        pass
                else:
                    a_data = a.data[a_type]()
                attr_data.append((spec.name(), a_data))
            attrs.append(dict(attr_data))

        if len(attrs) == len(chan_names):
            data.append(dict(zip(chan_names, attrs)))
        elif len(attrs) == 1:
            data.append(attrs[0])
        elif attrs:
            data.append(attrs)
        else:
            data.append(chan_names)

    return (data, iovs)
295
296 # ----------------------------------------
297 # execute at begin of file
298 # ----------------------------------------
299 def beginFile(self):
300
301 _info = self.msg.info
302 _info("POOL2EI::beginFile")
303
304 # entries for this input file
305 self._eif_entries = 0
306
307 # define a new IOV storage
308 self._iov = IOV()
309
310 # ======================================================
311 # retrieve the GUID
312 # ======================================================
313
def _get_guid():
    """Collect the ``FID`` values stored in the last open ROOT file.

    Reads the ``##Params`` object of the last entry in ROOT's list of open
    files and parses each entry as ``[NAME=...][VALUE=...]``.

    Uses ``_info`` from the enclosing ``beginFile`` scope for logging.

    Returns:
        List of the values whose name is ``fid`` (case-insensitive), in
        file order. An empty list when no ROOT file is open, so the
        caller's ``len()`` check works (the original returned ``None``).
    """
    import re

    ROOT = _import_ROOT()
    root_files = list(ROOT.gROOT.GetListOfFiles())
    if len(root_files) == 0:
        _info('could not find correct ROOT file')
        return []

    root_file = root_files[-1]  # get the last file in the list
    pool = root_file.Get("##Params")
    # Pool parameters are of the form:
    # '[NAME=somevalue][VALUE=thevalue]'
    pool_token = re.compile(r'\[NAME=(?P<name>.*?)\]'
                            r'\[VALUE=(?P<value>.*?)\]').match
    guids = []
    for i in range(pool.GetEntries()):
        if pool.GetEntry(i) > 0:
            pool_string = pool.db_string.as_string()
            # take string until \0 is found
            n = pool_string.find('\0')
            if n != -1:
                pool_string = pool_string[:n]
            match = pool_token(pool_string)
            if not match:
                continue
            d = match.groupdict()
            if d['name'].lower() == 'fid':
                guids.append(d['value'])
    return guids
345
346 self.guids = _get_guid()
347 if len(self.guids) > 0:
348 self.guid = self.guids.pop()
349 else:
350 self.guid = None
351
352 # ======================================================
353 # trigger menu
354 # ======================================================
355 if self.DoTriggerInfo:
356 l1_menu = None
357 l2_menu = None
358 ef_menu = None
359 hlt_menu = None
361 _info("Load trigger menu from TriggerMenu")
362 for e in self.inputStore['TriggerMenu']:
363 smk = e.smk()
364 l1psk = e.l1psk()
365 hltpsk = e.hltpsk()
366 chainIds = e.chainIds()
367 chainNames = e.chainNames()
368 itemCtpIds = e.itemCtpIds()
369 itemNames = e.itemNames()
370 l1_menu = {name: id for id, name in
371 zip(itemCtpIds, itemNames)}
372 hlt_menu = {name: id for id, name in
373 zip(chainIds, chainNames)}
374 break # first element is enough
375
376 elif self.meta_triggermenujson_l1 or self.meta_triggermenujson_hlt:
377 _info("Load trigger menu from TriggerMenuJson_L1 and "
378 "TriggerMenuJson_HLT")
379 import json
380 for e in self.inputStore['TriggerMenuJson_L1']:
381 l1_menu_raw = json.loads(e.payload())
382 l1_items = l1_menu_raw['items']
383 l1_menu = {l1_key: l1_items[l1_key]['ctpid']
384 for l1_key in l1_items}
385 break # first element is enough
386 for e in self.inputStore['TriggerMenuJson_HLT']:
387 hlt_menu_raw = json.loads(e.payload())
388 hlt_chains = hlt_menu_raw['chains']
389 hlt_menu = {chain: hlt_chains[chain]['counter']
390 for chain in hlt_chains}
391 break # first element is enough
392 elif self.meta_hlt_menu or self.meta_lvl1_menu:
393 _info("Load trigger menu from /TRIGGER/LVL1/Menu and "
394 "/TRIGGER/HLT/Menu")
395 # /TRIGGER/LVL1/Menu
396 (l1menu_info, l1menu_iovs) = self.process_metadata(
397 self.inputStore, '/TRIGGER/LVL1/Menu')
398 # bit mask number is in channel IOV
399 channels = [iov[6] for iov in l1menu_iovs]
400 names = [e['ItemName'] for e in l1menu_info[0]]
401 l1_menu = {name: id for name, id in zip(names, channels)}
402
403 # /TRIGGER/HLT/Menu
404 (hltmenu_info, hltmenu_iovs) = self.process_metadata(
405 self.inputStore, '/TRIGGER/HLT/Menu')
406 l2_menu = {entry['ChainName']: entry['ChainCounter']
407 for entry in hltmenu_info[0]
408 if entry['ChainName'].startswith("L2_")}
409 ef_menu = {entry['ChainName']: entry['ChainCounter']
410 for entry in hltmenu_info[0]
411 if entry['ChainName'].startswith("EF_")}
412 hlt_menu = {entry['ChainName']: entry['ChainCounter']
413 for entry in hltmenu_info[0]
414 if entry['ChainName'].startswith("HLT_")}
415 else:
416 _info("Trigger Menu not found")
417
418 # ======================================================
419 # trigger config keys
420 # ======================================================
421 smk = 0
422 l1psk = 0
423 hltpsk = 0
424 if self.meta_triggermenu:
425 _info("Load trigger config keys from TriggerMenu")
426 for e in self.inputStore['TriggerMenu']:
427 smk = e.smk()
428 l1psk = e.l1psk()
429 hltpsk = e.hltpsk()
430 break # first element is enough
432 _info("Load trigger config keys from /TRIGGER/HLT/HltConfigKeys")
433 # /TRIGGER/HLT/HltConfigKeys
434 (hltck_info, hltck_iovs) = self.process_metadata(
435 self.inputStore, '/TRIGGER/HLT/HltConfigKeys')
436 smk_l = [x['MasterConfigurationKey'] for x in hltck_info]
437 for val, iov in zip(smk_l, hltck_iovs):
438 self._iov.add('SMK', val, iov[:4])
439 smk = smk_l[0]
441 _info("Load trigger config keys from /TRIGGER/HLT/PrescaleKey")
442 # /TRIGGER/HLT/PrescaleKey
443 (hltpk_info, hltpk_iovs) = self.process_metadata(
444 self.inputStore, '/TRIGGER/HLT/PrescaleKey')
445 hltpk_l = [x['HltPrescaleKey'] for x in hltpk_info]
446 for val, iov in zip(hltpk_l, hltpk_iovs):
447 self._iov.add('HLTPSK', val, iov[:4])
448 hltpsk = hltpk_l[0]
450 _info("Load trigger config keys from /TRIGGER/LVL1/Lvl1ConfigKey")
451 # /TRIGGER/LVL1/Lvl1ConfigKey
452 (l1pk_info, l1pk_iovs) = self.process_metadata(
453 self.inputStore, '/TRIGGER/LVL1/Lvl1ConfigKey')
454 l1pk_l = [x['Lvl1PrescaleConfigurationKey'] for x in l1pk_info]
455 for val, iov in zip(l1pk_l, l1pk_iovs):
456 self._iov.add('L1PSK', val, iov[:4])
457 l1psk = l1pk_l[0]
458
459 # ======================================================
460 # AMITag, triggerStreamOfFile and project_name
461 # ======================================================
462 _info("Load AMITag, triggerStreamOfFile and project_name "
463 "from /TagInfo")
464
465 (tginfo, tgiovs) = self.process_metadata(self.inputStore, '/TagInfo')
466 amitag = "Unknown"
467 trigStream = "Unknown"
468 projName = "Unknown"
469 if len(tginfo) > 0:
470 for tgi in tginfo:
471 if 'AMITag' in tgi:
472 amitag = tgi['AMITag']
473 _info("## AMITag: {}".format(amitag))
474 if 'triggerStreamOfFile' in tgi:
475 trigStream = tgi['triggerStreamOfFile']
476 _info("## triggerStreamOfFile: {}".format(trigStream))
477 if 'project_name' in tgi:
478 projName = tgi['project_name']
479 _info("## project_name: {}".format(projName))
480
481 # ======================================================
482 # write BeginGUID
483 # ======================================================
484
485 if self._eif_spb is not None:
486 beginGUID = self.eipbof.BeginGUID()
487 beginGUID.startProcTime = int(time.time() * 1000)
488 beginGUID.AMITag = str(amitag)
489 beginGUID.trigStream = str(trigStream)
490 beginGUID.projName = str(projName)
491 beginGUID.guid = self.guid
492
493 spb = beginGUID.SerializeToString()
494 self._eif_spb.write(struct.pack('<I', eic.EI_PROTO_BEGINGUID << 8 |
495 eic.EI_PROTO_MSGVER))
496 self._eif_spb.write(struct.pack('<I', len(spb)))
497 self._eif_spb.write(spb)
498
499 self._eif_nfiles += 1
500
501 # ======================================================
502 # write TriggerMenu
503 # ======================================================
504
505 if (self.DoTriggerInfo and self._eif_spb is not None
506 and l1_menu is not None):
507 _info("Write trigger menu to output SPB")
508 tMenu = self.eipbof.TriggerMenu()
509 tMenu.SMK = smk
510 tMenu.L1PSK = l1psk
511 tMenu.HLTPSK = hltpsk
512
def d2l(d):
    """Serialise a menu dict as ``"value:key;value:key;..."``.

    Entries keep the dict's iteration order; an empty dict gives ``""``.
    """
    return ";".join("{}:{}".format(value, key) for key, value in d.items())
516
517 if l1_menu is not None and len(l1_menu) > 0:
518 tMenu.L1Menu = d2l(l1_menu)
519 if l2_menu is not None and len(l2_menu) > 0:
520 tMenu.L2Menu = d2l(l2_menu)
521 if ef_menu is not None and len(ef_menu) > 0:
522 tMenu.EFMenu = d2l(ef_menu)
523 if hlt_menu is not None and len(hlt_menu) > 0:
524 tMenu.HLTMenu = d2l(hlt_menu)
525
526 # tMenu.L1Menu is required
527 if len(tMenu.L1Menu) > 0:
528 spb = tMenu.SerializeToString()
529 self._eif_spb.write(struct.pack('<I',
530 eic.EI_PROTO_TRIGGERMENU << 8 |
531 eic.EI_PROTO_MSGVER))
532 self._eif_spb.write(struct.pack('<I', len(spb)))
533 self._eif_spb.write(spb)
534 else:
535 _info("Unable to write trigger menu to output SPB. "
536 "tMenu.L1Menu is empty")
537
538 self.inputStore.clearStore()
539
540 return
541
542 # ----------------------------------------
543 # execute at end of file
544 # ----------------------------------------
def endFile(self):
    """Write the EndGUID record for the current input file.

    The record carries the number of entries counted for this file and the
    current time in milliseconds. The input metadata store is cleared
    afterwards.
    """
    _info = self.msg.info
    _info("POOL2EI::endFile")

    # write EndGUID
    if self._eif_spb is not None:
        endGUID = self.eipbof.EndGUID()
        endGUID.nentries = self._eif_entries
        endGUID.endProcTime = int(time.time() * 1000)

        # record framing: (type << 8 | message version), length, payload
        spb = endGUID.SerializeToString()
        self._eif_spb.write(struct.pack('<I', eic.EI_PROTO_ENDGUID << 8 |
                                        eic.EI_PROTO_MSGVER))
        self._eif_spb.write(struct.pack('<I', len(spb)))
        self._eif_spb.write(spb)

    self.inputStore.clearStore()

    return
565
566 # ----------------------------------------
567 # execute at start of run
568 # ----------------------------------------
def start(self):
    """Log the call and return ``StatusCode.Success``."""
    self.msg.info("POOL2EI::start")

    return StatusCode.Success
575
576 # ----------------------------------------
577 # execute event by event
578 # ----------------------------------------
579 def execute(self):
580
581 if self._eif_totentries < 100:
582 _info = self.msg.info
583 else:
584 _info = lambda *x: None # noqa: E731
585 _warning = self.msg.warning
586
587 _info("POOL2EI::execute")
588
589 if self._eif_spb is not None:
590 eventPB = self.eipbof.EIEvent()
591
592 # ======================================================
593 # Get EventInfo data
594 # ======================================================
595
596 store = self.evtStore
597
599
600 _info('=== [xAOD::EventInfo] ===')
601 xei = store.retrieve('xAOD::EventInfo', 'EventInfo')
602 run_number = xei.runNumber()
603 event_number = xei.eventNumber()
604 lumi_block = xei.lumiBlock()
605 bunch_crossing_id = xei.bcid()
606 time_stamp = xei.timeStamp()
607 try:
608 time_stamp_ns = xei.timeStampNSOffset()
609 except Exception:
610 _info('## Event does not have xAOD::EventInfo::'
611 'timeStampNSOffset()')
612 time_stamp_ns = 0
613 evt_type_bit_mask = xei.eventTypeBitmask()
614 isSimulation = (evt_type_bit_mask & xei.IS_SIMULATION != 0)
615 isTestBeam = (evt_type_bit_mask & xei.IS_TESTBEAM != 0)
616 isCalibration = (evt_type_bit_mask & xei.IS_CALIBRATION != 0)
617 mc_channel_number = 0
618 mc_event_weight = 0.
619 if isSimulation:
620 try:
621 mc_channel_number = xei.mcChannelNumber()
622 mc_event_weight = xei.mcEventWeight()
623 except Exception:
624 pass
625 try:
626 extendedLevel1ID = xei.extendedLevel1ID()
627 except Exception:
628 _info('## Event does not have xAOD::EventInfo::'
629 'extendedLevel1ID()')
630 extendedLevel1ID = 0
631 # there isn't eInfoTrigger in xAOD::EventInfo
632 eInfoTrigger = None
633
634 del xei
635
636 elif self.item_eventinfo:
637
638 # evt_info_keys = store.keys('EventInfo') # fails. bug ?
639 evt_info_keys = [
640 x for x in store.keys() if x.endswith("EventInfo")]
641 if len(evt_info_keys) != 1:
642 _info('more than one EventInfo: {}'.format(evt_info_keys))
643 _info(' ==> we\'ll use [{}]'.format(evt_info_keys[0]))
644 sg_key = evt_info_keys[0]
645 ei = store.retrieve('EventInfo', sg_key)
646 _info('=== [EventInfo#{}] ==='.format(sg_key))
647 eid = ei.event_ID()
648 run_number = eid.run_number()
649 event_number = eid.event_number()
650 lumi_block = eid.lumi_block()
651 bunch_crossing_id = eid.bunch_crossing_id()
652 time_stamp = eid.time_stamp()
653 time_stamp_ns = eid.time_stamp_ns_offset()
654 eitype = ei.event_type()
655 mc_channel_number = eitype.mc_channel_number()
656 mc_event_weight = eitype.mc_event_weight()
657 bm = list(eitype.bit_mask)
658 # IS_SIMULATION / IS_DATA
659 isSimulation = True if 'IS_SIMULATION' in bm else False
660 # IS_TESTBEAM / IS_FROM_ATLAS_DET
661 isTestBeam = True if 'IS_TESTBEAM' in bm else False
662 # IS_CALIBRATION / IS_PHYSICS
663 isCalibration = True if 'IS_CALIBRATION' in bm else False
664 extendedLevel1ID = 0
665 eInfoTrigger = ei.trigger_info()
666 extendedLevel1ID = eInfoTrigger.extendedLevel1ID()
667
668 del ei
669
670 else:
671 # FAIL
672 raise RuntimeError('Unable to find neither '
673 'xAOD::EventInfo nor EventInfo')
674
675 _info('## run_number: {:d}'.format(run_number))
676 _info('## event_number: {:d}'.format(event_number))
677 _info('## bunch_crossing_id: {:d}'.format(bunch_crossing_id))
678 _info('## extendedLevel1ID: {:d}'.format(extendedLevel1ID))
679 _info('## lumi_block: {:d}'.format(lumi_block))
680 _info('## time_stamp: {:d}'.format(time_stamp))
681 _info('## time_stamp_ns_offset: {:d}'.format(time_stamp_ns))
682 _info('## EventWeight: {:f}'.format(mc_event_weight))
683 _info('## McChannelNumber: {:d}'.format(mc_channel_number))
684 _info('## isSimulation: {}'.format(isSimulation))
685 _info('## isTestBeam: {}'.format(isTestBeam))
686 _info('## isCalibration: {}'.format(isCalibration))
687
688 if self._eif_spb is not None:
689 eventPB.runNumber = run_number
690 eventPB.eventNumber = event_number
691 eventPB.lumiBlock = lumi_block
692 eventPB.bcid = bunch_crossing_id
693 eventPB.timeStamp = time_stamp
694 eventPB.timeStampNSOffset = time_stamp_ns
695 eventPB.mcEventWeight = mc_event_weight
696 eventPB.mcChannelNumber = mc_channel_number
697 eventPB.isSimulation = isSimulation
698 eventPB.isCalibration = isCalibration
699 eventPB.isTestBeam = isTestBeam
700 eventPB.extendedLevel1ID = extendedLevel1ID
701
702 # ======================================================
703 # Trigger conf keys
704 # ======================================================
705
707 _info("Retrieve TrigConfKeys from xAOD::TrigConfKeys")
708 tck = store.retrieve('xAOD::TrigConfKeys', 'TrigConfKeys')
709 evt_smk = tck.smk()
710 evt_l1psk = tck.l1psk()
711 evt_hltpsk = tck.hltpsk()
712 del tck
715 _info("Retrieve TrigConfKeys from /TRIGGER/**")
716 evt_smk = self._iov.get('SMK', (run_number, event_number))
717 evt_l1psk = self._iov.get('L1PSK', (run_number, lumi_block))
718 evt_hltpsk = self._iov.get('HLTPSK', (run_number, lumi_block))
719 else:
720 _info("Unable to retrieve TrigConfKeys")
721 evt_smk = 0
722 evt_l1psk = 0
723 evt_hltpsk = 0
724
725 _info('## smk: {}'.format(evt_smk))
726 _info('## hltpsk: {}'.format(evt_hltpsk))
727 _info('## l1psk: {}'.format(evt_l1psk))
728
729 if self._eif_spb is not None:
730 eventPB.SMK = evt_smk
731 eventPB.HLTPSK = evt_hltpsk
732 eventPB.L1PSK = evt_l1psk
733
734 # ======================================================
735 # Trigger bit masks
736 # ======================================================
737
738 if self.DoTriggerInfo:
739
def v2b(v):  # vector elements to binary string
    """Concatenate the elements of ``v`` as bit strings.

    Each element is formatted as a zero-padded 32-digit binary number and
    then reversed, so its least significant bit comes first.
    """
    return "".join("{0:032b}".format(e)[::-1] for e in v)
745
746 trigL1 = ""
747 trigL2 = ""
748 trigEF = ""
749
751
752 _info("Get trigger bit masks form xAOD::TrigDecision")
753
754 # retrieve xTrigDecision
755 xtd = store.retrieve('xAOD::TrigDecision', 'xTrigDecision')
756
757 # L1
758 tbp = xtd.tbp() # vector: 16 elements of 32 bits = 512 bits
759 tap = xtd.tap()
760 tav = xtd.tav()
761 trigL1 = compressB64(v2b(tbp) + v2b(tap) + v2b(tav))
762 del tbp
763 del tap
764 del tav
765
766 # L2
767 trigL2_PH = xtd.lvl2PassedPhysics() # 256*32 bits = 8192 bits
768 trigL2_PT = xtd.lvl2PassedThrough()
769 trigL2_RS = xtd.lvl2Resurrected()
770 trigL2 = "{};{};{}".format(
771 compressB64(v2b(trigL2_PH)),
772 compressB64(v2b(trigL2_PT)),
773 compressB64(v2b(trigL2_RS)))
774 del trigL2_PH
775 del trigL2_PT
776 del trigL2_RS
777
778 # EF
779 trigEF_PH = xtd.efPassedPhysics() # 256*32 bits = 8192 bits
780 trigEF_PT = xtd.efPassedThrough()
781 trigEF_RS = xtd.efResurrected()
782 trigEF = "{};{};{}".format(
783 compressB64(v2b(trigEF_PH)),
784 compressB64(v2b(trigEF_PT)),
785 compressB64(v2b(trigEF_RS)))
786 del trigEF_PH
787 del trigEF_PT
788 del trigEF_RS
789
790 del xtd
791
792 else:
793
794 _info("Get trigger bit masks form eventInfo.trigger_info()")
795
796 # get trigger from eventInfo.trigger_info()
797 if eInfoTrigger is not None:
798 trigL1 = compressB64(v2b(eInfoTrigger.level1TriggerInfo()))
799 trigL2 = compressB64(v2b(eInfoTrigger.level2TriggerInfo()))
800 trigEF = compressB64(v2b(eInfoTrigger.eventFilterInfo()))
801
802 _info("## trigL1: {}".format(trigL1))
803 _info("## trigL2: {}".format(trigL2))
804 _info("## trigEF: {}".format(trigEF))
805
806 if self._eif_spb is not None:
807
808 eventPB.L1PassedTrigMask = trigL1
809 eventPB.L2PassedTrigMask = trigL2
810 eventPB.EFPassedTrigMask = trigEF
811
812 if eInfoTrigger is not None:
813 del eInfoTrigger
814
815 # ======================================================
816 # Self reference and Provenance
817 # ======================================================
818
def guid2string(guid):
    """Format a Guid as ``XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX`` (upper hex).

    Built from the Guid getters because the native ``toString()`` seems to
    produce a memory leak.
    """
    # data1(), data2(), data3() are translated to integers in python
    # data4(i) is translated from char in C++ to string in python
    tail = [ord(guid.data4(i)[0]) for i in range(8)]
    return ("{:08X}-{:04X}-{:04X}-{:02X}{:02X}-"
            "{:02X}{:02X}{:02X}{:02X}{:02X}{:02X}").format(
        guid.data1(), guid.data2(), guid.data3(), *tail)
832
def token2string(tk, replace_empty_cntID=False):
    """Format a Token as ``[DB=..][CNT=..][CLID=..][TECH=..][OID=..-..]``.

    Built from the Token getters because the native ``toString()`` seems
    to produce a memory leak.

    Args:
        tk: token exposing ``contID()``, ``dbID()``, ``classID()``,
            ``technology()`` and ``oid()``.
        replace_empty_cntID: when true, an empty container id is replaced
            by ``POOLContainer(DataHeader)``.
    """
    cntID = tk.contID()
    if replace_empty_cntID and cntID == "":
        cntID = "POOLContainer(DataHeader)"
    oid = tk.oid()
    return "[DB={}][CNT={}][CLID={}][TECH={:08X}][OID={:016X}-{:016X}]".format(
        guid2string(tk.dbID()), cntID, guid2string(tk.classID()),
        tk.technology(), oid.first, oid.second)
844
845 Pstream_refs = {} # provenance references
846 procTag = None
847
848 # -- Stream references
849 dh = store.retrieve('DataHeader', 'EventSelector')
850 procTag = dh.getProcessTag()
851
852 if procTag == "":
853 # patch procTag ATEAM-782
854 for el in dh.elements():
855 if el.getPrimaryClassID() == 222376821:
856 procTag = el.getKey()
857 break
858 _info("## ProcessTag: " + procTag)
859
860 if self.DoProvenanceRef:
861
862 # get provenance references
863 if dh.sizeProvenance() > 0:
864 prv = dh.beginProvenance()
865 for i in range(dh.sizeProvenance()):
866 key = prv.getKey()
867 tk = prv.getToken()
868 if key.startswith("Output"):
869 _warning('Provenance token starts with Output: {}'
870 .format(key))
871 key = key[6:]
872 if key.startswith("Input"):
873 _warning('Provenance token starts with Input: {}'
874 .format(key))
875 key = key[5:]
876 # CNT might be empty. Complete information
877 if key == "StreamRAW":
878 stk = token2string(tk, replace_empty_cntID=False)
879 elif key in ("StreamAOD", "StreamESD", "StreamRDO",
880 "StreamHITS", "StreamEVGEN",
881 "EmbeddingStream"):
882 stk = token2string(tk, replace_empty_cntID=True)
883 else:
884 stk = token2string(tk, replace_empty_cntID=False)
885 _info("provenance {}={}".format(key, stk))
886 _info('Unknown provenance stream: {}'.format(key))
887 # do not raise error, just continue. mar-2024
888 # _error('Unknown provenance stream: {}'.format(key))
889 # raise RuntimeError('Unknown provenance stream')
890 del tk
891 del key
892 prv += 1
893 continue
894 _info("## P" + key + "_ref: " + stk)
895 if key not in Pstream_refs:
896 # keep only the first provenance found for each stream
897 Pstream_refs[key] = stk
898 del tk
899 del key
900 prv += 1
901 del prv
902
903
904 # get self reference.
905 # look for the processing tag key in the Data Object vector
906 if self._eif_spb is not None:
907 tokenPB0 = eventPB.eitoken.add()
908 if dh.size() > 0:
909 dhe = dh.begin()
910 for i in range(dh.size()):
911 key = dhe.getKey()
912 if key.startswith('Stream'):
913 _info("## Stream: " + key)
914 if key in [procTag, 'StreamAOD']:
915 tk = dhe.getToken()
916 stk = token2string(tk, replace_empty_cntID=True)
917 _info("## " + key + "_ref: " + stk)
918 if self._eif_spb is not None:
919 if key == tokenPB0.name:
920 _info("Already inserted key {0} in tokenPB0 "
921 "with value {1}".format(key, stk))
922 tokenPB0.name = key
923 tokenPB0.token = stk
924 del tk
925 del key
926 dhe += 1
927 dh.end()
928 del dhe
929
930 # Update self reference token to handle fast merged files.
931 try:
932 stk = store.proxy(dh).address().par().c_str()
933 if self._eif_spb is not None:
934 tokenPB0.token = stk
935 _info("Updated ref token " + stk)
936 del stk
937 except Exception:
938 pass
939
940 # write provenance to protobuf message
941 if self._eif_spb is not None:
942 for sr in Pstream_refs:
943 try:
944 tokenPB = eventPB.eitoken.add()
945 tokenPB.name = sr
946 tokenPB.token = Pstream_refs[sr]
947 except Exception:
948 _info("Unable to insert {} in provenance stream "
949 "references with value {}".format(
950 sr, Pstream_refs[sr]))
951 pass
952
953 del dh
954
955 # ======================================================
956 # Write event EI
957 # ======================================================
958
959 if self._eif_spb is not None:
960 spb = eventPB.SerializeToString()
961 self._eif_spb.write(struct.pack('<I', eic.EI_PROTO_EIEVENT << 8 |
962 eic.EI_PROTO_MSGVER))
963 self._eif_spb.write(struct.pack('<I', len(spb)))
964 self._eif_spb.write(spb)
965 del eventPB
966 if (self._eif_entries % 1000 == 0):
967 self._eif_spb.flush()
968
969 self._eif_entries += 1 # for this input file
970 self._eif_totentries += 1 # for all input files
971
972 store.clearStore()
973
974 return StatusCode.Success
975
976 # ----------------------------------------
977 # execute at the end of processing
978 # ----------------------------------------
def finalize(self):
    """Write the Trailer record, close the output and log the event total.

    The trailer carries the number of input files, the total number of
    entries and the current time in milliseconds.

    Returns:
        ``StatusCode.Success``.
    """
    _info = self.msg.info
    _info("POOL2EI::finalize")

    if self._eif_spb is not None:
        trailer = self.eipbof.Trailer()
        trailer.nfiles = self._eif_nfiles
        trailer.nentries = self._eif_totentries
        trailer.endProcTime = int(time.time() * 1000)

        # record framing: (type << 8 | message version), length, payload
        spb = trailer.SerializeToString()
        self._eif_spb.write(struct.pack('<I', eic.EI_PROTO_TRAILER << 8 |
                                        eic.EI_PROTO_MSGVER))
        self._eif_spb.write(struct.pack('<I', len(spb)))
        self._eif_spb.write(spb)

        self._eif_spb.close()

    _info("Total number of events processed: {}".format(self._eif_totentries))

    return StatusCode.Success
1001
1002 pass # class POOL2EI
1003
1004
1006 """
1007 POOL2EI Service
1008 Registers with the Incident Service so we can deal with:
1009 - Begin of new input file processing
1010 - End of input file processing
1011 - End of the event processing loop
1012 and notify the POOL2EI algorithm accordingly
1013 """
1014
def __init__(self, name='POOL2EISvc', **kw):
    """Initialise the base class and remember the algorithm to notify.

    Args:
        name: instance name forwarded to the base class.
        **kw: forwarded unchanged to the base class; when it contains
            ``algo``, that object is stored as ``self.algo`` and later
            called by ``handle``.
    """
    super(POOL2EISvc, self).__init__(name, **kw)
    _info = self.msg.info
    _info("POOL2EISvc::__init__")

    # whether we are inside beginFile ... endFile
    self.insideInputFile = False

    # save algorithm to call on incident
    if 'algo' in kw:
        _info("POOL2EISvc::__init__ algo: {}".format(kw['algo']))
        self.algo = kw['algo']
1027
def initialize(self):
    """Register this service as a listener with the incident service.

    Listens for ``BeginInputFile``, ``EndInputFile`` and ``EndEvtLoop``.

    Returns:
        ``StatusCode.Failure`` when the incident service cannot be
        obtained, ``StatusCode.Success`` otherwise.
    """
    # register with the incident svc
    self.msg.info("POOL2EISvc::initialize")
    incsvc = PyAthena.py_svc('IncidentSvc', iface='IIncidentSvc')
    if not incsvc:
        self.msg.error('unable to get the incident svc')
        return StatusCode.Failure

    for incident_type in ('BeginInputFile', 'EndInputFile', 'EndEvtLoop'):
        incsvc.addListener(self, incident_type)
    incsvc.release()

    return StatusCode.Success
1043
def finalize(self):
    """Log the call and return ``StatusCode.Success``."""
    self.msg.info("POOL2EISvc::finalize")
    return StatusCode.Success
1048
def handle(self, incident):
    """Dispatch a registered incident to the POOL2EI algorithm.

    * ``BeginInputFile``: set ``insideInputFile`` and call
      ``algo.beginFile()``.
    * ``EndInputFile``: clear ``insideInputFile`` and call
      ``algo.endFile()``.
    * ``EndEvtLoop``: call ``algo.endFile()`` only if a file is still
      marked as open.
    * ``EndEvent``: ignored.
    * anything else: logged as unknown.

    Args:
        incident: object whose ``type()`` returns the incident name.
    """
    # process registered incidents
    _info = self.msg.info
    tp = incident.type()
    if tp == 'EndEvent':
        pass
    elif tp == 'BeginInputFile':
        _info('POOL2EISvc::handle BeginInputFile')
        self.insideInputFile = True
        self.algo.beginFile()
    elif tp == 'EndInputFile':
        _info('POOL2EISvc::handle EndInputFile')
        self.insideInputFile = False
        self.algo.endFile()
    elif tp == 'EndEvtLoop':
        _info('POOL2EISvc::handle EndEvtLoop')
        # when maxEvents is reached, we are still insideInputFile
        if self.insideInputFile:
            self.algo.endFile()
    else:
        _info('POOL2EISvc::handle {}. Unknown for POOL2EI'.format(tp))
    return
1072
1073 pass # class POOL2EISvc
MsgStream & msg() const
virtual StatusCode start() override
virtual StatusCode execute() override
virtual StatusCode finalize() override
virtual StatusCode initialize() override
virtual StatusCode finalize() override
virtual StatusCode initialize() override
Gaudi Service Implementation.
__init__(self, name='POOL2EISvc', **kw)
process_metadata(self, store, metadata_name)
__init__(self, name='POOL2EI', **kw)
bool add(const std::string &hname, TKey *tobj)
Definition fastadd.cxx:55
T * get(TKey *tobj)
get a TObject* from a TKey* (why can't a TObject be a TKey?)
Definition hcg.cxx:130
std::vector< std::string > split(const std::string &s, const std::string &t=":")
Definition hcg.cxx:177