ATLAS Offline Software
Loading...
Searching...
No Matches
Event/PyDumper/python/PyComps.py
Go to the documentation of this file.
1# Copyright (C) 2002-2023 CERN for the benefit of the ATLAS collaboration
2
3# @file: PyDumper/python/PyComps.py
4# @purpose: A set of PyAthena components to test reading/writing EDM classes
5# @author: Sebastien Binet <binet@cern.ch>
6
7__doc__ = 'A set of PyAthena components to test reading/writing EDM classes'
8__version__ = '$Revision: 1.11 $'
9__author__ = 'Sebastien Binet <binet@cern.ch>'
10
11import os
12from fnmatch import fnmatch
13from io import IOBase
14import AthenaPython.PyAthena as PyAthena
15from AthenaPython.PyAthena import StatusCode
16import traceback
17
19 "helper function to generate a storegate key from a typename"
20 return 'pyathena_tests__%s__key' % t
21
22def _decode_item_list(lst, msg):
23 """helper function to decode a (python) item list of storegate keys"""
24 items = None
25 if (lst is None or lst == '*'):
26 items = None
27 else:
28 if isinstance(lst, str):
29 lst = lst.split(',')
30 if isinstance (lst, (list,set,tuple)):
31 if any (map (lambda x: not isinstance(x, str), lst)):
32 msg.error ('you have to provide a list of sg-keys !')
33 msg.error ('got: %s', lst)
34 raise ValueError()
35 elif len(lst)==1 and lst[0] is None or lst[0] == '*':
36 return None
37 items = []
38 for i in lst:
39 if isinstance(i, str) and i.count('#')==1:
40 item = i.split('#')
41 if item[1] == '*':
42 err = 'sorry, wildcards for keys are NOT (yet?) supported'
43 msg.error(err)
44 raise ValueError(err)
45 items.append(item)
46 elif isinstance(i, str) and i.count('#')==0:
47 # (type, key)
48 items.append ((None, i))
49 elif (isinstance(i, tuple) and len(i)==2 and
50 isinstance(i[0], str) and
51 isinstance(i[1], str)):
52 # (type, key)
53 items.append(i)
54 elif (isinstance(i, tuple) and len(i)==1 and
55 isinstance(i[0], str)):
56 # (type, key)
57 items.append ((None, i))
58 else:
59 err = 'invalid item format: %s'%i
60 msg.error(err)
61 raise ValueError(err)
62 return items
63
65 'algorithm to record a (dummy) container into storegate'
def __init__(self, name='PyWriter', **kw):
    """Create the writer algorithm.

    ``name`` is folded into the keyword set before it is handed to the
    base-class constructor.  The optional ``cont_type`` and ``cont_name``
    keywords (default ``''``) are stored on the instance.
    """
    kw['name'] = name
    super(PyWriter, self).__init__(**kw)

    # properties and data members
    self.cont_type, self.cont_name = (
        kw.get(option, '') for option in ('cont_type', 'cont_name')
    )

    # event-store handle; assigned in initialize()
    self.sg = None
78
def initialize(self):
    """Fetch the event store and validate the configuration.

    When no container name was configured, a default key is derived from
    the container type.

    Returns:
        ``StatusCode.Failure`` if ``StoreGateSvc`` cannot be retrieved or no
        container type was configured, ``StatusCode.Success`` otherwise.
    """
    self.msg.info('==> initialize...')
    self.sg = PyAthena.py_svc('StoreGateSvc')
    if not self.sg:
        self.msg.error('could not retrieve StoreGateSvc')
        return StatusCode.Failure

    if self.cont_type is None or self.cont_type == '':
        self.msg.error('you have to provide a container type to record !')
        return StatusCode.Failure

    if self.cont_name is None or self.cont_name == '':
        # Same key format as this module's typename -> storegate-key helper.
        self.cont_name = 'pyathena_tests__%s__key' % self.cont_type

    # Success must be reported whether or not the name had to be generated.
    return StatusCode.Success
95
def execute(self):
    """Instantiate the configured container type and record it.

    The class named by ``cont_type`` is looked up on ``PyAthena``,
    default-constructed and passed to ``self.sg.record`` under
    ``cont_name``.

    Returns:
        ``StatusCode.Failure`` if the record call does not compare equal to
        ``StatusCode.Success``; ``StatusCode.Success`` otherwise, including
        when an exception was caught and logged.
    """
    log = self.msg.info
    log('==> execute...')
    try:
        key, type_name = self.cont_name, self.cont_type
        log('creating a C++ container of type [%s]...', type_name)
        container = getattr(PyAthena, type_name)()
        log('size= %s', len(container))
        log('recording in event store under [%s]...', key)
        recorded = self.sg.record(container, key) == StatusCode.Success
        if not recorded:
            self.msg.error('could not record container !')
            return StatusCode.Failure
        log('all good')
    except Exception as exc:
        # problems are reported through the message stream only
        self.msg.error('caught: %s', exc)
    return StatusCode.Success
113
114
def finalize(self):
    """Log the finalize banner and report success."""
    announce = self.msg.info
    announce('==> finalize...')
    return StatusCode.Success
118
119 pass # class PyWriter
120
122 'algorithm to retrieve a (dummy) container from storegate'
def __init__(self, name='PyReader', **kw):
    """Create the reader algorithm.

    ``name`` is folded into the keyword set before it is handed to the
    base-class constructor.  The optional ``cont_type``, ``cont_name`` and
    ``ofile`` keywords (default ``''``) are stored on the instance.
    """
    kw['name'] = name
    super(PyReader, self).__init__(**kw)

    # properties and data members
    self.cont_type, self.cont_name, self.ofile = (
        kw.get(option, '') for option in ('cont_type', 'cont_name', 'ofile')
    )

    # event-store handle; assigned in initialize()
    self.sg = None
136
def initialize(self):
    """Fetch the event store, open the output file and build the dumper.

    When no container name was configured, a default key is derived from
    the container type.  ``ofile`` may be empty (a ``pyreader_<key>.log``
    file is created), a path string (opened for writing) or an already
    open ``IOBase`` object (used as is).

    Returns:
        ``StatusCode.Failure`` if ``StoreGateSvc`` cannot be retrieved, no
        container type was configured, the type is not an attribute of
        ``PyAthena``, or ``ofile`` has an unsupported type;
        ``StatusCode.Success`` otherwise.
    """
    self.msg.info('==> initialize...')
    self.sg = PyAthena.py_svc('StoreGateSvc')
    if not self.sg:
        self.msg.error('could not retrieve StoreGateSvc')
        return StatusCode.Failure

    if self.cont_type is None or self.cont_type == '':
        self.msg.error('you have to provide a container type to retrieve !')
        return StatusCode.Failure

    if self.cont_name is None or self.cont_name == '':
        # Same key format as this module's typename -> storegate-key helper.
        self.cont_name = 'pyathena_tests__%s__key' % self.cont_type

    # Import and resolve the container class before touching the
    # filesystem, so that a failure here does not leave an open, empty log
    # file behind.
    from PyDumper.Dumpers import get_dumper_fct
    try:
        cont_type = getattr(PyAthena, self.cont_type)
    except AttributeError as err:
        self.msg.error(err)
        return StatusCode.Failure

    if self.ofile is None or self.ofile == '':
        self.ofile = open('pyreader_%s.log' % self.cont_name, 'w')
    elif isinstance(self.ofile, str):
        self.ofile = open(self.ofile, 'w')
    elif not isinstance(self.ofile, IOBase):
        self.msg.error("don't know how to handle ofile value/type [%s/%s]!",
                       str(self.ofile), type(self.ofile))
        return StatusCode.Failure

    self.dumper = get_dumper_fct(klass=cont_type, ofile=self.ofile)

    ofile_name = getattr(self.ofile, 'name', '<temporary unnamed file>')
    self.msg.info('dumping container [%s/%s]',
                  self.cont_type,
                  self.cont_name)
    self.msg.info(' into [%s]', ofile_name)

    return StatusCode.Success
185
def execute(self):
    """Retrieve the configured container and dump it to the output file.

    Returns:
        ``StatusCode.Failure`` if the retrieved object is falsy;
        ``StatusCode.Success`` otherwise, including when an exception was
        caught and logged.
    """
    log = self.msg.info
    log('==> execute...')
    try:
        key, type_name = self.cont_name, self.cont_type
        log('retrieving a C++ container of type [%s] and key [%s]...',
            type_name, key)
        container = self.sg.retrieve(type_name, key)
        if not container:
            self.msg.error('could not retrieve container !')
            return StatusCode.Failure
        # only objects exposing a length get the size line
        if hasattr(container, '__len__'):
            log('size= %s, type=%s',
                len(container), container.__class__.__name__)
        self.dumper(container)
        self.ofile.flush()
        log('all good')
    except Exception as exc:
        # problems are reported through the message stream only
        self.msg.error('caught: %s', exc)
    return StatusCode.Success
206
207
def finalize(self):
    """Flush the output file and close it unless it is a standard stream.

    The file is left open when it is ``sys.stdout`` / ``sys.stderr`` or
    when its ``name`` is ``/dev/stdout`` or ``/dev/stderr``.
    """
    self.msg.info('==> finalize...')
    out = self.ofile
    out.flush()
    out_name = getattr(out, 'name', '<unnamed file>')
    import sys
    is_std_stream = (
        out is sys.stdout
        or out is sys.stderr
        or out_name in ['/dev/stdout', '/dev/stderr']
    )
    if not is_std_stream:
        out.close()
    return StatusCode.Success
221
222 # class PyReader
223
225 'algorithm to retrieve and dump a list of keys from storegate'
226 _ignore_clids = (
227 1118613496, # old ParticleJetContainer class
228 222376821, # DataHeader
229 55596997, # EventContext
230 )
def __init__(self, name='PySgDumper', **kw):
    """Create the dumper algorithm.

    ``name`` is folded into the keyword set before it is handed to the
    base-class constructor.  Optional keywords stored on the instance:
    ``items`` (default ``'*'``), ``exclude`` (default ``''``) and ``ofile``
    (default ``''``).
    """
    kw['name'] = name
    super(PySgDumper, self).__init__(**kw)

    # properties and data members
    self.items = kw.get('items', '*')
    self.exclude = kw.get('exclude', '')
    self.ofile = kw.get('ofile', '')

    # event-store handle; assigned in initialize()
    self.sg = None

    # objects and/or classes for which the dump failed, stored as
    # (key, clid-or-class, 'reason') tuples; execute() adds to it and
    # finalize() reports it, so it has to exist from construction on.
    self.failed_dumps = set()
    return
248
def initialize(self):
    """Fetch the services, decode the item list and open the output file.

    ``ofile`` may be empty (a time-stamped ``pysgdump_<n>.log`` file is
    created), a path string (opened for writing) or an already open
    ``IOBase`` object (used as is).

    Returns:
        ``StatusCode.Failure`` if ``StoreGateSvc`` or ``ClassIDSvc`` cannot
        be retrieved, the item list cannot be decoded, or ``ofile`` has an
        unsupported type; ``StatusCode.Success`` otherwise.
    """
    self.msg.info('==> initialize...')
    self.sg = PyAthena.py_svc('StoreGateSvc')
    if not self.sg:
        self.msg.error('could not retrieve StoreGateSvc')
        return StatusCode.Failure

    self.clidsvc = PyAthena.py_svc('ClassIDSvc')
    if not self.clidsvc:
        self.msg.error('could not retrieve ClassIDSvc')
        return StatusCode.Failure

    try:
        self.items = _decode_item_list(self.items, self.msg)
    except Exception as exc:
        self.msg.error(exc)
        return StatusCode.Failure

    target = self.ofile
    if target is None or target == '':
        from time import time
        # time-based suffix for the default log file name
        self.ofile = open('pysgdump_%s.log' % int(time()*100), 'w')
    elif isinstance(target, str):
        self.ofile = open(target, 'w')
    elif not isinstance(target, IOBase):
        self.msg.error("don't know how to handle ofile value/type [%s/%s]!",
                       str(target), type(target))
        return StatusCode.Failure

    from PyDumper.Dumpers import get_dumper_fct
    self._dumper_fct = get_dumper_fct

    ofile_name = getattr(self.ofile, 'name', '<temporary unnamed file>')
    selection = '<ALL>' if self.items is None else self.items
    self.msg.info('dumping containers %s', selection)
    self.msg.info('into [%s]', ofile_name)

    self._evt_nbr = 0
    return StatusCode.Success
294
def execute(self):
    """Dump the selected event-store objects for the current event.

    With no explicit item list, every proxy of the event store is
    considered, minus ignored CLIDs, a few special key names, invalid
    proxies and anything matching the ``exclude`` patterns.  Objects that
    cannot be retrieved or dumped are recorded in ``failed_dumps``;
    warnings are only emitted for the first event.

    Returns:
        ``StatusCode.Success`` in all cases.
    """
    debug = self.msg.debug
    info = self.msg.info
    warn = self.msg.warning
    add_fail = self.failed_dumps.add

    info('==> processing event [%s]...', self._evt_nbr)
    banner = ['%s\n' % ('='*40),
              '==> evt nbr [%s]\n' % self._evt_nbr]
    self.ofile.writelines(banner)
    self._evt_nbr += 1
    # the counter was just incremented, so 1 means "first event"
    first_event = self._evt_nbr == 1

    if self.items is not None:
        items = self.items
    else:
        patterns = self.exclude.split(',')
        proxies = self.sg.proxies()
        type_by_key = {}
        typename_of = self.clidsvc.typename
        for proxy in proxies:
            clid = proxy.clID()
            if clid in self._ignore_clids:
                continue
            key = str(proxy.name())
            tname = typename_of(clid)
            if key == 'Input' or key.endswith(('Aux.', '_DELETED')):
                continue
            if not proxy.isValid():
                continue
            # an exclude pattern may match either the key or the typename
            if any(fnmatch(key, pat) or (tname and fnmatch(tname, pat))
                   for pat in patterns):
                continue
            if tname:
                type_by_key[key] = tname
            else:
                warn('no typename for key=[%s], clid=[%i]', key, clid)
                add_fail((key, clid, 'no typename from clid'))

        items = sorted((tname, key)
                       for key, tname in type_by_key.items()
                       if tname != "EventInfo")
        evt_info = [(tname, key)
                    for key, tname in type_by_key.items()
                    if tname == "EventInfo"]
        # NOTE(review): an EventInfo entry is only put (first) in the list
        # when there is exactly one of them -- confirm this is intended.
        if len(evt_info) == 1:
            items.insert(0, evt_info[0])

    retrieve = self.sg.retrieve
    lookup = self.sg.__getitem__
    for cont_type, cont_key in items:
        debug('dumping object [%s#%s]', cont_type, cont_key)
        try:
            # untyped items go through plain key lookup
            obj = (lookup(cont_key) if cont_type is None
                   else retrieve(cont_type, cont_key))
            if obj is None:
                add_fail((cont_key, cont_type, 'retrieve failed'))
                if first_event:
                    warn('could not retrieve object [%s#%s]',
                         cont_type, cont_key)
                continue
            try:
                dumper = self._dumper_fct(klass=obj.__class__,
                                          ofile=self.ofile)
            except RuntimeError as err:
                add_fail((cont_key, cont_type, 'dump failed ' + str(err)))
                if first_event:
                    warn(err)
                continue
            header = '%s%s[%s/%s]%s\n' % (
                os.linesep,
                '-'*10,
                obj.__class__.__name__,
                cont_key,
                '-'*10)
            self.ofile.writelines([header])
            # flush around the dumper call, before and after
            self.ofile.flush()
            dumper(obj)
            self.ofile.flush()
        except Exception as err:
            traceback.print_exc()
            add_fail((cont_key, cont_type, 'sg-retrieve failed'))
            if first_event:
                warn('caught exception:\n%s', err)
                warn('could not retrieve object [%s#%s]',
                     cont_type, cont_key)
            continue
    return StatusCode.Success
386
387
def finalize(self):
    """Close the output file and report the objects that were not dumped.

    The file is flushed, then closed unless it is ``sys.stdout`` /
    ``sys.stderr`` or its ``name`` is ``/dev/stdout`` or ``/dev/stderr``.
    Entries of ``failed_dumps`` are logged grouped by failure reason.
    """
    log = self.msg.info
    log('==> finalize...')
    log('processed [%s] events', self._evt_nbr)

    out = self.ofile
    out.flush()
    out_name = getattr(out, 'name', '<unnamed file>')
    import sys
    is_std_stream = (
        out is sys.stdout
        or out is sys.stderr
        or out_name in ['/dev/stdout', '/dev/stderr']
    )
    if not is_std_stream:
        out.close()

    if len(self.failed_dumps) > 0:
        log("the following objects could not be dumped:")
        from collections import defaultdict
        by_reason = defaultdict(list)
        for name, klass, reason in self.failed_dumps:
            by_reason[reason].append((name, klass))
        for reason, culprits in by_reason.items():
            log(' ==> [%s]', reason)
            for name, klass in culprits:
                log(" [%s#%s]", klass, name)
    return StatusCode.Success
415
416 # class PySgDumper
417
419 'algorithm to load a list of objects via their DataProxy from storegate'
420
def __init__(self, name='DataProxyLoader', **kw):
    """Create the proxy-loader algorithm.

    ``name`` is folded into the keyword set before it is handed to the
    base-class constructor.  The optional ``items`` keyword (default
    ``'*'``) is stored on the instance.
    """
    kw['name'] = name
    super(DataProxyLoader, self).__init__(**kw)

    # properties and data members
    self.items = kw.get('items', '*')

    # event-store handle; assigned in initialize()
    self.sg = None

    # objects and/or classes for which loading failed, stored as
    # (key, clid-or-class, 'reason') tuples; execute() adds to it and
    # finalize() reports it, so it has to exist from construction on.
    self.failed_dumps = set()
    return
436
def initialize(self):
    """Fetch the event store and decode the configured item list.

    Returns:
        ``StatusCode.Failure`` if ``StoreGateSvc`` cannot be retrieved or
        the item list cannot be decoded; ``StatusCode.Success`` otherwise.
    """
    self.msg.info('==> initialize...')
    self.sg = PyAthena.py_svc('StoreGateSvc')
    if not self.sg:
        self.msg.error('could not retrieve StoreGateSvc')
        return StatusCode.Failure

    try:
        self.items = _decode_item_list(self.items, self.msg)
    except Exception as exc:
        self.msg.error(exc)
        return StatusCode.Failure

    selection = '<ALL>' if self.items is None else self.items
    self.msg.info('dumping containers %s', selection)
    # per-job event counter, advanced by execute()
    self._evt_nbr = 0
    return StatusCode.Success
455
def execute(self):
    """Load the selected objects through their data proxies.

    With no explicit item list every proxy of the event store is loaded.
    Otherwise a proxy is selected when its name equals the item's key and
    the item's type is either unspecified (``None``) or equal to the
    proxy's class ID, given as the number itself or as its decimal string.

    Returns:
        ``StatusCode.Success`` if ``accessData()`` returned a truthy object
        for every selected proxy, ``StatusCode.Failure`` otherwise.
    """
    _add_fail = self.failed_dumps.add
    self.msg.info('==> processing event [%s]...', self._evt_nbr)
    self._evt_nbr += 1

    all_proxies = list(self.sg.proxies())
    if self.items is None:
        # take all from storegate
        proxies = all_proxies
    else:
        proxies = []
        for i_clid, i_name in self.items:
            for p in all_proxies:
                if i_name != str(p.name()):
                    continue
                p_clid = p.clID()
                # Decoded items carry None or a string as their type, which
                # never equals a numeric class ID: treat None as "any type"
                # and accept the ID spelled as a string.
                if i_clid is None or i_clid == p_clid or i_clid == str(p_clid):
                    proxies.append(p)

    all_good = True
    for p in proxies:
        clid = p.clID()
        sgkey = p.name()
        self.msg.debug('loading proxy [%s#%s]...', clid, sgkey)
        try:
            dobj = p.accessData()
            if not dobj:
                # Record why the event fails instead of failing silently.
                self.msg.error('no data object for proxy [%s#%s]',
                               clid, sgkey)
                _add_fail((sgkey, clid, 'accessData returned no object'))
                all_good = False
        except Exception as err:
            self.msg.fatal('problem loading proxy [%s#%s]', clid, sgkey)
            _add_fail((sgkey, clid, str(err)))
            all_good = False

    if all_good:
        return StatusCode.Success
    return StatusCode.Failure
491
def finalize(self):
    """Log the event count and the objects recorded in ``failed_dumps``.

    Failed entries are logged grouped by failure reason.
    """
    log = self.msg.info
    log('==> finalize...')
    log('processed [%s] events', self._evt_nbr)
    if len(self.failed_dumps) > 0:
        log("the following objects could not be dumped:")
        from collections import defaultdict
        by_reason = defaultdict(list)
        for name, klass, reason in self.failed_dumps:
            by_reason[reason].append((name, klass))
        for reason, culprits in by_reason.items():
            log(' ==> [%s]', reason)
            for name, klass in culprits:
                log(" [%s#%s]", klass, name)
    return StatusCode.Success
507
const bool debug
MsgStream & msg() const
virtual StatusCode execute() override
virtual StatusCode finalize() override
virtual StatusCode initialize() override
failed_dumps
list of objects and/or classes for which the dump failed ( (key, clid-or-class, 'reason'),...
__init__(self, name='DataProxyLoader', **kw)
__init__(self, name='PyReader', **kw)
str cont_type
properties and data members
__init__(self, name='PySgDumper', **kw)
failed_dumps
list of objects and/or classes for which the dump failed ( (key, clid-or-class, 'reason'),...
__init__(self, name='PyWriter', **kw)
str cont_type
properties and data members
STL class.
std::vector< std::string > split(const std::string &s, const std::string &t=":")
Definition hcg.cxx:177