7 __doc__ =
'A set of PyAthena components to test reading/writing EDM classes'
8 __version__ =
'$Revision: 1.11 $'
9 __author__ =
'Sebastien Binet <binet@cern.ch>'
12 from fnmatch
import fnmatch
14 import AthenaPython.PyAthena
as PyAthena
15 from AthenaPython.PyAthena
import StatusCode
19 "helper function to generate a storegate key from a typename"
20 return 'pyathena_tests__%s__key' % t
23 """helper function to decode a (python) item list of storegate keys"""
25 if (lst
is None or lst ==
'*'):
28 if isinstance(lst, str):
30 if isinstance (lst, (list,set,tuple)):
31 if any (map (
lambda x:
not isinstance(x, str), lst)):
32 msg.error (
'you have to provide a list of sg-keys !')
33 msg.error (
'got: %s', lst)
35 elif len(lst)==1
and lst[0]
is None or lst[0] ==
'*':
39 if isinstance(i, str)
and i.count(
'#')==1:
42 err =
'sorry, wildcards for keys are NOT (yet?) supported'
46 elif isinstance(i, str)
and i.count(
'#')==0:
48 items.append ((
None, i))
49 elif (isinstance(i, tuple)
and len(i)==2
and
50 isinstance(i[0], str)
and
51 isinstance(i[1], str)):
54 elif (isinstance(i, tuple)
and len(i)==1
and
55 isinstance(i[0], str)):
57 items.append ((
None, i))
59 err =
'invalid item format: %s'%i
65 'algorithm to record a (dummy) container into storegate'
80 self.
msg.
info(
'==> initialize...')
81 self.
sg = PyAthena.py_svc(
'StoreGateSvc')
83 self.
msg.
error(
'could not retrieve StoreGateSvc')
84 return StatusCode.Failure
88 self.
msg.
error(
'you have to provide a container type to record !')
89 return StatusCode.Failure
94 return StatusCode.Success
98 _info(
'==> execute...')
102 _info(
'creating a C++ container of type [%s]...', tp)
103 cont = getattr(PyAthena, tp)()
104 _info(
'size= %s', len(cont))
105 _info(
'recording in event store under [%s]...', k)
106 if not self.
sg.record(cont, k) == StatusCode.Success:
107 self.
msg.
error(
'could not record container !')
108 return StatusCode.Failure
110 except Exception
as err:
112 return StatusCode.Success
116 self.
msg.
info(
'==> finalize...')
117 return StatusCode.Success
122 'algorithm to retrieve a (dummy) container from storegate'
126 super(PyReader, self).
__init__(**kw)
138 self.
msg.
info(
'==> initialize...')
139 self.
sg = PyAthena.py_svc(
'StoreGateSvc')
141 self.
msg.
error(
'could not retrieve StoreGateSvc')
142 return StatusCode.Failure
146 self.
msg.
error(
'you have to provide a container type to retrieve !')
147 return StatusCode.Failure
157 elif isinstance(self.
ofile, str):
159 elif isinstance(self.
ofile, IOBase):
162 self.
msg.
error(
"don't know how to handle ofile value/type [%s/%s]!",
164 return StatusCode.Failure
166 from PyDuper.Dumpers
import get_dumper_fct
168 cont_type = getattr(PyAthena, self.
cont_type)
169 except AttributeError
as err:
171 return StatusCode.Failure
174 if hasattr(self.
ofile,
'name'):
175 ofile_name = self.
ofile.name
177 ofile_name =
'<temporary unnamed file>'
179 self.
msg.info (
'dumping container [%s/%s]',
182 self.
msg.info (
' into [%s]', ofile_name)
184 return StatusCode.Success
187 _info = self.
msg.info
188 _info(
'==> execute...')
192 _info(
'retrieving a C++ container of type [%s] and key [%s]...',
196 self.
msg.
error(
'could not retrieve container !')
197 return StatusCode.Failure
198 if hasattr (cont,
'__len__'):
199 _info(
'size= %s, type=%s', len(cont),cont.__class__.__name__)
203 except Exception
as err:
205 return StatusCode.Success
209 self.
msg.
info(
'==> finalize...')
211 if hasattr(self.
ofile,
'name'):
212 o_name = self.
ofile.name
214 o_name =
'<unnamed file>'
216 if not (self.
ofile is sys.stdout)
and \
217 not (self.
ofile is sys.stderr)
and \
218 not (o_name
in [
'/dev/stdout',
'/dev/stderr',]):
220 return StatusCode.Success
225 'algorithm to retrieve and dump a list of keys from storegate'
234 super(PySgDumper, self).
__init__(**kw)
250 self.
msg.
info(
'==> initialize...')
251 self.
sg = PyAthena.py_svc(
'StoreGateSvc')
253 self.
msg.
error(
'could not retrieve StoreGateSvc')
254 return StatusCode.Failure
258 self.
msg.error (
'could not retrieve ClassIDSvc')
259 return StatusCode.Failure
263 except Exception
as err:
265 return StatusCode.Failure
268 from time
import time
269 self.
ofile =
open(
'pysgdump_%s.log'%
int(time()*100),
'w')
271 elif isinstance(self.
ofile, str):
273 elif isinstance(self.
ofile, IOBase):
276 self.
msg.
error(
"don't know how to handle ofile value/type [%s/%s]!",
278 return StatusCode.Failure
280 from PyDumper.Dumpers
import get_dumper_fct
283 if hasattr(self.
ofile,
'name'):
284 ofile_name = self.
ofile.name
286 ofile_name =
'<temporary unnamed file>'
288 self.
msg.info (
'dumping containers %s',
289 '<ALL>' if self.
items is None else self.
items)
290 self.
msg.info (
'into [%s]', ofile_name)
293 return StatusCode.Success
296 _debug= self.
msg.debug
297 _info = self.
msg.info
298 _warn = self.
msg.warning
300 _info(
'==> processing event [%s]...', self.
_evt_nbr)
301 self.
ofile.writelines ([
'%s\n'%(
'='*40),
302 '==> evt nbr [%s]\n' % self.
_evt_nbr])
305 if self.
items is None:
307 proxies = self.
sg.proxies()
309 _typename = self.
clidsvc.typename
316 if n ==
'Input':
continue
317 if n.endswith(
'Aux.'):
continue
318 if n.endswith(
'_DELETED'):
continue
319 if not p.isValid():
continue
322 if fnmatch (n, exc)
or (tp
and fnmatch(tp, exc)):
330 self.
msg.warning (
'no typename for key=[%s], clid=[%i]',
332 _add_fail ((n,clid,
'no typename from clid'))
334 items = [(i[1], i[0])
for i
in sg.items()
if i[1] !=
"EventInfo"]
336 evt_info = [(clid, key)
for key,clid
in sg.items()
if clid==
"EventInfo"]
338 items.insert (0, evt_info[0])
342 _retrieve = self.
sg.retrieve
343 _get = self.
sg.__getitem__
344 for cont_type, cont_key
in items:
345 _debug (
'dumping object [%s#%s]', cont_type, cont_key)
347 if cont_type
is None:
350 o = _retrieve (cont_type, cont_key)
352 _add_fail ((cont_key, cont_type,
'retrieve failed'))
354 _warn (
'could not retrieve object [%s#%s]',
360 except RuntimeError
as err:
361 _add_fail ((cont_key, cont_type,
'dump failed ' +
str(err)))
365 self.
ofile.writelines (
369 o.__class__.__name__,
377 except Exception
as err:
378 traceback.print_exc()
379 _add_fail ((cont_key, cont_type,
'sg-retrieve failed'))
381 _warn (
'caught exception:\n%s', err)
382 _warn (
'could not retrieve object [%s#%s]',
385 return StatusCode.Success
389 self.
msg.info (
'==> finalize...')
390 self.
msg.info (
'processed [%s] events', self.
_evt_nbr)
393 if hasattr(self.
ofile,
'name'):
394 o_name = self.
ofile.name
396 o_name =
'<unnamed file>'
398 if not (self.
ofile is sys.stdout)
and \
399 not (self.
ofile is sys.stderr)
and \
400 not (o_name
in [
'/dev/stdout',
'/dev/stderr',]):
403 _info = self.
msg.info
405 _info (
"the following objects could not be dumped:")
406 from collections
import defaultdict
407 reasons = defaultdict(list)
409 reasons[reason].append ((name,klass))
410 for reason
in reasons.keys():
411 _info (
' ==> [%s]', reason)
412 for name,klass
in reasons[reason]:
413 _info (
" [%s#%s]", klass, name)
414 return StatusCode.Success
419 'algorithm to load a list of objects via their DataProxy from storegate'
424 super(DataProxyLoader, self).
__init__(**kw)
439 self.
msg.
info(
'==> initialize...')
440 self.
sg = PyAthena.py_svc(
'StoreGateSvc')
442 self.
msg.
error(
'could not retrieve StoreGateSvc')
443 return StatusCode.Failure
447 except Exception
as err:
449 return StatusCode.Failure
451 self.
msg.info (
'dumping containers %s',
452 '<ALL>' if self.
items is None else self.
items)
454 return StatusCode.Success
460 if self.
items is None:
462 proxies =
list(self.
sg.proxies())
465 all_proxies =
list(self.
sg.proxies())
469 for p
in all_proxies:
470 if (i_clid == p.clID()
and
478 self.
msg.
debug(
'loading proxy [%s#%s]...', clid, sgkey)
480 dobj = p.accessData()
483 except Exception
as err:
484 self.
msg.fatal(
'problem loading proxy [%s#%s]', clid, sgkey)
485 _add_fail((sgkey, clid,
str(err)))
489 return StatusCode.Success
490 return StatusCode.Failure
493 _info = self.
msg.info
494 _info (
'==> finalize...')
495 _info (
'processed [%s] events', self.
_evt_nbr)
497 _info (
"the following objects could not be dumped:")
498 from collections
import defaultdict
499 reasons = defaultdict(list)
501 reasons[reason].append ((name,klass))
502 for reason
in reasons.keys():
503 _info (
' ==> [%s]', reason)
504 for name,klass
in reasons[reason]:
505 _info (
" [%s#%s]", klass, name)
506 return StatusCode.Success