7 from AthenaPython.PyAthena
import StatusCode
8 import AthenaPython.PyAthena
as PyAthena
11 """A simple python algorithm to create PayLoads
13 def __init__(self, name = "PySgStressProducer", **kw):
16 super(PySgStressProducer,self).
__init__(**kw)
18 if 'DataName' not in kw: self.
DataName =
"MyData"
24 self.
msg.
info(
"Initializing %s", self.name )
25 self.
sg = PyAthena.py_svc (
"StoreGateSvc")
27 self.
msg.error (
"could not retrieve event store")
28 return StatusCode.Failure
30 return StatusCode.Success
33 self.
msg.
debug(
"Executing %s...", self.name )
35 self.
msg.
error(
"Could not create PayLoad data !!" )
36 return StatusCode.Failure
37 return StatusCode.Success
40 _makePayLoadDv = PyAthena.SgTests.PayLoadDv
41 _makePayLoad = PyAthena.SgTests.PayLoad
42 _sg_record = self.
sg.record
43 _sg_setConst = self.
sg.setConst
47 outName =
"%s_payload_%i" % (self.
DataName, i)
52 _sg_record(dv, outName)
55 self.
msg.
error(
"Could not store data at [%s] !!", outName )
57 if allGood
and not _sg_setConst(dv).isSuccess():
58 self.
msg.warning(
"Could not setConst data at [%s] !!", outName)
63 pback = data.push_back
66 if allGood:
return StatusCode.Success
67 return StatusCode.Failure
70 self.
msg.
info(
"Finalizing %s...", self.name )
71 return StatusCode.Success
77 """A simple python algorithm to retrieve PayLoads
79 def __init__(self, name = "PySgStressProducer", **kw):
82 super(PySgStressConsumer,self).
__init__(**kw)
84 if 'DataName' not in kw: self.
DataName =
"MyData"
88 self.
msg.
info(
"Initializing %s...", self.name )
89 self.
sg = PyAthena.py_svc(
"StoreGateSvc")
91 self.
msg.error (
"could not retrieve event store")
92 return StatusCode.Failure
93 return StatusCode.Success
96 self.
msg.
debug(
"Executing %s...", self.name )
102 outName =
"%s_payload_%i" % (self.
DataName, i)
103 dv = self.
sg.
retrieve(
"SgTests::PayLoadDv", outName )
105 self.
msg.
error(
"Could not retrieve payload !!" )
108 data = dv.at(0).m_data
110 self.
msg.
error(
"**NOT** my data!!" )
112 if allGood:
return StatusCode.Success
113 return StatusCode.Failure
116 self.
msg.
info(
"Finalizing %s...", self.name )
117 return StatusCode.Success
123 """A simple algorithm to put simple objects (std::vector<T>, builtins)
124 into StoreGate and see what happens
127 def __init__(self, name='PyClidsTestWriter', **kw):
129 super(PyClidsTestWriter,self).
__init__(**kw)
138 self.
msg.
info(
"initializing...")
139 self.
sg = PyAthena.py_svc(
'StoreGateSvc')
141 self.
msg.
error(
"Could not retrieve StoreGateSvc !")
142 return StatusCode.Failure
144 _info = self.
msg.info
145 _info(
"Configuration:")
146 _info(
" - ints: [%s]", self.
intsName)
153 'std::vector<unsigned int>' : self.
uintsName,
157 return StatusCode.Success
161 _error = self.
msg.error
162 self.
msg.
info(
"running execute...")
165 tp = getattr(PyAthena, tpName)
168 for i
in range(100): cont.push_back(i)
170 self.
sg[sgKey] = cont
171 except Exception
as err:
172 _error(
"Could not record '%s' at [%s] !",tpName,sgKey)
176 if not allGood:
return StatusCode.Failure
186 _info = self.
msg.info
189 if not cont: _info(
'Could not retrieve [%s] !',sgKey)
190 cont = [cont[i]
for i
in range(10)]
191 _info(
'[%s] content: %s', sgKey,cont)
192 if len( [i
for i
in range(10)
if i != cont[i]] ) > 0:
193 self.
msg.
error(
'[%s] content is NOT as expected !!')
195 if not allGood:
return StatusCode.Failure
196 return StatusCode.Success
200 return StatusCode.Success