8 __author__ =
"Sebastien Binet"
9 __doc__ =
"provide components to peek into pool files"
12 import AthenaPython.PyAthena
as PyAthena
13 StatusCode = PyAthena.StatusCode
21 ROOT.gROOT.SetBatch(
True)
32 """simple helper function to create consistent dicts for in-file metadata
41 'mc_channel_number': [],
43 'beam_energy': [
'N/A'],
46 'eventdata_items': [],
49 'conditions_tag':
None,
50 'det_descr_tags':
None,
67 """utility algorithm to inspect a file's content
71 super(FilePeeker, self).
__init__(name, **kw)
72 self.
infname = kw.get(
'infname',
'not-there.pool')
86 import AthenaPython.PyAthena
as PyAthena
90 for cls_name
in (
'EventStreamInfo',
94 cls = getattr(PyAthena, cls_name)
97 _info(
"retrieving various stores...")
98 for store_name
in (
'evtStore',
'inputStore',
99 'tagStore',
'metaStore',):
100 _info(
'retrieving [%s]...', store_name)
101 o = getattr(self, store_name)
102 _info(
'retrieving [%s]... [done]', store_name)
108 _info(
"retrieving various stores... [done]")
113 import AthenaPython.PyAthena
as PyAthena
114 _info = self.
msg.info
115 return StatusCode.Success
122 return StatusCode.Success
130 return StatusCode.Success
134 import AthenaPython.PyAthena
as PyAthena
135 return PyAthena.py_svc(
'StoreGateSvc/StoreGateSvc')
139 import AthenaPython.PyAthena
as PyAthena
140 return PyAthena.py_svc(
'StoreGateSvc/MetaDataStore')
144 import AthenaPython.PyAthena
as PyAthena
145 return PyAthena.py_svc(
'StoreGateSvc/TagMetaDataStore')
149 import AthenaPython.PyAthena
as PyAthena
150 return PyAthena.py_svc(
'StoreGateSvc/InputMetaDataStore')
156 return StatusCode.Success
161 obj = store[metadata_name]
163 msg.warning(
'could not retrieve [%s]', metadata_name)
165 if str(obj).
find(
'MetaCont') >= 0:
166 obj = obj.get (obj.sources()[0])
167 msg.info(
'processing container [%s]', obj.folderName())
169 payloads = obj.payloadContainer()
170 payloads_sz = payloads.size()
171 if hasattr(payloads,
'at'):
175 for ii
in range(payloads_sz):
176 payloads.append(_tmp.at(ii))
178 for ii,payload
in zip(
range(payloads_sz), payloads):
181 msg.info (
"**error** null-pointer ?")
185 sz = payload.name_size()
186 msg.info(
'==names== (sz: %s)', sz)
187 for idx
in range(sz):
188 chan = payload.chanNum(idx)
189 chan_name = payload.chanName(chan)
191 chan_names.append(chan_name)
195 sz = payload.iov_size()
196 msg.info(
'==iovs== (sz: %s)',sz)
197 for idx
in range(sz):
198 chan = payload.chanNum(idx)
199 iov_range = payload.iovRange(chan)
200 iov_start = iov_range.start()
201 iov_stop = iov_range.stop()
203 msg.info(
'(%s, %s) => (%s, %s) valid=%s runEvt=%s',
209 iov_start.isRunEvent())
214 msg.info(
'==attrs== (sz: %s)', sz)
215 for idx
in range(sz):
216 chan = payload.chanNum(idx)
218 attr_list = payload.attributeList(chan)
220 for a
in list(
toiter(attr_list.begin(), attr_list.end())):
222 spec = a.specification()
223 a_type = spec.typeName()
224 if a_type.find(
'string') >= 0:
225 a_data = a.data[
'string']()
227 a_data = eval(a_data,{},{})
232 a_data = a.data[a_type]()
234 attr_data.append( (spec.name(), a_data) )
235 attrs.append(dict(attr_data))
237 if len(attrs) == len(chan_names):
238 data.append(dict(zip(chan_names,attrs)))
242 data.append(attrs[0])
246 data.append(chan_names)
257 _info = self.
msg.info
262 oname = os.path.expanduser(os.path.expandvars(oname))
263 _info(
'storing peeked file infos into [%s]...', oname)
264 if os.path.exists(oname):
267 import PyUtils.dbsqlite
as dbsqlite
268 db = dbsqlite.open(oname,flags=
'w')
274 for k
in (
'run_number',
282 if isinstance(v, list)
and len(v)>0:
287 db[
'fileinfos'] = peeked_data
289 _info(
'storing peeked file infos into [%s]... [done]', oname)
291 return StatusCode.Success
296 _info = self.
msg.info
299 _info(
':::::: summary ::::::')
300 _info(
' - nbr events: %s', data[
'nentries'])
301 _info(
' - run numbers: %s', data[
'run_number'])
302 _info(
' - evt numbers: %s', data[
'evt_number'])
303 _info(
' - lumiblocks: %s', data[
'lumi_block'])
304 _info(
' - evt types: %s', data[
'evt_type'])
305 _info(
' - item list: %s', len(data[
'eventdata_items']))
306 _info(
' - stream names: %s', data[
'stream_names'])
307 _info(
' - stream tags: %s', data[
'stream_tags'])
308 _info(
' - geometry: %s', data[
'geometry'])
309 _info(
' - conditions tag: %s', data[
'conditions_tag'])
310 _info(
' - metadata items: %s', len(data[
'metadata_items']))
311 _info(
' - tag-info: %s', data[
'tag_info'].
keys())
315 """ the real function doing all the work of peeking at the input file
316 @return a dict of peeked-at data
319 import AthenaPython.PyAthena
as PyAthena
320 _info = self.
msg.info
321 _error= self.
msg.error
323 def _get_detdescr_tags(evt_type):
324 ddt = evt_type.get_detdescr_tags().
split()
327 ddt = dict(zip(ddt[0::2],
334 esi_keys = store.keys(
'EventStreamInfo')
337 if len(esi_keys) >= 1:
338 sg_key = esi_keys[-1]
340 stream_names = esi_keys[:]
341 for sg_key
in esi_keys:
342 esi = store.retrieve(
'EventStreamInfo', sg_key)
343 _info(
'=== [EventStreamInfo#%s] ===', sg_key)
344 nentries += esi.getNumberOfEvents()
346 evt_types = PyAthena.EventStreamInfo.evt_types(esi)
347 if len(evt_types) > 0:
348 evt_type = evt_types[0]
349 peeked_data[
'evt_type'] = evt_type.bit_mask
350 ddt = _get_detdescr_tags(evt_type)
351 peeked_data[
'det_descr_tags'] = ddt
352 from past.builtins
import long
353 peeked_data[
'mc_channel_number'] = [long(evt_type.mc_channel_number())]
355 def _make_item_list(item):
358 _typename = store._pyclidsvc.typename
361 item_list = esi.item_list()
362 item_list =
list(map(_make_item_list, item_list))
363 peeked_data[
'eventdata_items'] = item_list
365 peeked_data[
'lumi_block'] = esi.lumi_blocks()
366 peeked_data[
'run_number'] = esi.run_numbers()
368 peeked_data[
'stream_names'] = esi.processing_tags()
370 if not peeked_data[
'stream_names']:
372 peeked_data[
'stream_names'] =
list(
set(stream_names))
376 if len(esi.run_numbers()) == 0:
377 bsmd_keys = store.keys(
"ByteStreamMetadataContainer")
378 if len(bsmd_keys) == 1:
379 bsmd = store[bsmd_keys[0]][0]
380 peeked_data[
'lumi_block'] = [bsmd.getLumiBlock()]
381 peeked_data[
'run_number'] = [bsmd.getRunNumber()]
383 for md
in bsmd.getFreeMetaDataStrings():
384 if md.startswith(
'Event type:'):
387 if 'is sim' in md: v.append(
'IS_SIMULATION')
388 else: v.append(
'IS_DATA')
389 if 'is atlas' in md: v.append(
'IS_ATLAS')
390 else: v.append(
'IS_TESTBEAM')
391 if 'is physics' in md: v.append(
'IS_PHYSICS')
392 else: v.append(
'IS_CALIBRATION')
393 bs_metadata[k] = tuple(v)
394 elif md.startswith(
'GeoAtlas:'):
396 v = md.split(
'GeoAtlas:')[1].strip()
398 elif md.startswith(
'IOVDbGlobalTag:'):
400 v = md.split(
'IOVDbGlobalTag:')[1].strip()
406 peeked_data[
'evt_type'] = bs_metadata.get(
'evt_type', [])
407 peeked_data[
'geometry'] = bs_metadata.get(
'geometry',
None)
408 peeked_data[
'conditions_tag'] = bs_metadata.get(
'conditions_tag',
None)
409 peeked_data[
'bs_metadata'] = bs_metadata
417 root_files =
list(ROOT.gROOT.GetListOfFiles())
418 root_files = [root_file
for root_file
in root_files
420 if len(root_files)==1:
421 root_file = root_files[0]
422 data_hdr = root_file.Get(
"POOLContainer")
424 data_hdr = root_file.Get(
"POOLContainer_DataHeader")
425 nentries = data_hdr.GetEntriesFast()
if bool(data_hdr) \
428 _info(
'could not find correct ROOT file (looking for [%s])',
433 peeked_data[
'nentries'] = nentries
439 root_files =
list(ROOT.gROOT.GetListOfFiles())
440 root_files = [root_file
for root_file
in root_files
442 if len(root_files)==0:
443 _info(
'could not find correct ROOT file (looking for [%s])',
447 root_file = root_files[0]
448 pool = root_file.Get(
"##Params")
452 pool_token = re.compile(
r'[]NAME=(?P<name>.*?)[
]'
453 r'[]VALUE=(?P<value>.*?)[
]').match
455 for i
in range(pool.GetEntries()):
456 if pool.GetEntry(i)>0:
457 match = pool_token(pool.db_string)
460 d = match.groupdict()
461 params.append((d[
'name'], d[
'value']))
462 if d[
'name'].lower() ==
'fid':
467 peeked_data[
'file_guid'] = guid
470 metadata_items = [(self.
inputStore._pyclidsvc.typename(p.clID()),
473 peeked_data[
'metadata_items'] = metadata_items
475 def maybe_get(o, idx, default=None):
486 def mergeMultipleDict(inDicts):
489 for k,o
in six.iteritems(d):
501 metadata[k] = maybe_get(v, -1)
502 peeked_data[
'metadata'] = metadata
508 taginfo = mergeMultipleDict(v)
510 if '/TagInfo' in metadata:
511 taginfo = metadata[
'/TagInfo'].
copy()
517 peeked_data[
'det_descr_tags'] = ddt
518 peeked_data[
'geometry'] = ddt.get(
'GeoAtlas',
None)
519 peeked_data[
'conditions_tag'] = ddt.get(
'IOVDbGlobalTag',
None)
520 peeked_data[
'tag_info'] = taginfo
522 peeked_data[
'det_descr_tags'] = taginfo
523 peeked_data[
'geometry'] = taginfo.get(
'GeoAtlas',
None)
524 peeked_data[
'conditions_tag'] = taginfo.get(
'IOVDbGlobalTag',
None)
525 peeked_data[
'beam_type'] = [taginfo.get(
'beam_type',
'N/A')]
526 peeked_data[
'beam_energy']= [maybe_float(taginfo.get(
'beam_energy',
529 if 'geometry' not in peeked_data:
530 peeked_data[
'geometry'] =
None
531 if 'conditions_tag' not in peeked_data:
532 peeked_data[
'conditions_tag'] =
None
533 if 'det_descr_tags' not in peeked_data:
534 peeked_data[
'det_descr_tags'] = {}
538 if taginfo
and taginfo.get(
'project_name',
None) ==
'IS_SIMULATION':
539 peeked_data[
'evt_type'] = (
'IS_SIMULATION',
'IS_ATLAS',
'IS_PHYSICS')
540 if '/Simulation/Parameters' in metadata:
541 peeked_data[
'run_number'] = [metadata.get(
'/Simulation/Parameters').
get(
'RunNumber',
None)]
544 if peek_evt_data
is False:
551 evt_info_keys = store.keys(
'EventInfo')
552 if len(evt_info_keys) != 1:
553 _info(
'more than one EventInfo: %s', evt_info_keys)
554 _info(
' ==> we\'ll use [%s]', evt_info_keys[0])
555 sg_key = evt_info_keys[0]
556 ei = store.retrieve(
'EventInfo', sg_key)
557 _info(
'=== [EventInfo#%s] ===', sg_key)
560 dh_keys = [k
for k
in store.keys(
'DataHeader')
562 if not k.startswith(
'[DB=')]
563 if len(dh_keys) != 1:
564 _error(
'more than 1 DataHeader key after filtering: %s', dh_keys)
565 _error(
'content of store: %s', store.keys(
'DataHeader'))
566 raise RuntimeError(
'more than one DataHeader key')
569 _info(
'=== [DataHeader#%s] ===', sg_key)
570 dh = store.retrieve(
'DataHeader', sg_key)
572 def _make_item_list(dhe):
574 clid = dhe.getPrimaryClassID()
575 _typename = store._pyclidsvc.typename
579 if hasattr(dh,
'at'):
581 for ii
in range(len(dh)):
582 dhes.append(dh.at(ii))
585 dhes =
list(dh.elements())
588 for i,dhe
in enumerate(dhes):
590 clid = dhe.getPrimaryClassID()
591 _typename = store._pyclidsvc.typename
596 except Exception
as err:
597 self.
msg.
info(
"no typename for clid [%s] (%s)", clid, err)
598 item_list.append((
str(clid), sgkey))
601 evt_type = ei.event_type()
602 det_descr_tags = _get_detdescr_tags(evt_type)
605 'run_number': [eid.run_number()],
606 'evt_number': [eid.event_number()],
608 'evt_type': evt_type.bit_mask,
609 'det_descr_tags': det_descr_tags,
610 'geometry': det_descr_tags.get(
'GeoAtlas',
None),
611 'conditions_tag': det_descr_tags.get(
'IOVDbGlobalTag',
None),
612 'lumi_block': [eid.lumi_block()],
613 'stream_names': [dh.getProcessTag()],
614 'eventdata_items': item_list,
615 'beam_type': [det_descr_tags.get(
'beam_type',
'N/A')],
616 'beam_energy': [maybe_float(det_descr_tags.get(
'beam_energy',
620 trigger_info= ei.trigger_info()
621 stream_tags = trigger_info.streamTags()
if trigger_info
else []
623 for st
in stream_tags:
626 obeys_lbk=
bool(st.obeysLumiblock())
627 stags.append(dict(stream_type=st_type,
629 obeys_lbk=obeys_lbk))
630 peeked_data[
'stream_tags'] = stags
636 metadata_items = [(self.
inputStore._pyclidsvc.typename(p.clID()),
639 peeked_data[
'metadata_items'] = metadata_items
643 metadata[k] = maybe_get(v, -1)
644 peeked_data[
'metadata'] = metadata
650 taginfo = mergeMultipleDict(v)
653 atlas_release = metadata.get(
'/TagInfo', taginfo)
654 atlas_release = atlas_release.get(
'AtlasRelease',
655 taginfo[
'AtlasRelease'])
656 taginfo[
'AtlasRelease'] = atlas_release
657 peeked_data[
'det_descr_tags'] = taginfo
658 peeked_data[
'tag_info'] = taginfo
662 ddt = peeked_data[
'det_descr_tags']
663 peeked_data[
'geometry'] = ddt.get(
'GeoAtlas',
None)
664 peeked_data[
'conditions_tag'] = ddt.get(
'IOVDbGlobalTag',
None)
667 peeked_data[
'beam_type']= [ddt.get(
'beam_type',
'N/A')]
668 beam_ene = maybe_float(ddt.get(
'beam_energy',
'N/A'))
669 peeked_data[
'beam_energy']=[beam_ene]
678 """a service to spy for file meta-data and store this collected data into
679 the pool file, in a python-pickle friendly format
684 super(FilePeekerSvc, self).
__init__(**kw)
689 svc = PyAthena.py_svc(
'IncidentSvc', iface=
'IIncidentSvc')
691 self.msg.
error(
'unable to get the incident svc')
692 return StatusCode.Failure
694 for incident
in (
'EndEvent',
696 svc.addListener(self, incident)
699 return StatusCode.Success
702 return StatusCode.Success
708 elif tp ==
'BeginInputFile':
709 self.msg.
info(
'input file name: [%s]', incident.fileName())