ATLAS Offline Software
AtlRunQuerySelectorStreams.py
Go to the documentation of this file.
1 # Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration
2 
3 
4 from __future__ import print_function
5 import re, sys
6 try:
7  Set = set
8 except NameError:
9  from sets import Set
10 from time import time
11 
12 from CoolRunQuery.utils.AtlRunQueryTimer import timer
13 from CoolRunQuery.utils.AtlRunQueryUtils import coolDbConn
14 from CoolRunQuery.AtlRunQueryRun import Run
15 from CoolRunQuery.selector.AtlRunQuerySelectorBase import Selector, DataKey
16 
17 
18 class StreamSelector(Selector):
19  def __init__(self, name, streams=[]):
20  # streams can be ["*RPC*", "phy* 10k", "deb*,cal* 100k"]
21  # will select runs that satiesfy an 'and' of all patterns in the list
22  # a pattern without number are checked for existence
23  # a pattern with number will be checked against the sum of the events in all matching streams
24 
25  self.streams = ' and '.join(streams)
28 
29  # selstreampatterns is a list of (pattern,nevents) tuples
30  for s in streams:
31  pattern = s.split()[0].replace(',','|').replace('*','.*').replace('%','.*').replace('?','.')
32  negate = pattern[0]=='!'
33  pattern = pattern.lstrip('!')
34  p = re.compile(pattern)
35  nevt = (s.split()+[None])[1]
36  self.selstreamspatterns += [(p,nevt,negate,pattern)]
37 
38  super(StreamSelector,self).__init__(name)
39 
40  def setShowOutput(self):
41  pass
42 
43  def addShowStreamPattern(self, streampattern="*"):
44  self.showstreampatterns += streampattern.replace('*','.*').replace('%','.*').replace('?','.').split(',')
45 
46  def __str__(self):
47  if len(self.streams)!=0:
48  return 'SELOUT Checking if the stream name matches "%s"' % self.streams
49  else:
50  return "Retrieving stream names that match '%s'" % ','.join(self.showstreampatterns)
51 
52  def select(self, runlist):
53 
54  # some preparation: compile the show patterns
55  start = time()
56 
57  if '.*' in self.showstreampatterns:
58  compiledShowPatterns = [re.compile('.*')]
59  else:
60  compiledShowPatterns = [re.compile(p) for p in self.showstreampatterns]
61 
62  # we disable the access to everything that is not needed if we only select
63  ShowStreams = False
64  useTier0 = False
65  if len(compiledShowPatterns)>0:
66  ShowStreams = True
67  useTier0 = False # ATLAS_T0 responsible refusing to allow access to their DB
68 
69 
70  print (self, end='')
71  sys.stdout.flush()
72  newrunlist = []
73  allStreams = Set() # list of all the streams that are in the selected runs
74  connection = coolDbConn.GetSFODBConnection()
75  cursor = connection.cursor()
76  cursor.arraysize = 1000
77 
78  runnrlist = [r.runNr for r in runlist]
79 
80  from CoolRunQuery.AtlRunQuerySFO import GetSFO_streamsAll,GetSFO_filesAll
81  with timer('GetSFO_streamsAll', disabled=True):
82  streamsall = GetSFO_streamsAll( cursor, runnrlist ) # { runnr: [streams] }
83  with timer('GetSFO_filesAll', disabled=True):
84  filesall = GetSFO_filesAll( cursor, runnrlist ) # [(COUNT(FILESIZE), SUM(FILESIZE), SUM(NREVENTS))]
85 
86  if ShowStreams:
87  from CoolRunQuery.AtlRunQuerySFO import GetSFO_LBsAll,GetSFO_NeventsAll,GetSFO_overlapAll
88  with timer('GetSFO_LBsAll', disabled=True):
89  lbinfoall = GetSFO_LBsAll( cursor, runnrlist ) # [(MIN(LUMIBLOCKNR), MAX(LUMIBLOCKNR), #LUMIBLOCKS)]
90  with timer('GetSFO_overlapAll', disabled=True):
91  overlapall = GetSFO_overlapAll( cursor, runnrlist ) # [(SUM(OVERLAP_EVENTS))]
92  smallrunnrlist=[]
93  for r in runnrlist: # go through old runlist and see
94  if r not in streamsall:
95  continue
96  for s in streamsall[r]:
97  if r in lbinfoall and s in lbinfoall[r] and lbinfoall[r][s][1]>0:
98  smallrunnrlist += [r]
99  break
100  with timer('GetSFO_NeventsAll', disabled=True):
101  neventsall = GetSFO_NeventsAll( cursor, smallrunnrlist ) # [(LUMIBLOCKNR, NREVENTS)]
102 
103 
104 
105  for run in runlist: # go through old runlist and see
106 
107  streams = [] # names of all streams in this run
108  strsize = [] # size of streams in this run
109  strevents = [] # nr of events in this run
110  if run.runNr in streamsall:
111  streams = streamsall[run.runNr]
112  for s in streams:
113  try:
114  _, size, events = filesall[run.runNr][s]
115  except IndexError:
116  _, size, events = (0,0,0)
117  strsize += [size]
118  strevents += [events]
119 
120 
121 
122  if ShowStreams:
123  from CoolRunQuery.utils.AtlRunQueryUtils import Matrix
124 
125  nst = len(streams)
126  stovmat = Matrix(nst,nst) # overlap matrix
127  for i,s in enumerate(streams):
128  run.stats['STR:'+s] = {}
129 
130  # fill overlaps into matrix
131  for j,s2 in enumerate(streams):
132  try:
133  eventsij = overlapall[run.runNr][s][s2]
134  except KeyError:
135  eventsij = 0
136  if i==j and events:
137  eventsij = events
138  stovmat.setitem(i,j,float(eventsij))
139  stovmat.setitem(j,i,float(eventsij)) # symmetrise matrix
140 
141 
142  # read number of events per LB
143  minlb, maxlb, lbs = (0,0,1)
144  try:
145  minlb, maxlb, lbs = lbinfoall[run.runNr][s]
146  except KeyError:
147  pass
148 
149  # if minlb==maxlb==0 -> no file closure at LB boundary
150  if minlb == 0 and maxlb == 0:
151  run.stats['STR:'+s]['LBRecInfo'] = None
152  continue
153  else:
154  lbevcount = '<tr>'
155  result = neventsall[run.runNr][s] #[ (lb,nev),... ]
156  lbold = -1
157  allnev = 0
158  ic = 0
159  ice = 0
160  allic = 0
161  lastElement = False
162  firstcall = True
163 
164  for ice,(lb,nev) in enumerate(result):
165  if ice == len(result):
166  lastElement = True
167  allnev += nev
168  if lb != lbold or lastElement:
169  if lbold != -1:
170  ic += 1
171  allic += 1
172  if allic < 101:
173  if ic == 9:
174  ic = 1
175  lbevcount += '</tr><tr>'
176  lbevcount += '<td style="font-size:75%%">%i&nbsp;(%s)</td>' % (lbold,allnev)
177  else:
178  if firstcall:
179  lbevcount += '</tr><tr>'
180  lbevcount += '<td style="font-size:75%%" colspan="8">... <i>too many LBs (> 100) to show</i></td>'
181  firstcall = False
182  allnev = nev
183  lbold = lb
184  else:
185  allnev += nev
186  lbevcount += '</tr>'
187  run.stats['STR:'+s]['LBRecInfo'] = lbevcount
188  run.stats['STR:'+s]['LBRecInfo'] = result
189 
190  # add overlap information to the run stats
191  for i in range(nst):
192  statkey = 'STR:'+streams[i]
193  run.stats[statkey]['StrOverlap'] = []
194  denom = stovmat.getitem(i,i)
195  if denom==0:
196  continue
197  for j in range(nst):
198  if i == j or stovmat.getitem(i,j) == 0:
199  continue
200  fraction = 100
201  if stovmat.getitem(i,j) != denom:
202  fraction = float(stovmat.getitem(i,j))/float(denom)*100.0
203  run.stats[statkey]['StrOverlap'] += [(streams[j], fraction)]
204 
205 
206 
207  # selection...
208  if not self.passes(zip(streams,strevents),0):
209  continue
210  newrunlist += [run.runNr]
211  allStreams.update(streams)
212  for k,v,s in zip(streams,strevents,strsize):
213  run.addResult('STR:'+k, (v,s))
214 
215  allStreams = ['STR:'+s for s in allStreams]
216 
217 
218  allStreams.sort(key = lambda x: (x[5:]) )
219  allStreams.sort(key = lambda x: (x[4]), reverse=True )
220 
221  # fill the gaps
222  for run in runlist:
223  for s in allStreams:
224  if s not in run.result:
225  run.addResult(s, 'n.a.')
226 
227  runlist = [r for r in runlist if r.runNr in newrunlist]
228 
229  # check if the streams in 'allStreams' match the show patterns
230  for s in allStreams:
231  if any( [p.match(s[4:]) is not None for p in compiledShowPatterns] ):
232  Run.AddToShowOrder(DataKey(s, keytype=DataKey.STREAM))
233 
234 
235 
236 
237  if useTier0:
238 
239  # retrieve Tier-0 information
240  from CoolRunQuery.AtlRunQueryTier0 import GetTier0_datasetsAndTypes
241  tier0connection = coolDbConn.GetTier0DBConnection()
242  cursor = tier0connection.cursor()
243  cursor.arraysize = 1000
244  tier0retdico = GetTier0_datasetsAndTypes( cursor, runnrlist )
245 
246  # add Tier0 information
247  for run in runlist:
248  for s in allStreams:
249  if run.result[s]=='n.a.':
250  continue
251  run.stats[s]['StrTier0TypesRAW'] = {}
252  run.stats[s]['StrTier0TypesESD'] = {}
253  run.stats[s]['StrTier0AMI'] = {}
254  if run.runNr in tier0retdico.keys():
255  for dsname,t,pstates in tier0retdico[run.runNr]:
256  if s.replace('STR:','') in dsname:
257  if '.RAW' in dsname:
258  if '.merge' in dsname:
259  prodstep = 'StrTier0TypesESD'
260  t = '(RAW)'
261  else:
262  prodstep = 'StrTier0TypesRAW'
263  t = '(RAW)'
264  else:
265  if '.recon.' in dsname:
266  prodstep = 'StrTier0TypesRAW'
267  else:
268  prodstep = 'StrTier0TypesESD'
269  if prodstep not in run.stats[s]['StrTier0AMI']:
270  dsnamesplit = dsname.split('.')
271  if len(dsnamesplit)>5:
272  amitag = dsnamesplit[5]
273  run.stats[s]['StrTier0AMI'][prodstep] = amitag
274  else:
275  amitag = ''
276  # check if on CAF
277  oncaf = False
278  if pstates and 'replicate:done' in pstates:
279  oncaf = True
280 
281  # fill the run stats
282  if t not in run.stats[s][prodstep]:
283  run.stats[s][prodstep][t] = oncaf
284 
285 
286  # Done
287  duration = time() - start
288  if len(self.streams)!=0:
289  print (" ==> %i runs found (%.2f sec)" % (len(runlist),duration))
290  else:
291  print (" ==> Done (%g sec)" % duration)
292  return runlist
293 
294  def passes(self, streamevents, key):
295  # streamevents is [('physics_L1Calo', 87274, False), ('physics_RPCwBeam', 1075460, True), ('debug_hlterror', 151, False),...]
296  for streampattern, neventreq, negate, pattern in self.selstreamspatterns:
297  nevents = 0
298  foundmatchingStream = False
299  for se in streamevents:
300  if streampattern.match(se[0]):
301  nevents += se[1]
302  foundmatchingStream = True
303  if neventreq:
304  if neventreq[-1] in '+-':
305  if neventreq[-1] == '-':
306  passreq = nevents < int( neventreq[:-1] )
307  else:
308  passreq = nevents > int( neventreq[:-1] )
309  else:
310  passreq = nevents>int(neventreq)
311  else:
312  if negate:
313  passreq = not foundmatchingStream
314  else:
315  passreq = foundmatchingStream
316  if not passreq:
317  return False
318  return True
319 
Matrix
Definition: Trigger/TrigT1/TrigT1RPChardware/TrigT1RPChardware/Matrix.h:15
replace
std::string replace(std::string s, const std::string &s2, const std::string &s3)
Definition: hcg.cxx:307
python.selector.AtlRunQuerySelectorStreams.Set
Set
Definition: AtlRunQuerySelectorStreams.py:7
CaloCellPos2Ntuple.int
int
Definition: CaloCellPos2Ntuple.py:24
python.AtlRunQuerySFO.GetSFO_streamsAll
def GetSFO_streamsAll(cursor, runlist)
Definition: AtlRunQuerySFO.py:127
python.selector.AtlRunQuerySelectorStreams.StreamSelector.select
def select(self, runlist)
Definition: AtlRunQuerySelectorStreams.py:52
python.selector.AtlRunQuerySelectorStreams.StreamSelector
Definition: AtlRunQuerySelectorStreams.py:18
python.selector.AtlRunQuerySelectorStreams.StreamSelector.passes
def passes(self, streamevents, key)
Definition: AtlRunQuerySelectorStreams.py:294
python.AtlRunQuerySFO.GetSFO_overlapAll
def GetSFO_overlapAll(cursor, runlist)
Definition: AtlRunQuerySFO.py:195
python.utils.AtlRunQueryTimer.timer
def timer(name, disabled=False)
Definition: AtlRunQueryTimer.py:86
python.AtlRunQuerySFO.GetSFO_LBsAll
def GetSFO_LBsAll(cursor, runlist)
Definition: AtlRunQuerySFO.py:159
python.selector.AtlRunQuerySelectorStreams.StreamSelector.setShowOutput
def setShowOutput(self)
Definition: AtlRunQuerySelectorStreams.py:40
plotBeamSpotVxVal.range
range
Definition: plotBeamSpotVxVal.py:195
python.selector.AtlRunQuerySelectorStreams.StreamSelector.__init__
def __init__(self, name, streams=[])
Definition: AtlRunQuerySelectorStreams.py:19
TCS::join
std::string join(const std::vector< std::string > &v, const char c=',')
Definition: Trigger/TrigT1/L1Topo/L1TopoCommon/Root/StringUtils.cxx:10
python.selector.AtlRunQuerySelectorStreams.StreamSelector.showstreampatterns
showstreampatterns
Definition: AtlRunQuerySelectorStreams.py:27
python.selector.AtlRunQuerySelectorStreams.StreamSelector.selstreamspatterns
selstreamspatterns
Definition: AtlRunQuerySelectorStreams.py:26
python.AtlRunQuerySFO.GetSFO_NeventsAll
def GetSFO_NeventsAll(cursor, runlist)
Definition: AtlRunQuerySFO.py:176
python.selector.AtlRunQuerySelectorStreams.StreamSelector.addShowStreamPattern
def addShowStreamPattern(self, streampattern="*")
Definition: AtlRunQuerySelectorStreams.py:43
python.selector.AtlRunQuerySelectorStreams.StreamSelector.__str__
def __str__(self)
Definition: AtlRunQuerySelectorStreams.py:46
python.AtlRunQueryTier0.GetTier0_datasetsAndTypes
def GetTier0_datasetsAndTypes(cursor, runlist)
Definition: AtlRunQueryTier0.py:41
readCCLHist.float
float
Definition: readCCLHist.py:83
Trk::split
@ split
Definition: LayerMaterialProperties.h:38
python.selector.AtlRunQuerySelectorStreams.StreamSelector.streams
streams
Definition: AtlRunQuerySelectorStreams.py:25
python.AtlRunQuerySFO.GetSFO_filesAll
def GetSFO_filesAll(cursor, runlist)
Definition: AtlRunQuerySFO.py:141