ATLAS Offline Software
LArBlobMergeAlg.py
Go to the documentation of this file.
1 # Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration
2 
3 
9 
10 __author__="Pavol Strizenec <pavol@cern.ch>, based on example from Walter Lampl"
11 __doc__ = " An athena algorithm to merge two LAr blobs"
12 
13 from AthenaPython.PyAthena import StatusCode
14 import AthenaPython.PyAthena as PyAthena
15 
16 import ROOT
17 
18 from ROOT import Identifier, IdentifierHash
19 
20 from PyCool import cool
21 
22 from array import array
23 import struct
24 import cppyy
25 
def lenFactor(thelen):
    """Return the number of entries stored per database row.

    The payload is assumed to hold a whole number of entries for each of
    195072 cells, 34048 super-cells (SC) or 116 LATOME objects.  The
    divisors are tried in that order and the first one that divides
    ``thelen`` exactly decides the result, so a cell-sized payload is never
    re-interpreted as a super-cell or LATOME payload by a later check.

    Args:
        thelen: total number of entries in the payload; an int or a float
            (for example a byte count divided by 4).

    Returns:
        The number of entries per row as an int, or ``None`` when
        ``thelen`` is not a multiple of any of the known object counts.
    """
    for n_objects in (195072., 34048., 116.):
        factor = thelen / n_objects
        if factor.is_integer():
            return int(factor)
    return None
39 
def payload_dict(payload, verbose=True):
    """Turn a payload record into a plain dictionary.

    Args:
        payload: mapping-like record; only ``keys()`` and ``payload[key]``
            are used.
        verbose: when true, print the row size found for every blob field.

    Returns:
        ``{key: {"payload": value, "size": n}}`` where ``n`` is
        ``lenFactor(len(value)/4)`` for ``coral::Blob`` values and ``None``
        for every other value.
    """
    contents = {}
    for k in payload.keys():
        value = payload[k]
        factor = None
        if isinstance(value, cppyy.gbl.coral.Blob):
            # presumably len() is the blob size in bytes and each entry
            # takes 4 of them -- confirm against the blob producer
            factor = lenFactor(len(value)/4)
            if verbose:
                print(f"{k} consists of {factor} item(s) per row")
        contents[k] = {"payload": value, "size": factor}
    return contents
53 
def patchChannel(idhelper, channel, patchdet, patchFT, patchChan, verbose=False):
    """Tell whether the channel with hash ``channel`` is selected for patching.

    Args:
        idhelper: online ID helper used to decode the channel.
        channel: channel hash, converted with ``IdentifierHash``.
        patchdet: collection of detector codes; 0 = EMB, 1 = EMECOW,
            2 = EMECIW, 3 = HEC, 4 = FCAL.
        patchFT: collection of feedthrough names as returned by
            ``idhelper.feedthrough_name``.
        patchChan: collection of compact 32-bit channel identifiers.
        verbose: when true, print why a channel was selected.

    Returns:
        True as soon as one of the three selections matches, else False.
    """
    chid = idhelper.channel_Id(IdentifierHash(channel))
    chidval = chid.get_identifier32().get_compact()
    name = idhelper.channel_name(chid)
    FT = idhelper.feedthrough_name(chid)

    def selected(*reason):
        # Report the reason (if asked to) and give the positive answer.
        if verbose:
            print(name, chidval, *reason)
        return True

    if len(patchdet) > 0:
        # (detector code, idhelper predicate, message); the predicate is
        # looked up only when its code was actually requested.
        detector_tests = (
            (0, "isEMBchannel", "is in EMB"),
            (1, "isEMECOW", "is in EMECOW"),
            (2, "isEMECIW", "is in EMECIW"),
            (3, "isHECchannel", "is in HEC"),
            (4, "isFCALchannel", "is in FCAL"),
        )
        for code, predicate, message in detector_tests:
            if code in patchdet and getattr(idhelper, predicate)(chid):
                return selected(message)

    if len(patchFT) > 0 and FT in patchFT:
        return selected("is in FT", FT)

    if len(patchChan) > 0 and chidval in patchChan:
        return selected("is in list of channels")

    return False
86 
87 
88 
90 
def __init__(self, name="LArBlobMergeAlg", **kw):
    """Set up the algorithm from keyword options.

    Args:
        name: algorithm name, forwarded to the base class as ``kw['name']``.
        **kw: forwarded to the base class; the following keys are also
            read here (default in brackets): ``inputdb`` [""],
            ``patchingdb`` [""], ``outputdb`` [""], ``fld`` [""],
            ``runnum`` [999999], ``isSC`` [False], ``patchdet`` [[]],
            ``patchFT`` [[]], ``patchChan`` [[]].
    """
    # init the base class
    kw['name'] = name
    super(LArBlobMergeAlg, self).__init__(**kw)

    option = kw.get

    # databases and folder to work on
    self.indb = option('inputdb', "")
    self.patchdb = option('patchingdb', "")
    self.outdb = option('outputdb', "")
    self.folder = option('fld', "")

    # run used for the lookup, and regular cells vs. super-cells
    self.run = option('runnum', 999999)
    self.issc = option('isSC', False)

    # what to take from the patching db
    self.patchdet = option('patchdet', [])
    self.patchFT = option('patchFT', [])
    self.patchChan = option('patchChan', [])

    # number of events seen by execute()
    self.nEvts = 0

    self.msg.info(f"indb: {self.indb}, patchdb:{self.patchdb}, patchdet: {self.patchdet}, patchFT: {self.patchFT}, patchChan: {self.patchChan}, outdb: {self.outdb}, folder: {self.folder}, run: {self.run}, issc: {self.issc}")
109 
110 
111 
def initialize(self):
    """Fetch the stores and the online ID helper, and set the output IOV.

    Returns:
        StatusCode.Failure if the DetectorStore, the ConditionStore or the
        ID helper cannot be obtained, StatusCode.Success otherwise.
    """
    # Get DetectorStore...
    detStore = PyAthena.py_svc('StoreGateSvc/DetectorStore')
    self._detStore = detStore
    if detStore is None:
        self.msg.error("Failed to get DetectorStore")
        return StatusCode.Failure

    condStore = PyAthena.py_svc('StoreGateSvc/ConditionStore')
    self._condStore = condStore
    if condStore is None:
        self.msg.error("Failed to get ConditionStore")
        return StatusCode.Failure

    # Get LArOnlineID helper class; type and key carry the same name
    helper = "LArOnline_SuperCellID" if self.issc else "LArOnlineID"
    self.onlineID = detStore.retrieve(helper, helper)
    if self.onlineID is None:
        self.msg.error("Failed to get LArOnlineID")
        return StatusCode.Failure

    # NOTE(review): the listing this block was taken from skips original
    # line 139, which its cross-reference index attributes to `self.noid`;
    # that assignment is not visible and therefore not reproduced here --
    # confirm against the repository.

    # this could also be a parameter from outside:
    self.iovSince = 0
    self.iovUntil = cool.ValidityKeyMax

    return StatusCode.Success
147 
148 
def execute(self):
    """Fetch the LAr cabling valid for the current event.

    The cabling is looked up in the ConditionStore under the key
    "LArOnOffIdMapSC" (super-cells) or "LArOnOffIdMap" and kept in
    ``self.larCabling``.  The first call only counts the event; later calls
    additionally switch on the checks of the online ID helper.

    Returns:
        StatusCode.Failure if the cabling lookup raises,
        StatusCode.Success otherwise.
    """
    eid = ROOT.Gaudi.Hive.currentContext().eventID()
    cabling_key = "LArOnOffIdMapSC" if self.issc else "LArOnOffIdMap"

    try:
        condCont = self._condStore.retrieve("CondCont<LArOnOffIdMapping>", cabling_key)
        self.larCabling = condCont.find(eid)
    except Exception as e:
        # Report through the message service like every other error path
        # of this algorithm, and keep the reason of the failure.
        self.msg.error(f"Failed to get LArCabling: {e}")
        return StatusCode.Failure

    if self.nEvts == 0:
        self.nEvts += 1
        # Process one 'dummy' event to make sure all DB connections get closed
        return StatusCode.Success

    self.onlineID.set_do_checks(True)

    return StatusCode.Success
172 
173 
def finalize(self):
    """Log the end of the job; there is nothing to clean up here.

    Returns:
        StatusCode.Success, always.
    """
    self.msg.info('finalizing...')
    return StatusCode.Success
177 
def stop(self):
    """Merge the base and the patching payload and store the result.

    All the merging happens here.  The payload of ``self.folder`` found for
    run ``self.run`` is read from ``self.indb`` (base) and from
    ``self.patchdb`` (patch).  For every blob field the rows of the channels
    selected by ``patchChannel`` are taken from the patch and all other
    rows from the base; rows are copied byte-wise, so the stored values are
    preserved exactly.  Non-blob fields are taken from the base.  The
    merged record is stored in ``self.folder`` of ``self.outdb``, channel 0,
    with validity ``self.iovSince`` to ``self.iovUntil``.  The output
    database and folder are created when they do not exist yet, and are
    only touched once the merged record is complete.

    Returns:
        StatusCode.Success when the merged record was stored,
        StatusCode.Failure when a payload cannot be read or the two
        payloads do not have matching content.

    Raises:
        RuntimeError: if one of the input databases cannot be opened.
    """
    from CoolConvUtilities.AtlCoolLib import indirectOpen

    # The helpers below would release the COOL folder/object on return.
    # Presumably the blobs handed out by a payload are owned by that
    # object, so keep everything referenced until the merge is done.
    keep_alive = []

    def open_input(dbname):
        # Open an input database read-only; failing to do so is fatal.
        conn = indirectOpen(dbname, True)
        if conn is None:
            self.msg.error(f"Cannot connect to database {dbname}")
            raise RuntimeError(f"ERROR: Cannot connect to database {dbname}")
        return conn

    def read_contents(conn, dbname):
        # Return payload_dict() of the requested object, or None on failure.
        try:
            folder = conn.getFolder(self.folder)
            obj = folder.findObject(self.run << 32, 0)
            keep_alive.append((folder, obj))
            return payload_dict(obj.payload())
        except Exception as e:
            self.msg.warning(f"Could not decode {self.folder} from {dbname}\n{e}")
            return None

    # reading main input blob
    inconn = open_input(self.indb)
    contents = read_contents(inconn, self.indb)
    if contents is None:
        return StatusCode.Failure

    # reading second input blob
    inconn2 = inconn if self.indb == self.patchdb else open_input(self.patchdb)
    contents2 = read_contents(inconn2, self.patchdb)
    if contents2 is None:
        return StatusCode.Failure

    # record layout: one field per key of the base payload
    blob_class = cppyy.gbl.coral.Blob
    spec = cool.RecordSpecification()
    for k in contents:
        value = contents[k]["payload"]
        if isinstance(value, blob_class):
            spec.extend(k, cool.StorageType.Blob16M)
        elif isinstance(value, int):
            spec.extend(k, cool.StorageType.UInt32)
        else:
            self.msg.error(f"CANNOT WORK OUT THE DATATYPE OF {k}: {contents[k]} FOR STORAGE")
            return StatusCode.Failure

    data = cool.Record(spec)

    # Both payloads must offer the same blob fields; compare the names, not
    # just how many there are, since the patch is looked up by name below.
    blobvals = [k for k in contents if isinstance(contents[k]["payload"], blob_class)]
    blobvals2 = [k for k in contents2 if isinstance(contents2[k]["payload"], blob_class)]
    if set(blobvals) != set(blobvals2):
        self.msg.error(f"The same blob payloads are not present in the two dbs!\n(1) {blobvals}\n(2) {blobvals2}")
        return StatusCode.Failure

    hash_max = self.onlineID.channelHashMax()
    fsize = 4  # size in bytes of one stored value

    # Whether a channel is patched does not depend on the blob field, so
    # the selection is evaluated once and reused for every field.
    patch_hashes = []
    if blobvals:
        patch_hashes = [i for i in range(hash_max)
                        if patchChannel(self.onlineID, i, self.patchdet, self.patchFT, self.patchChan)]

    # create new blobs
    btype = getattr(ROOT, "coral::Blob")
    outblob = {}
    for bv in blobvals:
        vecLength = contents[bv]["size"]
        if vecLength is None:
            self.msg.error(f"Cannot work out the number of items per row of {bv}")
            return StatusCode.Failure

        blobsize = vecLength*hash_max*fsize
        if blobsize != len(contents[bv]["payload"]):
            self.msg.error(f'Wrong size {len(contents[bv]["payload"])} of the input blob, should be {blobsize}')
            return StatusCode.Failure

        if blobsize != len(contents2[bv]["payload"]):
            self.msg.error(f'Wrong size {len(contents2[bv]["payload"])} of the input blob, should be {blobsize}')
            return StatusCode.Failure

        vecLength2 = contents2[bv]["size"]
        if vecLength != vecLength2:
            self.msg.error(f"Item {bv} has a different shape in the base and patch dbs! {vecLength} & {vecLength2}")
            return StatusCode.Failure

        # start from the base content ...
        merged = bytearray(contents[bv]["payload"].read())
        blobdata2 = bytes(contents2[bv]["payload"].read())
        if len(merged) != blobsize or len(blobdata2) != blobsize:
            self.msg.error(f"Could not read all {blobsize} bytes of {bv}")
            return StatusCode.Failure

        # ... and patch what is needed from the second blob, row by row
        rowsize = vecLength*fsize
        for i in patch_hashes:
            begin = i*rowsize
            merged[begin:begin + rowsize] = blobdata2[begin:begin + rowsize]

        # fill the output blob byte by byte
        outblob[bv] = btype()
        outblob[bv].resize(blobsize)
        for j, byte in enumerate(merged):
            outblob[bv][j] = byte

        data[bv] = outblob[bv]

    # everything that is not a blob is taken over from the base payload
    for tf in contents:
        if tf in outblob:
            continue
        if tf not in contents2:
            self.msg.warning(f'Setting {tf} to {contents[tf]["payload"]}, but beware: it is not present in the patching db')
        elif contents[tf]["payload"] != contents2[tf]["payload"]:
            self.msg.warning(f'Setting {tf} to {contents[tf]["payload"]}, but beware: the value is {contents2[tf]["payload"]} in the patching db')
        data[tf] = contents[tf]["payload"]

    # open the output database, or create one
    dbSvc = cool.DatabaseSvcFactory.databaseService()
    try:
        outconn = dbSvc.openDatabase(self.outdb, False)
    except Exception:
        outconn = dbSvc.createDatabase(self.outdb)

    try:
        try:
            outfolder = outconn.getFolder(self.folder)
        except Exception:
            # create the folder with the specification of the input one
            fspec = inconn.getFolder(self.folder).folderSpecification()
            from CaloCondBlobAlgs import CaloCondTools
            desc = CaloCondTools.getAthenaFolderDescr()
            outfolder = outconn.createFolder(self.folder, fspec, desc, True)

        # and record data
        outfolder.storeObject(self.iovSince, self.iovUntil, data, cool.ChannelId(0))
    finally:
        # === close the database, also when storing failed
        outconn.closeDatabase()

    return StatusCode.Success
332 
333 
334 
335 
336 
grepfile.info
info
Definition: grepfile.py:38
python.PyKernel.retrieve
def retrieve(aClass, aKey=None)
Definition: PyKernel.py:110
read
IovVectorMap_t read(const Folder &theFolder, const SelectionCriterion &choice, const unsigned int limit=10)
Definition: openCoraCool.cxx:569
python.LArBlobMergeAlg.LArBlobMergeAlg
Definition: LArBlobMergeAlg.py:89
CaloCellPos2Ntuple.int
int
Definition: CaloCellPos2Ntuple.py:24
python.LArBlobMergeAlg.LArBlobMergeAlg._detStore
_detStore
note that we are using the python logging service and that the PyAthena.Alg base class has already in...
Definition: LArBlobMergeAlg.py:119
python.LArBlobMergeAlg.LArBlobMergeAlg.larCabling
larCabling
Definition: LArBlobMergeAlg.py:157
PyAthena::Alg::initialize
virtual StatusCode initialize() override
Definition: PyAthenaAlg.cxx:60
PyAthena::Alg::execute
virtual StatusCode execute() override
Definition: PyAthenaAlg.cxx:93
python.LArBlobMergeAlg.LArBlobMergeAlg.outdb
outdb
Definition: LArBlobMergeAlg.py:98
dumpHVPathFromNtuple.append
bool append
Definition: dumpHVPathFromNtuple.py:91
python.LArBlobMergeAlg.LArBlobMergeAlg.iovSince
iovSince
Definition: LArBlobMergeAlg.py:143
python.LArBlobMergeAlg.payload_dict
def payload_dict(payload, verbose=True)
Definition: LArBlobMergeAlg.py:40
PyAthena::Alg::stop
virtual StatusCode stop() override
Definition: PyAthenaAlg.cxx:80
python.LArBlobMergeAlg.LArBlobMergeAlg.folder
folder
Definition: LArBlobMergeAlg.py:99
PyAthena::Alg::finalize
virtual StatusCode finalize() override
Definition: PyAthenaAlg.cxx:86
python.LArBlobMergeAlg.patchChannel
def patchChannel(idhelper, channel, patchdet, patchFT, patchChan, verbose=False)
Definition: LArBlobMergeAlg.py:54
python.setupRTTAlg.size
int size
Definition: setupRTTAlg.py:39
python.LArBlobMergeAlg.LArBlobMergeAlg.noid
noid
Definition: LArBlobMergeAlg.py:139
python.LArBlobMergeAlg.LArBlobMergeAlg.patchFT
patchFT
Definition: LArBlobMergeAlg.py:103
plotBeamSpotVxVal.range
range
Definition: plotBeamSpotVxVal.py:195
python.LArBlobMergeAlg.LArBlobMergeAlg.__init__
def __init__(self, name="LArBlobMergeAlg", **kw)
Definition: LArBlobMergeAlg.py:91
python.LArBlobMergeAlg.LArBlobMergeAlg.patchdb
patchdb
Definition: LArBlobMergeAlg.py:97
print
void print(char *figname, TCanvas *c1)
Definition: TRTCalib_StrawStatusPlots.cxx:25
array
python.LArBlobMergeAlg.lenFactor
def lenFactor(thelen)
Definition: LArBlobMergeAlg.py:26
python.LArBlobMergeAlg.LArBlobMergeAlg.patchdet
patchdet
Definition: LArBlobMergeAlg.py:102
python.LArBlobMergeAlg.LArBlobMergeAlg.run
run
Definition: LArBlobMergeAlg.py:100
python.LArBlobMergeAlg.LArBlobMergeAlg.issc
issc
Definition: LArBlobMergeAlg.py:101
python.LArBlobMergeAlg.LArBlobMergeAlg.nEvts
nEvts
Definition: LArBlobMergeAlg.py:105
AthCommonMsg< Algorithm >::msg
MsgStream & msg() const
Definition: AthCommonMsg.h:24
python.LArBlobMergeAlg.LArBlobMergeAlg._condStore
_condStore
Definition: LArBlobMergeAlg.py:124
PyAthena::Alg
Definition: PyAthenaAlg.h:33
python.LArBlobMergeAlg.LArBlobMergeAlg.iovUntil
iovUntil
Definition: LArBlobMergeAlg.py:144
python.LArBlobMergeAlg.LArBlobMergeAlg.indb
indb
init the base class
Definition: LArBlobMergeAlg.py:96
error
Definition: IImpactPoint3dEstimator.h:70
python.AtlCoolLib.indirectOpen
def indirectOpen(coolstr, readOnly=True, debug=False)
Definition: AtlCoolLib.py:130
readCCLHist.float
float
Definition: readCCLHist.py:83
python.LArBlobMergeAlg.LArBlobMergeAlg.onlineID
onlineID
Definition: LArBlobMergeAlg.py:131
python.LArBlobMergeAlg.LArBlobMergeAlg.patchChan
patchChan
Definition: LArBlobMergeAlg.py:104
Identifier
Definition: IdentifierFieldParser.cxx:14