7from AthenaPython.PyAthena
import StatusCode
8import AthenaPython.PyAthena
as PyAthena
12 """A simple python algorithm to create PayLoads
14 def __init__(self, name = "PySgStressProducer", **kw):
17 super(PySgStressProducer,self).
__init__(**kw)
19 if 'DataName' not in kw: self.
DataName =
"MyData"
25 self.
msg.info(
"Initializing %s", self.
name )
26 self.
sg = PyAthena.py_svc (
"StoreGateSvc")
28 self.
msg.error (
"could not retrieve event store")
29 return StatusCode.Failure
31 return StatusCode.Success
36 self.
msg.
error(
"Could not create PayLoad data !!" )
37 return StatusCode.Failure
38 return StatusCode.Success
41 _makePayLoadDv = PyAthena.SgTests.PayLoadDv
42 _makePayLoad = PyAthena.SgTests.PayLoad
43 _sg_record = self.
sg.record
44 _sg_setConst = self.
sg.setConst
48 outName =
"%s_payload_%i" % (self.
DataName, i)
52 ROOT.SetOwnership (data,
False)
54 _sg_record(dv, outName)
57 self.
msg.
error(
"Could not store data at [%s] !!", outName )
59 if allGood
and not _sg_setConst(dv).isSuccess():
60 self.
msg.warning(
"Could not setConst data at [%s] !!", outName)
65 pback = data.push_back
68 if allGood:
return StatusCode.Success
69 return StatusCode.Failure
72 self.
msg.info(
"Finalizing %s...", self.
name )
73 return StatusCode.Success
79 """A simple python algorithm to retrieve PayLoads
81 def __init__(self, name = "PySgStressProducer", **kw):
84 super(PySgStressConsumer,self).
__init__(**kw)
86 if 'DataName' not in kw: self.
DataName =
"MyData"
90 self.
msg.info(
"Initializing %s...", self.
name )
91 self.
sg = PyAthena.py_svc(
"StoreGateSvc")
93 self.
msg.error (
"could not retrieve event store")
94 return StatusCode.Failure
95 return StatusCode.Success
104 outName =
"%s_payload_%i" % (self.
DataName, i)
105 dv = self.
sg.retrieve(
"SgTests::PayLoadDv", outName )
107 self.
msg.
error(
"Could not retrieve payload !!" )
110 data = dv.at(0).m_data
112 self.
msg.
error(
"**NOT** my data!!" )
114 if allGood:
return StatusCode.Success
115 return StatusCode.Failure
118 self.
msg.info(
"Finalizing %s...", self.
name )
119 return StatusCode.Success
125 """A simple algorithm to put simple objects (std::vector<T>, builtins)
126 into StoreGate and see what happens
129 def __init__(self, name='PyClidsTestWriter', **kw):
131 super(PyClidsTestWriter,self).
__init__(**kw)
140 self.
msg.info(
"initializing...")
141 self.
sg = PyAthena.py_svc(
'StoreGateSvc')
143 self.
msg.
error(
"Could not retrieve StoreGateSvc !")
144 return StatusCode.Failure
146 _info = self.
msg.info
147 _info(
"Configuration:")
148 _info(
" - ints: [%s]", self.
intsName)
155 'std::vector<unsigned int>' : self.
uintsName,
159 return StatusCode.Success
163 _error = self.
msg.error
164 self.
msg.info(
"running execute...")
167 tp = getattr(PyAthena, tpName)
170 for i
in range(100): cont.push_back(i)
172 self.
sg[sgKey] = cont
173 except Exception
as err:
174 _error(
"Could not record '%s' at [%s] !",tpName,sgKey)
178 if not allGood:
return StatusCode.Failure
188 _info = self.
msg.info
190 cont = self.
sg.retrieve(tpName,sgKey)
191 if not cont: _info(
'Could not retrieve [%s] !',sgKey)
192 cont = [cont[i]
for i
in range(10)]
193 _info(
'[%s] content: %s', sgKey,cont)
194 if len( [i
for i
in range(10)
if i != cont[i]] ) > 0:
195 self.
msg.
error(
'[%s] content is NOT as expected !!')
197 if not allGood:
return StatusCode.Failure
198 return StatusCode.Success
201 self.
msg.info(
"finalize...")
202 return StatusCode.Success
virtual StatusCode finalize() override
virtual StatusCode initialize() override
__init__(self, name='PyClidsTestWriter', **kw)
__init__(self, name="PySgStressProducer", **kw)
__init__(self, name="PySgStressProducer", **kw)