ATLAS Offline Software
Loading...
Searching...
No Matches
AtlRunQuerySelectorBase.py
Go to the documentation of this file.
1# Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration
2
3
4from PyCool import cool
5from copy import deepcopy
6
7from CoolRunQuery.utils.AtlRunQueryUtils import coolDbConn, MergeRanges, SmartRangeCalulator #, runsOnServer, GetRanges
8from CoolRunQuery.utils.AtlRunQueryIOV import IOVTime, IOVRange
9
10import sys
11from time import time
12from collections import namedtuple, defaultdict
13
14from itertools import groupby
15from operator import itemgetter
16
17
OOO = namedtuple('OOO', ['channel', 'payload', 'iovrange', 'isvf'])


def coolgen(coolobjs):
    """Yield one OOO record for every object of a COOL object iterator.

    The iterator is advanced with goToNext() and read with currentRef().
    The 'payload' field holds the bound payloadValue method of the object
    (it is not called here), 'iovrange' is an IOVRange built from the object
    and 'isvf' is always False.
    """
    while True:
        if not coolobjs.goToNext():
            return
        current = coolobjs.currentRef()
        yield OOO(channel=current.channelId(),
                  payload=current.payloadValue,
                  iovrange=IOVRange(current),
                  isvf=False)


ChDesc = namedtuple('ChDesc', ['CoolPayloadKey', 'CoolChannel', 'ResultKey', 'Header', 'SelectShowRetrieve'])
25
27 SELECT = 0x1
28 SHOW = 0x2
29 RETRIEVE = 0x4
30
31 UNDEFINED = 0
32 STREAM = 1
33 DETECTOR = 2
34 DQDEFECTS = 3
35
36 def __init__(self, x, keytype=UNDEFINED, selecttype = SELECT):
37 if isinstance(x,str):
39 self._cool_attr = ''
40 self._internal_key = x.strip()
42 self._table_header = x.strip()
43 else:
44 self._cool_channel = x[0]
45 self._cool_attr = x[2]
46 self._internal_key = x[1].strip()
47 self._second_internal_key = ''
48 self._table_header = x[1] if len(x)<4 else x[3]
49 self._type = keytype
50 self._select_show_retrieve = selecttype # 0-retrieve only, 1-select, 2-show, 3-select&show
51
52 @property
53 def _seltype(self):
54 if self._select_show_retrieve == DataKey.SELECT:
55 return "SELECT"
56 if self._select_show_retrieve == DataKey.SHOW:
57 return "SHOW"
58 if self._select_show_retrieve == DataKey.RETRIEVE:
59 return "RETRIEVE"
60 return ""
61
62 def __repr__(self):
63 return "<DataKey '%s' type %i %s>" % (self._internal_key, self._type, self._seltype)
64
65 def __eq__(self,other):
66 if isinstance(other,DataKey):
67 #print ("Compare (%s) with (%s) ==> %r" % (self._internal_key,other._internal_key, self._internal_key==other._internal_key))
68 eq = self._internal_key==other._internal_key and self._second_internal_key==other._second_internal_key
69 return eq
70 else:
71 #print ("Compare (%s) with '%s' ==> %r" % (self._internal_key, other, self._internal_key==other))
72 eq = self._internal_key==other
73 return eq
74
75 def __cmp__(self,other):
76 if isinstance(other,DataKey):
77 if self._internal_key==other._internal_key and self._second_internal_key==other._second_internal_key:
78 return 0
79 if self._internal_key==other._internal_key:
80 return self._second_internal_key.__cmp__(other._second_internal_key)
81 return self._internal_key.__cmp__(other._internal_key)
82 else:
83 #print ("Compare (%s) with '%s' ==> %r" % (self._internal_key, other, self._internal_key==other))
84 return self._internal_key==other
85
86 def __hash__(self):
87 return hash( (self._internal_key,self._second_internal_key) )
88
89 def pickled(self):
90 if self._second_internal_key != '':
91 return (self._internal_key,self._second_internal_key)
92 return self._internal_key
93
94
95 @property
96 def CoolPayloadKey(self):
97 return self._cool_attr
98
99 @property
100 def CoolChannel(self):
101 return self._cool_channel
102
103 @property
104 def ResultKey(self):
105 return self._internal_key
106
107 @property
108 def Header(self):
109 return self._table_header
110
111 @property
113 return self._select_show_retrieve
114
115 @property
116 def Type(self):
117 return self._type
118
119
120
121
123 _conddb = None
124 condtag = ""
125
126 @staticmethod
128 Selector._conddb = 'OFLP200'
129
130 @staticmethod
131 def condDB(run_number = None ):
132 if not run_number:
133 if Selector._conddb:
134 return Selector._conddb
135 raise RuntimeError("CondDB not yet set")
136
137 Selector._conddb = "CONDBR2" if run_number > 236100 else "COMP200"
138 print ("Determinded %s, based on run number %i" % (Selector._conddb, run_number))
139 return Selector._conddb
140
141 @staticmethod
142 def isRun2(run_number = None ):
143 return Selector.condDB(run_number)=="CONDBR2"
144
145 def __init__(self, name):
146 self.name = name
147 self.priority = 50
148 self.verbose = False
149 self.applySelection = True
150 self.mcenabled = False
151 def select(self, runlist):
152 return runlist
153 def prettyValue(self, value, key):
154 return value
155 def ApplySelection(self, key):
156 return self.applySelection
157 def runAfterQuery(self, runlist):
158 pass
159
160 # use this method for the selector to specify which data should be stored
161 def valueForStorage(self, condData, key):
162 return None
163
164
166 def __init__(self, name, dbfolderkey, channelKeys):
167 super(Condition,self).__init__(name)
168 self.selDataMissing = False
169 self.setSchemaFolderTag(dbfolderkey)
170 self.setChannelKeys(channelKeys)
171
172 def setSchemaFolderTag(self,dbfoldertag):
173 # setting schema, folder, and tag
174 self.schema, foldertag = dbfoldertag.split('::')
175 self.folder, self.tagname = (foldertag.split('#')+[''])[0:2]
176 if self.tagname=="":
177 self.tagname = self.condtag
178
179 def setChannelKeys(self,channelKeys,ssr=None):
180 self.data_keys = [DataKey(x) for x in channelKeys]
181 if ssr:
182 assert len(ssr)==len(self.data_keys)
183 for (x,_ssr) in zip(self.data_keys,ssr):
184 x._select_show_retrieve = _ssr
185
186 self._channelKeys = channelKeys
187 self._coolpayloadkey = [x[2] for x in channelKeys]
188 self._coolchannel = [x[0] for x in channelKeys]
189 self._resultKey = [x[1] for x in channelKeys]
190 self._headerKey = [x[1] for x in channelKeys]
191 if not ssr:
192 self._doSelectShowRetrieve = len(channelKeys)*[1] # 0-retrieve only, 1-select, 2-show, 3-select&show
193 else:
194 self._doSelectShowRetrieve = ssr
196
197 def _getFolder(self):
198 f = coolDbConn.GetDBConn(schema = self.schema,db=Selector.condDB()).getFolder(self.folder)
199 if f.versioningMode()==0:
200 self.tagname=""
201 if self.tagname not in ["HEAD", ""]:
202 self.tagname = f.resolveTag(self.tagname)
203 return f
204
205 def addChannelKey(self,channelKey,ssr=None):
206 """
207 channelKey: (0,'SMK','MasterConfigurationKey')
208 ssr: 0-retrieve only, 1-select, 2-show, 3-select&show
209 """
210
211 if DataKey(channelKey) in self.data_keys:
212 # key already exists
213 return
214
215 self.data_keys += [ DataKey(channelKey) ]
216 if ssr:
217 self.data_keys[-1]._select_show_retrieve = ssr
218
219 self._channelKeys += [ channelKey ]
220 self._coolpayloadkey += [ channelKey[2] ]
221 self._coolchannel += [ channelKey[0] ]
222 self._resultKey += [ channelKey[1] ]
223 self._headerKey += [ channelKey[1] ]
224 if ssr:
225 self._doSelectShowRetrieve += [ ssr ]
226 else:
227 self._doSelectShowRetrieve += [ 1 ]
228 self._channeldesc = self.data_keys
229
230 def ResultKey(self):
231 return self._resultKey
232
233 def ChannelKeys(self):
234 return self._channelKeys
235
236 def CoolChannels(self):
237 return self._coolchannel
238
239 def HeaderKeys(self):
240 return self._headerKey
241
243 return self._doSelectShowRetrieve
244
245 def ChannelDesc(self):
246 return self._channeldesc
247
248 def setShowOutput(self, listofchans=None ):
249 for cd in self.ChannelDesc():
250 if listofchans and cd.ResultKey not in listofchans:
251 continue
252 cd._select_show_retrieve |= 2
253 self._doSelectShowRetrieve = [cd.SelectShowRetrieve for cd in self._channeldesc]
254 self.updateShowOrder()
255
257 from CoolRunQuery.AtlRunQueryRun import Run
258 for cd in self.ChannelDesc():
259 if cd.SelectShowRetrieve<2:
260 continue
261 Run.AddToShowOrder(cd)
262
263
265 def __init__(self, name, dbfolderkey, channelKeys=None):
266 super(RunLBBasedCondition,self).__init__(name, dbfolderkey, channelKeys)
267
268 def _retrieve(self, iovmin, iovmax, f, sortedRanges):
269 chansel = None
270 for ch1,ch2 in sortedRanges:
271 if chansel is None:
272 chansel = cool.ChannelSelection(ch1,ch2,cool.ChannelSelection.sinceBeforeChannel)
273 else:
274 chansel.addRange(ch1,ch2)
275 #print (self.name,"browsing objects with tag",self.tagname)
276 return coolgen(f.browseObjects( iovmin, iovmax, chansel, self.tagname))
277
278 def findPayload(self, runNr, iovpllist):
279 """
280 iovpllist is a list [(IOV,payload),(IOV,payload),..,(IOV,payload)]
281 """
282 for x in iovpllist:
283 if IOVTime(runNr,1).inRange(x[0]):
284 return x[1]
285 return 'n.a.'
286
287 def getSortedAndCompletedPayloadDict(self, iovpllist, runlist, key):
288 # reduce to data in the run 'runNr' and sort by iov start time
289
290 pld = defaultdict(list)
291 if not iovpllist:
292 return pld
293 if not runlist:
294 return pld
295
296 runnrlist = [r.runNr for r in runlist]
297 nlb = dict([(r.runNr,r.lastlb) for r in runlist])
298
299 # first and last run of the runlist
300 firstrun = runnrlist[0]
301 lastrun = runnrlist[-1]
302
303 for iov, pl in iovpllist:
304 first = max(firstrun,iov.startTime.run)
305 last = min(lastrun,(iov.endTime-1).run)
306 for r in range(first,last+1):
307 if r in runnrlist:
308 pld[r].append((deepcopy(iov),pl))
309
310 # adjustments of IOV
311 for runnr, iovplbyrun in pld.items():
312 # adjust lb 0 to lb 1
313 if iovplbyrun[0][0].startTime.lb==0:
314 iovplbyrun[0][0].startTime.lb=1
315
316 # truncate first IOV to a single run
317 iovplbyrun[0][0].truncateToSingleRun(runnr)
318
319 # sometimes an IOV has [RunLB1 - RunLB2), where RunLB2 has LB==1
320 # -> that gets truncated to [RunLB2 - RunLB2), which is obviously not valid
321 # -> so we slice that right out
322 if iovplbyrun[0][0].startTime == iovplbyrun[0][0].endTime:
323 iovplbyrun[:1] = []
324
325 # truncate last IOV to a single run
326 lastiov = iovplbyrun[-1][0]
327 lastiov.truncateToSingleRun(runnr)
328
329 # insert IOVs for missing ranges (value n.a.)
330 idx=0
331 curTime = IOVTime(runnr,1)
332 while curTime<lastiov.startTime:
333 iov = iovplbyrun[idx][0]
334 if curTime < iov.startTime:
335 # insert n.a. for missing iov
336 missingIOV = IOVRange(starttime=curTime, endtime=iov.startTime)
337 iovplbyrun.insert( idx, (missingIOV, "n.a.") )
338 curTime = IOVTime(iov.endTime)
339 idx += 1
340
341 # also check if IOV at the end is missing
342 runEndTime = IOVTime(runnr,nlb[runnr]+1) # last lb + 1
343 if lastiov.endTime<runEndTime:
344 missingIOV = IOVRange(starttime=lastiov.endTime, endtime=runEndTime)
345 iovplbyrun.append( (missingIOV, "n.a.") )
346
347 return pld
348
349 def readCondData(self, runranges, f, sortedRanges):
350 # get the data from cool
351 condData = defaultdict(list)
352
353 keys = dict([(ch,set(k)) for ch, k in groupby(sorted(self.ChannelKeys(), key=itemgetter(0)), itemgetter(0))])
354
355 for rr in runranges:
356 iovmin=(rr[0] << 32)+0
357 iovmax=((rr[1]+1) << 32)-1
358
359 # access to COOL
360 objs = self._retrieve(iovmin, iovmax, f, sortedRanges)
361
362 for obj in objs:
363 ch = obj.channel
364 for chtmp, internalKey, payloadKey in keys[ch]:
365 if type(payloadKey)==tuple:
366 if obj.isvf:
367 payload = obj.payload
368 else:
369 payload = tuple(map(obj.payload, payloadKey))
370 else:
371 payload = obj.payload(payloadKey)
372
373 condData[internalKey].append( (IOVRange(obj.iovrange), payload) )
374
375 return condData
376
377 def select(self, runlist):
378 print (self, end='')
379 sys.stdout.flush()
380 start = time()
381 newrunlist = []
382
383 sortedChannel = sorted( list( set( self.CoolChannels() ) ) )
384 sortedRanges = MergeRanges([(x,x) for x in sortedChannel])
385
386 runranges = SmartRangeCalulator(runlist)
387
388 f = self._getFolder()
389
390 condData = self.readCondData(runranges, f, sortedRanges)
391
392 # for each key sort the data by IOV start time
393 for k in self.ResultKey():
394 condData[k].sort()
395 # if k.startswith('Rele'):
396 # for x in condData[k]:
397 # print("TEST",k,x)
398
399 condDataDict = {}
400 for k in self.ResultKey():
401 condDataDict[k] = self.getSortedAndCompletedPayloadDict(condData[k],runlist,k)
402
403 for run in runlist: # go through old runlist and see
404
405 rejectSomething = False
406 for k in self.ResultKey():
407 if run.runNr not in condDataDict[k]:
408 if self.ApplySelection(k):
409 rejectSomething = True
410 run.addResult(k, "n.a.")
411 continue
412
413 datavec = condDataDict[k][run.runNr]
414
415 if not datavec or len(datavec)==0:
416 run.addResult(k, "n.a.")
417 continue
418
419 if 'n.a.' in datavec:
420 run.showDataIncomplete = True
421
422 for iov, data in datavec:
423 self.selDataMissing = False
424 if self.ApplySelection(k) and not self.passes(data,k):
425 run.addResult(k, self.prettyValue(data,k), iov, reject=True, valueForStorage=self.valueForStorage(data,k))
426 rejectSomething = True
427 else:
428 run.addResult(k, self.prettyValue(data,k), iov, valueForStorage=self.valueForStorage(data,k))
429 if self.selDataMissing:
430 run.selDataIncomplete = True
431
432 if not (rejectSomething and self.rejectRun(run)):
433 newrunlist += [run.runNr]
434
435 runlist = [r for r in runlist if r.runNr in newrunlist]
436
437 duration = time() - start
438
440 print (" ==> %i runs found (%.2f sec)" % (len(runlist),duration))
441 else:
442 print (" ==> Done (%g sec)" % duration)
443
444 return runlist
445
446
447 def rejectRun(self,run):
448 return run.data.isRejected
449
450
452 def __init__(self, name, dbfolderkey, channelKeys=None):
453 super(TimeBasedCondition,self).__init__(name, dbfolderkey, channelKeys)
454
455
456 def getSortedAndCompletedPayloadDict(self, iovpllist, runlist,key):
457 # reduce to data in the run 'runNr' and sort by iov start time
458
459 pld = {}
460 if not iovpllist or not runlist:
461 return pld
462
463 startiovindex = 0
464 for run in runlist:
465 pld[run.runNr] = []
466
467 if startiovindex<-1:
468 continue
469
470 # find the first iov that overlaps
471 iovindex = startiovindex
472 while True:
473 try:
474 iov = iovpllist[iovindex][0]
475 iovstart = iov.startTime.time
476 iovend = iov.endTime.time
477
478 if iovend>run.sor:
479 startiovindex = iovindex
480 break
481
482 if iovstart>run.eor:
483 startiovindex = -1
484 break
485
486 if iovend<run.sor and iovindex==len(iovpllist)-1:
487 startiovindex = -2
488 break
489
490 iovindex += 1
491 except IndexError:
492 startiovindex = -1
493
494 if startiovindex<0:
495 continue
496
497 # find the last iov that overlaps
498 iovindex = startiovindex
499 while True:
500 iov = iovpllist[iovindex][0]
501 iovstart = iov.startTime.time
502 iovend = iov.endTime.time
503
504 if iovend>=run.eor or iovindex==len(iovpllist)-1:
505 endiovindex = iovindex
506 break
507
508 iovindex += 1
509
510 # now we have the first and last iov that overlap with that run: startiovindex, endiovindex
511 lbindex = 0
512 for iov,pl in iovpllist[startiovindex:endiovindex+1]:
513 iovstart = iov.startTime.time
514 iovend = iov.endTime.time
515 endrun = run.runNr
516
517 while lbindex<len(run.lbtimes) and run.lbtimes[lbindex][0]<iovstart:
518 lbindex += 1
519 startlb = lbindex+1
520
521 while lbindex<len(run.lbtimes) and run.lbtimes[lbindex][0]<iovend:
522 lbindex += 1
523 lbindex -= 1
524
525 # now lbindex points to the last lb that starts within the iov or to one after the last lb
526 if lbindex==len(run.lbtimes)-1:
527 endlb = 0
528 endrun += 1
529 else:
530 endlb = lbindex+2 # +1 for lb-index->lb-number, +1 for closed interval
531
532
533 # append info of this IOV
534 lastIOV,lastpl=None,None
535 if len(pld[run.runNr])>0:
536 lastIOV,lastpl = pld[run.runNr][-1]
537 if lastpl==pl:
538 lastIOV.endTime.run = endrun
539 lastIOV.endTime.lb = endlb
540 else:
541 pld[run.runNr] += [(IOVRange(runStart=run.runNr, lbStart=startlb, runEnd=endrun, lbEnd=endlb), pl)] # in this case the end LB is inclusive
542
543
544 # for the next run we start looking at:
545 startiovindex = endiovindex
546
547 return pld
548
549
550 def select(self, runlist):
551 runlistNo = [run.runNr for run in runlist]
552 start = time()
553 newrunlist = []
554 f = coolDbConn.GetDBConn(schema=self.schema, db=Selector.condDB()).getFolder(self.folder)
555 sortedChannel = sorted( list( set( self.CoolChannels() ) ) )
556 chansel = None
557 for ch in sortedChannel:
558 if chansel is None:
559 chansel = cool.ChannelSelection(ch,ch,cool.ChannelSelection.sinceBeforeChannel)
560 else:
561 chansel.addChannel(ch)
562
563 runranges = SmartRangeCalulator(runlist,True)
564 condData = defaultdict(list)
565
566
567 # access COOL
568 for rr in runranges:
569 firstrun = runlist[runlistNo.index(rr[0])]
570 lastrun = runlist[runlistNo.index(rr[1])]
571 iovmin=firstrun.sor
572 iovmax=lastrun.eor
573
574 objs = f.browseObjects( iovmin, iovmax, chansel)
575 while objs.goToNext():
576 obj= objs.currentRef()
577 ch = obj.channelId()
578 for chKey in self.ChannelKeys():
579 (channelNumber, resultKey, payloadKey) = chKey
580 if channelNumber != ch:
581 continue
582 isBlob = (resultKey == 'olc:bcidmask')
583 if isBlob:
584 payloadvalue = obj.payload()[chKey[2]].read()
585 else:
586 payloadvalue = obj.payloadValue(chKey[2])
587
588 condData[resultKey].append( (IOVRange(obj=obj, timebased=True), payloadvalue) )
589
590
591 # for each key sort the data by IOV start time
592 for k in self.ResultKey():
593 condData[k].sort()
594
595 condDataDict = {}
596 for k in self.ResultKey():
597 condDataDict[k] = self.getSortedAndCompletedPayloadDict(condData[k],runlist,k)
598
599 for run in runlist:
600
601 rejectSomething = False
602 for k in self.ResultKey():
603
604 if run.runNr not in condDataDict[k]:
605 run.addResult(k, "n.a.")
606 continue
607
608 datavec = condDataDict[k][run.runNr]
609
610 if not datavec or len(datavec)==0:
611 run.addResult(k, "n.a.")
612 continue
613
614 if 'n.a.' in datavec:
615 run.showDataIncomplete=True
616
617 for iov, data in datavec:
618 self.selDataMissing = False
619 if self.ApplySelection(k) and not self.passes(data,k):
620 run.addResult(k, self.prettyValue(data,k), iov, reject=True)
621 rejectSomething = True
622 continue
623
624 run.addResult(k, self.prettyValue(data,k), iov)
625
626 if self.selDataMissing:
627 run.selDataIncomplete = True
628
629 if not (rejectSomething and self.rejectRun(run)):
630 newrunlist += [run.runNr]
631
632
633 runlist = [r for r in runlist if r.runNr in newrunlist]
634
635 duration = time() - start
636
638 print (" ==> %i runs found (%.2f sec)" % (len(runlist),duration))
639 else:
640 print (" ==> Done (%g sec)" % duration)
641
642 return runlist
643
644 def rejectRun(self,run):
645 return run.data.isRejected
646
bool inRange(const double *boundaries, const double value, const double tolerance=0.02)
void sort(typename DataModel_detail::iterator< DVL > beg, typename DataModel_detail::iterator< DVL > end)
Specialization of sort for DataVector/List.
RootType Type
#define min(a, b)
Definition cfImp.cxx:40
#define max(a, b)
Definition cfImp.cxx:41
Validity Range object.
Definition IOVRange.h:30
Basic time unit for IOVSvc.
Definition IOVTime.h:33
STL class.
__init__(self, x, keytype=UNDEFINED, selecttype=SELECT)
STL class.
IovVectorMap_t read(const Folder &theFolder, const SelectionCriterion &choice, const unsigned int limit=10)