ATLAS Offline Software
AtlRunQuerySelectorStreams.py
Go to the documentation of this file.
1 # Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration
2 
3 
4 import re, sys
5 try:
6  Set = set
7 except NameError:
8  from sets import Set
9 from time import time
10 
11 from CoolRunQuery.utils.AtlRunQueryTimer import timer
12 from CoolRunQuery.utils.AtlRunQueryUtils import coolDbConn
13 from CoolRunQuery.AtlRunQueryRun import Run
14 from CoolRunQuery.selector.AtlRunQuerySelectorBase import Selector, DataKey
15 
16 
17 class StreamSelector(Selector):
def __init__(self, name, streams=None):
    """Set up the selector from a list of stream selection patterns.

    Args:
        name: selector name, passed unchanged to the base-class
            constructor.
        streams: optional list of pattern strings, for example
            ["*RPC*", "phy* 10k", "deb*,cal* 100k"].  A run is selected
            if it satisfies an 'and' of all patterns in the list.  A
            pattern without a number is checked for existence; a pattern
            with a number is checked against the sum of the events in
            all matching streams.  A leading '!' marks the pattern as
            negated.
    """
    # None instead of a shared mutable [] default
    streams = [] if streams is None else list(streams)

    self.streams = ' and '.join(streams)

    # selstreamspatterns is a list of
    # (compiled regex, event requirement or None, negate flag, regex text)
    self.selstreamspatterns = []
    # regex strings of the streams to show, filled by addShowStreamPattern()
    self.showstreampatterns = []

    for s in streams:
        fields = s.split()
        # translate the wildcard syntax (',' '*' '%' '?') into a regex
        pattern = fields[0].replace(',','|').replace('*','.*').replace('%','.*').replace('?','.')
        negate = pattern.startswith('!')
        pattern = pattern.lstrip('!')
        # the optional second field is the event requirement
        nevt = fields[1] if len(fields) > 1 else None
        self.selstreamspatterns.append((re.compile(pattern), nevt, negate, pattern))

    super(StreamSelector,self).__init__(name)
38 
def setShowOutput(self):
    """Do nothing; this selector takes no action here."""
    return None
41 
def addShowStreamPattern(self, streampattern="*"):
    """Register stream patterns whose results should be shown.

    Args:
        streampattern: comma-separated wildcard patterns.  '*' and '%'
            are translated to the regex '.*' and '?' to '.'; each
            resulting piece is appended to self.showstreampatterns.
    """
    regex = streampattern
    for wildcard, expression in (('*', '.*'), ('%', '.*'), ('?', '.')):
        regex = regex.replace(wildcard, expression)
    self.showstreampatterns.extend(regex.split(','))
44 
45  def __str__(self):
46  if len(self.streams)!=0:
47  return 'SELOUT Checking if the stream name matches "%s"' % self.streams
48  else:
49  return "Retrieving stream names that match '%s'" % ','.join(self.showstreampatterns)
50 
def select(self, runlist):
    """Apply the stream selection to runlist and attach stream results.

    For every run the stream names, sizes and event counts are read from
    the SFO database.  Runs for which self.passes() is False are dropped;
    for the remaining runs a 'STR:<stream>' result (nevents, size) is
    added.  Streams seen in selected runs but absent from a run are
    filled with 'n.a.'.  If show patterns were registered, per-stream LB
    and overlap statistics are stored in run.stats (for all runs, before
    the selection) and matching streams are added to the show order.

    Args:
        runlist: list of run objects; this method reads runNr, stats and
            result and calls addResult() on them.

    Returns:
        The runs of runlist that pass the selection, in their original
        order.
    """
    start = time()

    # some preparation: compile the show patterns
    # ('.*' matches every name, which makes all other patterns redundant)
    if '.*' in self.showstreampatterns:
        compiledShowPatterns = [re.compile('.*')]
    else:
        compiledShowPatterns = [re.compile(p) for p in self.showstreampatterns]

    # we disable the access to everything that is not needed if we only select
    ShowStreams = len(compiledShowPatterns) > 0
    useTier0 = False  # ATLAS_T0 responsible refusing to allow access to their DB

    print (self, end='')
    sys.stdout.flush()

    connection = coolDbConn.GetSFODBConnection()
    cursor = connection.cursor()
    cursor.arraysize = 1000

    runnrlist = [r.runNr for r in runlist]

    from CoolRunQuery.AtlRunQuerySFO import GetSFO_streamsAll,GetSFO_filesAll
    with timer('GetSFO_streamsAll', disabled=True):
        streamsall = GetSFO_streamsAll( cursor, runnrlist )  # { runnr: [streams] }
    with timer('GetSFO_filesAll', disabled=True):
        # indexed [runnr][stream] -> (COUNT(FILESIZE), SUM(FILESIZE), SUM(NREVENTS))
        filesall = GetSFO_filesAll( cursor, runnrlist )

    lbinfoall = overlapall = neventsall = None
    if ShowStreams:
        from CoolRunQuery.AtlRunQuerySFO import GetSFO_LBsAll,GetSFO_NeventsAll,GetSFO_overlapAll
        with timer('GetSFO_LBsAll', disabled=True):
            # indexed [runnr][stream] -> (MIN(LUMIBLOCKNR), MAX(LUMIBLOCKNR), #LUMIBLOCKS)
            lbinfoall = GetSFO_LBsAll( cursor, runnrlist )
        with timer('GetSFO_overlapAll', disabled=True):
            # indexed [runnr][stream][other stream] -> SUM(OVERLAP_EVENTS)
            overlapall = GetSFO_overlapAll( cursor, runnrlist )

        # events per LB are only requested for runs in which at least one
        # stream has a positive maximum LB number
        smallrunnrlist = []
        for r in runnrlist:
            if r not in streamsall:
                continue
            for s in streamsall[r]:
                if r in lbinfoall and s in lbinfoall[r] and lbinfoall[r][s][1]>0:
                    smallrunnrlist.append(r)
                    break
        with timer('GetSFO_NeventsAll', disabled=True):
            # indexed [runnr][stream] -> [(LUMIBLOCKNR, NREVENTS), ...]
            neventsall = GetSFO_NeventsAll( cursor, smallrunnrlist )

    selectedRunNrs = set()  # run numbers that pass the selection
    allStreams = set()      # all the streams that are in the selected runs

    for run in runlist:
        streams = streamsall[run.runNr] if run.runNr in streamsall else []
        strsize = []    # size of streams in this run
        strevents = []  # nr of events in this run
        for s in streams:
            try:
                _, size, events = filesall[run.runNr][s]
            except (KeyError, IndexError):
                # no file information for this run/stream
                size, events = 0, 0
            strsize.append(size)
            strevents.append(events)

        if ShowStreams:
            self._fillStreamStats(run, streams, strevents, lbinfoall, overlapall, neventsall)

        # selection...
        # (a list, because passes() scans the pairs once per pattern)
        if not self.passes(list(zip(streams, strevents)), 0):
            continue
        selectedRunNrs.add(run.runNr)
        allStreams.update(streams)
        for k, v, s in zip(streams, strevents, strsize):
            run.addResult('STR:'+k, (v,s))

    allStreams = ['STR:'+s for s in allStreams]

    # two stable sorts: first character of the stream name (descending)
    # is the primary key, the remainder of the name the secondary key
    allStreams.sort(key = lambda x: (x[5:]) )
    allStreams.sort(key = lambda x: (x[4]), reverse=True )

    # fill the gaps
    for run in runlist:
        for s in allStreams:
            if s not in run.result:
                run.addResult(s, 'n.a.')

    runlist = [r for r in runlist if r.runNr in selectedRunNrs]

    # check if the streams in 'allStreams' match the show patterns
    for s in allStreams:
        if any(p.match(s[4:]) is not None for p in compiledShowPatterns):
            Run.AddToShowOrder(DataKey(s, keytype=DataKey.STREAM))

    if useTier0:
        self._addTier0Info(runlist, runnrlist, allStreams)

    # Done
    duration = time() - start
    if len(self.streams)!=0:
        print (" ==> %i runs found (%.2f sec)" % (len(runlist),duration))
    else:
        print (" ==> Done (%g sec)" % duration)
    return runlist

@staticmethod
def _fillStreamStats(run, streams, strevents, lbinfoall, overlapall, neventsall):
    """Store the per-stream 'LBRecInfo' and 'StrOverlap' entries in run.stats."""
    from CoolRunQuery.utils.AtlRunQueryUtils import Matrix

    nst = len(streams)
    stovmat = Matrix(nst,nst)  # overlap matrix
    for i, s in enumerate(streams):
        statkey = 'STR:'+s
        run.stats[statkey] = {}

        # fill overlaps into matrix
        for j, s2 in enumerate(streams):
            try:
                eventsij = overlapall[run.runNr][s][s2]
            except KeyError:
                eventsij = 0
                # diagonal fallback: the event count of this very stream
                # NOTE(review): applied only when no overlap entry exists;
                # confirm that this nesting is the intended one
                if i == j and strevents[i]:
                    eventsij = strevents[i]
            stovmat.setitem(i,j,float(eventsij))
            stovmat.setitem(j,i,float(eventsij))  # symmetrise matrix

        # read number of events per LB
        try:
            minlb, maxlb, _ = lbinfoall[run.runNr][s]
        except KeyError:
            minlb, maxlb = 0, 0

        # if minlb==maxlb==0 -> no file closure at LB boundary
        if minlb == 0 and maxlb == 0:
            run.stats[statkey]['LBRecInfo'] = None
        else:
            # the raw [(lb, nev), ...] list is what gets stored; the HTML
            # table formerly assembled here was overwritten by it anyway
            run.stats[statkey]['LBRecInfo'] = neventsall[run.runNr][s]

    # add overlap information to the run stats
    for i in range(nst):
        statkey = 'STR:'+streams[i]
        run.stats[statkey]['StrOverlap'] = []
        denom = stovmat.getitem(i,i)
        if denom == 0:
            continue
        for j in range(nst):
            overlap = stovmat.getitem(i,j)
            if i == j or overlap == 0:
                continue
            # percentage of stream i's events that are shared with stream j
            fraction = 100
            if overlap != denom:
                fraction = float(overlap)/float(denom)*100.0
            run.stats[statkey]['StrOverlap'].append((streams[j], fraction))

@staticmethod
def _addTier0Info(runlist, runnrlist, allStreams):
    """Add Tier-0 dataset types and AMI tags to run.stats.

    Not reached at present: select() keeps useTier0 switched off.
    """
    # retrieve Tier-0 information
    from CoolRunQuery.AtlRunQueryTier0 import GetTier0_datasetsAndTypes
    tier0connection = coolDbConn.GetTier0DBConnection()
    cursor = tier0connection.cursor()
    cursor.arraysize = 1000
    tier0retdico = GetTier0_datasetsAndTypes( cursor, runnrlist )

    # add Tier0 information
    for run in runlist:
        for s in allStreams:
            if run.result[s]=='n.a.':
                continue
            run.stats[s]['StrTier0TypesRAW'] = {}
            run.stats[s]['StrTier0TypesESD'] = {}
            run.stats[s]['StrTier0AMI'] = {}
            if run.runNr not in tier0retdico:
                continue
            for dsname, t, pstates in tier0retdico[run.runNr]:
                if s.replace('STR:','') not in dsname:
                    continue
                if '.RAW' in dsname:
                    if '.merge' in dsname:
                        prodstep = 'StrTier0TypesESD'
                    else:
                        prodstep = 'StrTier0TypesRAW'
                    t = '(RAW)'
                else:
                    if '.recon.' in dsname:
                        prodstep = 'StrTier0TypesRAW'
                    else:
                        prodstep = 'StrTier0TypesESD'
                    if prodstep not in run.stats[s]['StrTier0AMI']:
                        # the sixth dot-separated field is stored as AMI tag
                        dsnamesplit = dsname.split('.')
                        if len(dsnamesplit)>5:
                            run.stats[s]['StrTier0AMI'][prodstep] = dsnamesplit[5]

                # check if on CAF
                oncaf = bool(pstates and 'replicate:done' in pstates)

                # fill the run stats
                if t not in run.stats[s][prodstep]:
                    run.stats[s][prodstep][t] = oncaf
292 
def passes(self, streamevents, key):
    """Check the streams of one run against all selection patterns.

    Args:
        streamevents: iterable of entries whose first element is the
            stream name and whose second element is its number of
            events, e.g.
            [('physics_L1Calo', 87274), ('debug_hlterror', 151), ...].
        key: not used by this method.

    Returns:
        True if every pattern in self.selstreamspatterns is satisfied,
        False as soon as one is not.  A pattern with an event
        requirement compares the summed events of all matching streams:
        'N-' needs fewer than N, 'N+' or plain 'N' more than N (N is
        converted with int()).  A pattern without requirement needs at
        least one matching stream, or none if it is negated.
    """
    # Materialise once: the entries are scanned again for every pattern,
    # and a one-shot iterator such as zip(...) would be exhausted after
    # the first pattern.
    streamevents = list(streamevents)

    for streampattern, neventreq, negate, _ in self.selstreamspatterns:
        matching = [se for se in streamevents if streampattern.match(se[0])]
        nevents = sum(se[1] for se in matching)

        if neventreq:
            if neventreq[-1] == '-':
                passreq = nevents < int(neventreq[:-1])
            elif neventreq[-1] == '+':
                passreq = nevents > int(neventreq[:-1])
            else:
                passreq = nevents > int(neventreq)
        elif negate:
            passreq = not matching
        else:
            passreq = bool(matching)

        if not passreq:
            return False
    return True
318 
Matrix
Definition: Trigger/TrigT1/TrigT1RPChardware/TrigT1RPChardware/Matrix.h:15
replace
std::string replace(std::string s, const std::string &s2, const std::string &s3)
Definition: hcg.cxx:307
python.selector.AtlRunQuerySelectorStreams.Set
Set
Definition: AtlRunQuerySelectorStreams.py:6
python.AtlRunQuerySFO.GetSFO_streamsAll
def GetSFO_streamsAll(cursor, runlist)
Definition: AtlRunQuerySFO.py:126
python.selector.AtlRunQuerySelectorStreams.StreamSelector.select
def select(self, runlist)
Definition: AtlRunQuerySelectorStreams.py:51
python.selector.AtlRunQuerySelectorStreams.StreamSelector
Definition: AtlRunQuerySelectorStreams.py:17
python.selector.AtlRunQuerySelectorStreams.StreamSelector.passes
def passes(self, streamevents, key)
Definition: AtlRunQuerySelectorStreams.py:293
python.AtlRunQuerySFO.GetSFO_overlapAll
def GetSFO_overlapAll(cursor, runlist)
Definition: AtlRunQuerySFO.py:194
python.utils.AtlRunQueryTimer.timer
def timer(name, disabled=False)
Definition: AtlRunQueryTimer.py:85
python.AtlRunQuerySFO.GetSFO_LBsAll
def GetSFO_LBsAll(cursor, runlist)
Definition: AtlRunQuerySFO.py:158
python.selector.AtlRunQuerySelectorStreams.StreamSelector.setShowOutput
def setShowOutput(self)
Definition: AtlRunQuerySelectorStreams.py:39
plotBeamSpotVxVal.range
range
Definition: plotBeamSpotVxVal.py:194
python.selector.AtlRunQuerySelectorStreams.StreamSelector.__init__
def __init__(self, name, streams=[])
Definition: AtlRunQuerySelectorStreams.py:18
TCS::join
std::string join(const std::vector< std::string > &v, const char c=',')
Definition: Trigger/TrigT1/L1Topo/L1TopoCommon/Root/StringUtils.cxx:10
python.selector.AtlRunQuerySelectorStreams.StreamSelector.showstreampatterns
showstreampatterns
Definition: AtlRunQuerySelectorStreams.py:26
python.selector.AtlRunQuerySelectorStreams.StreamSelector.selstreamspatterns
selstreamspatterns
Definition: AtlRunQuerySelectorStreams.py:25
python.AtlRunQuerySFO.GetSFO_NeventsAll
def GetSFO_NeventsAll(cursor, runlist)
Definition: AtlRunQuerySFO.py:175
python.CaloAddPedShiftConfig.int
int
Definition: CaloAddPedShiftConfig.py:45
python.selector.AtlRunQuerySelectorStreams.StreamSelector.addShowStreamPattern
def addShowStreamPattern(self, streampattern="*")
Definition: AtlRunQuerySelectorStreams.py:42
python.selector.AtlRunQuerySelectorStreams.StreamSelector.__str__
def __str__(self)
Definition: AtlRunQuerySelectorStreams.py:45
python.AtlRunQueryTier0.GetTier0_datasetsAndTypes
def GetTier0_datasetsAndTypes(cursor, runlist)
Definition: AtlRunQueryTier0.py:41
Trk::split
@ split
Definition: LayerMaterialProperties.h:38
python.selector.AtlRunQuerySelectorStreams.StreamSelector.streams
streams
Definition: AtlRunQuerySelectorStreams.py:24
python.AtlRunQuerySFO.GetSFO_filesAll
def GetSFO_filesAll(cursor, runlist)
Definition: AtlRunQuerySFO.py:140
python.LArMinBiasAlgConfig.float
float
Definition: LArMinBiasAlgConfig.py:65