ATLAS Offline Software
Loading...
Searching...
No Matches
LArBlobMergeAlg.py
Go to the documentation of this file.
1# Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration
2
3
9
10__author__="Pavol Strizenec <pavol@cern.ch>, based on example from Walter Lampl"
11__doc__ = " An athena algorithm to merge two LAr blobs"
12
13from AthenaPython.PyAthena import StatusCode
14import AthenaPython.PyAthena as PyAthena
15
16import ROOT
17
18from ROOT import Identifier, IdentifierHash
19
20from PyCool import cool
21
22from array import array
23import struct
24import cppyy
25
26def lenFactor(thelen):
27 ''' Get the length of rows in the db, assuming that the number of entries will be divisible by some number of cells / SCs / other DB objects '''
28 factor_cells = thelen/ 195072.
29 factor_SC = thelen/34048.
30 factor_LATOME = thelen/116.
31 factor = None
32 if factor_cells.is_integer():
33 factor = int(factor_cells)
34 if factor_SC.is_integer():
35 factor = int(factor_SC)
36 elif factor_LATOME.is_integer():
37 factor = int(factor_LATOME)
38 return factor
39
40def payload_dict(payload, verbose=True):
41 contents = {}
42 for k in payload.keys():
43 contents[k] = {}
44 if isinstance(payload[k], cppyy.gbl.coral.Blob):
45 factor = lenFactor(len(payload[k])/4)
46 if verbose: print(f"{k} consists of {factor} item(s) per row")
47 contents[k]["payload"] = payload[k]
48 contents[k]["size"] = factor
49 else:
50 contents[k]["payload"] = payload[k]
51 contents[k]["size"] = None
52 return contents
53
54def patchChannel(idhelper, channel, patchdet, patchFT, patchChan, verbose=False ):
55 chid = idhelper.channel_Id(IdentifierHash(channel))
56 chidval = chid.get_identifier32().get_compact()
57 name = idhelper.channel_name(chid)
58 FT = idhelper.feedthrough_name(chid)
59
60 if len(patchdet) > 0:
61 if 0 in patchdet and idhelper.isEMBchannel(chid):
62 if verbose: print(name, chidval, "is in EMB")
63 return True
64 if 1 in patchdet and idhelper.isEMECOW(chid):
65 if verbose: print(name, chidval, "is in EMECOW")
66 return True
67 if 2 in patchdet and idhelper.isEMECIW(chid):
68 if verbose: print(name, chidval, "is in EMECIW")
69 return True
70 if 3 in patchdet and idhelper.isHECchannel(chid):
71 if verbose: print(name, chidval, "is in HEC")
72 return True
73 if 4 in patchdet and idhelper.isFCALchannel(chid):
74 if verbose: print(name, chidval, "is in FCAL")
75 return True
76 if len(patchFT) > 0:
77 if FT in patchFT:
78 if verbose: print(name, chidval, "is in FT", FT)
79 return True
80 if len(patchChan) > 0:
81 if chidval in patchChan:
82 if verbose: print(name, chidval, "is in list of channels")
83 return True
84
85 return False
86
87
88
90
91 def __init__(self, name="LArBlobMergeAlg", **kw):
92
93 kw['name'] = name
94 super(LArBlobMergeAlg,self).__init__(**kw)
95
96 self.indb=kw.get('inputdb',"")
97 self.patchdb=kw.get('patchingdb',"")
98 self.outdb=kw.get('outputdb',"")
99 self.folder=kw.get('fld',"")
100 self.run=kw.get('runnum',999999)
101 self.issc=kw.get('isSC',False)
102 self.patchdet=kw.get('patchdet',[])
103 self.patchFT=kw.get('patchFT',[])
104 self.patchChan=kw.get('patchChan',[])
105 self.nEvts=0
106
107 self.msg.info(f"indb: {self.indb}, patchdb:{self.patchdb}, patchdet: {self.patchdet}, patchFT: {self.patchFT}, patchChan: {self.patchChan}, outdb: {self.outdb}, folder: {self.folder}, run: {self.run}, issc: {self.issc}")
108 return
109
110
111
112 def initialize(self):
113
116
117 # Get DetectorStore...
118
119 self._detStore = PyAthena.py_svc('StoreGateSvc/DetectorStore')
120 if self._detStore is None:
121 self.msg.error("Failed to get DetectorStore")
122 return StatusCode.Failure
123
124 self._condStore = PyAthena.py_svc('StoreGateSvc/ConditionStore')
125 if (self._condStore is None):
126 self.msg.error("Failed to get ConditionStore")
127 return StatusCode.Failure
128
129 # Get LArOnlineID helper class
130 if self.issc:
131 self.onlineID=self._detStore.retrieve("LArOnline_SuperCellID","LArOnline_SuperCellID")
132 else:
133 self.onlineID=self._detStore.retrieve("LArOnlineID","LArOnlineID")
134 if self.onlineID is None:
135 self.msg.error("Failed to get LArOnlineID")
136 return StatusCode.Failure
137
138
140
141
142 # this could be also be a paramter from outside:
143 self.iovSince = 0
144 self.iovUntil = cool.ValidityKeyMax
145
146 return StatusCode.Success
147
148
149 def execute(self):
150 eid=ROOT.Gaudi.Hive.currentContext().eventID()
151
152 try:
153 if self.issc:
154 condCont=self._condStore.retrieve("CondCont<LArOnOffIdMapping>","LArOnOffIdMapSC")
155 else:
156 condCont=self._condStore.retrieve("CondCont<LArOnOffIdMapping>","LArOnOffIdMap")
157 self.larCabling=condCont.find(eid)
158 except Exception:
159 print("ERROR, failed to get LArCabling")
160 return StatusCode.Failure
161
162 if self.nEvts==0:
163 self.nEvts+=1
164 #Process one 'dummy' event to make sure all DB connections get closed
165 #print ("Dummy event...")
166 return StatusCode.Success
167
168 self.onlineID.set_do_checks(True)
169 #self.offlineID.set_do_checks(True)
170
171 return StatusCode.Success
172
173
174 def finalize(self):
175 self.msg.info('finalizing...')
176 return StatusCode.Success
177
178 def stop(self): # here is all the code for the merging
179
180 from CoolConvUtilities.AtlCoolLib import indirectOpen
181
182 # reading main input blob
183 inconn = indirectOpen(self.indb, True)
184 if (inconn is None):
185 self.msg.error(f"Cannot connect to database {self.indb}")
186 raise RuntimeError(f"ERROR: Cannot connect to database {self.indb}")
187
188 contents = {}
189 contents2 = {}
190 try:
191 folder=inconn.getFolder(self.folder)
192 runiov=self.run << 32
193 obj=folder.findObject(runiov,0)
194 payload=obj.payload()
195
196 contents.update(payload_dict(payload))
197
198 except Exception as e:
199 self.msg.warning(f"Could not decode {self.folder} from {self.indb}\n{e}")
200 return StatusCode.Failure
201
202 # reading second input blob
203 if self.indb == self.patchdb:
204 inconn2=inconn
205 else:
206 inconn2 = indirectOpen(self.patchdb, True)
207 if (inconn2 is None):
208 self.msg.error(f"Cannot connect to database {self.patchdb}")
209 raise RuntimeError(f"ERROR: Cannot connect to database {self.patchdb}")
210
211 try:
212 folder=inconn2.getFolder(self.folder)
213 runiov=self.run << 32
214 obj=folder.findObject(runiov,0)
215 payload=obj.payload()
216 contents2.update(payload_dict(payload))
217 except Exception:
218 self.msg.warning(f"Could not decode {self.folder} from {self.patchdb}")
219 return StatusCode.Failure
220
221 # create folder
222 dbSvc = cool.DatabaseSvcFactory.databaseService()
223 try:
224 outconn = dbSvc.openDatabase(self.outdb,False)
225 outfolder = outconn.getFolder(self.folder)
226 except Exception:
227 #create one
228 outconn = dbSvc.createDatabase(self.outdb)
229 folder = inconn.getFolder(self.folder)
230 fspec = folder.folderSpecification()
231 from CaloCondBlobAlgs import CaloCondTools
232 desc = CaloCondTools.getAthenaFolderDescr()
233 outfolder = outconn.createFolder(self.folder, fspec, desc, True)
234
235 # create new blobs
236 spec = cool.RecordSpecification()
237
238 for k in contents.keys():
239 if isinstance(contents[k]["payload"], cppyy.gbl.coral.Blob):
240 spec.extend( k, cool.StorageType.Blob16M )
241 elif isinstance(contents[k]["payload"], int):
242 spec.extend( k, cool.StorageType.UInt32 )
243 else:
244 self.msg.error(f"CANNOT WORK OUT THE DATATYPE OF {k}: {contents[k]} FOR STORAGE")
245 return StatusCode.Failure
246
247 data = cool.Record( spec )
248
249 hash_max = self.onlineID.channelHashMax()
250 fsize = 4
251
252 btype=getattr(ROOT,"coral::Blob")
253 outblob = {}
254 vVec = {}
255 blobvals = [ k for k in contents.keys() if isinstance( contents[k]["payload"], cppyy.gbl.coral.Blob) ]
256 blobvals2 = [ k for k in contents2.keys() if isinstance( contents2[k]["payload"], cppyy.gbl.coral.Blob) ]
257
258 if len(blobvals) != len(blobvals2):
259 self.msg.error("The same blob payloads are not present in the two dbs!\n(1) {blobvals}\n(2) {blobvals2}")
260 return StatusCode.Failure
261
262 for bv in blobvals:
263 vecLength = contents[bv]["size"]
264 blobsize = vecLength*hash_max*fsize
265 outblob[bv] = btype()
266 outblob[bv].resize(blobsize)
267
268 if blobsize != len(contents[bv]["payload"]):
269 self.msg.error(f'Wrong size {len(contents[bv]["payload"])} of the input blob, should be {blobsize}')
270 return StatusCode.Failure
271
272 vVec[bv] = ROOT.std.vector('float')(vecLength*hash_max)
273
274 blobdata = contents[bv]["payload"].read()
275
276 for i in range(0,hash_max):
277 if vecLength > 1:
278 for iv in range(0, vecLength):
279 vVec[bv][i*vecLength + iv] = float(struct.unpack('f',blobdata[(i*vecLength + iv)*fsize:((i*vecLength+iv)*fsize+fsize)])[0])
280 else:
281 vVec[bv][i] = float(struct.unpack('f',blobdata[i*fsize:(i*fsize+fsize)])[0])
282
283 if blobsize != len(contents2[bv]["payload"]):
284 self.msg.error(f'Wrong size {len(contents2[bv]["payload"])} of the input blob, should be {blobsize}')
285 return StatusCode.Failure
286
287 # patch what is needed from the second blob
288 blobdata2 = contents2[bv]["payload"].read()
289 vecLength2 = contents2[bv]["size"]
290 if vecLength != vecLength2:
291 self.msg.error(f"Item {bv} has a different shape in the base and patch dbs! {vecLength} & {vecLength2}")
292 return StatusCode.Failure
293
294
295 for i in range(0,hash_max):
296 doPatch = patchChannel(self.onlineID, i, self.patchdet, self.patchFT, self.patchChan)
297 if doPatch:
298 if vecLength > 1:
299 for iv in range(0, vecLength):
300 vVec[bv][i*vecLength + iv] = float(struct.unpack('f',blobdata2[(i*vecLength + iv)*fsize:((i*vecLength+iv)*fsize+fsize)])[0])
301 else:
302 vVec[bv][i] = float(struct.unpack('f', blobdata2[i*fsize:(i*fsize+fsize)])[0])
303
304 #now trick with buffer
305 writable_buf = { k: array('f') for k in vVec.keys() }
306 buff = {}
307 for key in vVec.keys():
308 for i in range(0,vVec[key].size()):
309 writable_buf[key].append(vVec[key][i])
310 #and use buffers to fill blobs
311 buff[key] = writable_buf[key].tobytes()
312
313 for j in range(0, len(buff[key])):
314 outblob[key][j] = buff[key][j]
315
316 data[key] = outblob[key]
317
318 tofill = [ k for k in contents.keys() if k not in outblob.keys() ]
319
320 for tf in tofill:
321 if contents[tf]["payload"] != contents2[tf]["payload"]:
322 self.msg.warning(f'Setting {tf} to {contents[tf]["payload"]}, but beware: the value is {contents2[tf]["payload"]} in the patching db')
323 data[tf] = contents[tf]["payload"]
324
325 #and record data
326 outfolder.storeObject(self.iovSince, self.iovUntil, data, cool.ChannelId(0))
327
328 #=== close the database
329 outconn.closeDatabase()
330
331 return StatusCode.Success
332
333
334
335
336
void print(char *figname, TCanvas *c1)
MsgStream & msg() const
This is a "hash" representation of an Identifier.
virtual StatusCode stop() override
virtual StatusCode execute() override
virtual StatusCode finalize() override
virtual StatusCode initialize() override
STL class.
_detStore
note that we are using the python logging service and that the PyAthena.Alg base class has already in...
__init__(self, name="LArBlobMergeAlg", **kw)
patchChannel(idhelper, channel, patchdet, patchFT, patchChan, verbose=False)
payload_dict(payload, verbose=True)
IovVectorMap_t read(const Folder &theFolder, const SelectionCriterion &choice, const unsigned int limit=10)