ATLAS Offline Software
Loading...
Searching...
No Matches
AtlRunQuerySelectorStreams.py
Go to the documentation of this file.
1# Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration
2
3
4import re, sys
5try:
6 Set = set
7except NameError:
8 from sets import Set
9from time import time
10
11from CoolRunQuery.utils.AtlRunQueryTimer import timer
12from CoolRunQuery.utils.AtlRunQueryUtils import coolDbConn
13from CoolRunQuery.AtlRunQueryRun import Run
14from CoolRunQuery.selector.AtlRunQuerySelectorBase import Selector, DataKey
15
16
class StreamSelector(Selector):
    """Select runs by the streams recorded in the SFO database.

    Optionally, per-stream details (event counts, file sizes, stream overlaps,
    per-LB event counts) are attached to each run for display.
    """

    def __init__(self, name, streams=None):
        """Create the stream selector.

        :param name: selector name, forwarded to ``Selector.__init__``.
        :param streams: list of selection patterns, e.g.
            ``["*RPC*", "phy* 10k", "deb*,cal* 100k"]``. A run is selected only
            if it satisfies the logical AND of all patterns in the list:

            * the first whitespace-separated token is a stream-name pattern:
              ``,`` separates alternatives, ``*`` and ``%`` match any string,
              ``?`` matches a single character, a leading ``!`` negates it;
            * a pattern without a number is checked for existence of a
              matching stream (or absence, if negated);
            * a pattern with a number is checked against the sum of the events
              in all matching streams (see :meth:`passes`).
        """
        if streams is None:
            streams = []  # avoid a shared mutable default argument

        self.streams = ' and '.join(streams)
        # list of (compiled regex, event requirement string or None, negate flag, regex string)
        self.selstreamspatterns = []
        # regex strings selecting the streams whose details are shown
        self.showstreampatterns = []

        for s in streams:
            tokens = s.split()
            if not tokens:
                continue  # blank entry: nothing to select on
            pattern = tokens[0].replace(',', '|').replace('*', '.*').replace('%', '.*').replace('?', '.')
            negate = pattern.startswith('!')
            pattern = pattern.lstrip('!')
            nevt = tokens[1] if len(tokens) > 1 else None
            self.selstreamspatterns.append((re.compile(pattern), nevt, negate, pattern))

        super(StreamSelector, self).__init__(name)

    def setShowOutput(self):
        """Do nothing; showing is configured via :meth:`addShowStreamPattern`."""
        pass

    def addShowStreamPattern(self, streampattern="*"):
        """Add comma-separated wildcard pattern(s) of streams whose details are shown.

        ``*`` and ``%`` are translated to ``.*`` and ``?`` to ``.``.
        """
        self.showstreampatterns += streampattern.replace('*', '.*').replace('%', '.*').replace('?', '.').split(',')

    def __str__(self):
        if self.streams:
            return 'SELOUT Checking if the stream name matches "%s"' % self.streams
        return "Retrieving stream names that match '%s'" % ','.join(self.showstreampatterns)

    def select(self, runlist):
        """Apply the stream selection to ``runlist`` and attach stream results.

        For every run, ``run.addResult('STR:<stream>', (nevents, size))`` is
        called for the streams of passing runs; all runs get ``'n.a.'`` for
        selected streams they lack. If show patterns are set, per-stream details
        are written into ``run.stats`` and matching streams are added to the
        show order.

        :param runlist: list of ``Run`` objects to filter.
        :return: the runs of ``runlist`` (in their original order) that satisfy
            all stream patterns.
        """
        start = time()

        # compile the show patterns once; '.*' alone already matches everything
        if '.*' in self.showstreampatterns:
            compiledShowPatterns = [re.compile('.*')]
        else:
            compiledShowPatterns = [re.compile(p) for p in self.showstreampatterns]

        # per-stream details are only needed if some stream is to be shown
        ShowStreams = len(compiledShowPatterns) > 0
        useTier0 = False  # ATLAS_T0 responsible refusing to allow access to their DB

        print(self, end='')
        sys.stdout.flush()
        newrunlist = Set()  # run numbers passing the selection
        allStreams = Set()  # all the streams that are in the selected runs
        connection = coolDbConn.GetSFODBConnection()
        cursor = connection.cursor()
        cursor.arraysize = 1000

        runnrlist = [r.runNr for r in runlist]

        from CoolRunQuery.AtlRunQuerySFO import GetSFO_streamsAll, GetSFO_filesAll
        with timer('GetSFO_streamsAll', disabled=True):
            streamsall = GetSFO_streamsAll(cursor, runnrlist)  # { runnr: [streams] }
        with timer('GetSFO_filesAll', disabled=True):
            filesall = GetSFO_filesAll(cursor, runnrlist)  # [(COUNT(FILESIZE), SUM(FILESIZE), SUM(NREVENTS))]

        if ShowStreams:
            from CoolRunQuery.AtlRunQuerySFO import GetSFO_LBsAll, GetSFO_NeventsAll, GetSFO_overlapAll
            with timer('GetSFO_LBsAll', disabled=True):
                lbinfoall = GetSFO_LBsAll(cursor, runnrlist)  # [(MIN(LUMIBLOCKNR), MAX(LUMIBLOCKNR), #LUMIBLOCKS)]
            with timer('GetSFO_overlapAll', disabled=True):
                overlapall = GetSFO_overlapAll(cursor, runnrlist)  # [(SUM(OVERLAP_EVENTS))]
            # only fetch per-LB event counts for runs in which at least one
            # stream has a maximum LB > 0 in the LB info
            smallrunnrlist = [
                r for r in runnrlist
                if r in streamsall and any(
                    r in lbinfoall and s in lbinfoall[r] and lbinfoall[r][s][1] > 0
                    for s in streamsall[r])
            ]
            with timer('GetSFO_NeventsAll', disabled=True):
                neventsall = GetSFO_NeventsAll(cursor, smallrunnrlist)  # [(LUMIBLOCKNR, NREVENTS)]

        for run in runlist:
            streams = streamsall[run.runNr] if run.runNr in streamsall else []  # names of all streams in this run
            strsize = []    # size of each stream in this run
            strevents = []  # nr of events of each stream in this run
            for s in streams:
                try:
                    _, size, events = filesall[run.runNr][s]
                except (KeyError, IndexError):
                    # no file information for this run/stream
                    size, events = 0, 0
                strsize.append(size)
                strevents.append(events)

            if ShowStreams:
                self._fillStreamDetails(run, streams, strevents, overlapall, lbinfoall, neventsall)

            # selection; the pairs are materialised because passes() scans them
            # once per pattern and a zip object can only be iterated once
            if not self.passes(list(zip(streams, strevents)), 0):
                continue
            newrunlist.add(run.runNr)
            allStreams.update(streams)
            for k, v, sz in zip(streams, strevents, strsize):
                run.addResult('STR:'+k, (v, sz))

        # order the 'STR:<name>' keys by the first character of the stream name
        # (descending), then by the rest of the name (ascending); two passes of
        # the stable sort implement this composite ordering
        allStreams = ['STR:'+s for s in allStreams]
        allStreams.sort(key=lambda x: x[5:])
        allStreams.sort(key=lambda x: x[4], reverse=True)

        # fill the gaps
        for run in runlist:
            for s in allStreams:
                if s not in run.result:
                    run.addResult(s, 'n.a.')

        runlist = [r for r in runlist if r.runNr in newrunlist]

        # check if the streams in 'allStreams' match the show patterns
        for s in allStreams:
            if any(p.match(s[4:]) is not None for p in compiledShowPatterns):
                Run.AddToShowOrder(DataKey(s, keytype=DataKey.STREAM))

        if useTier0:
            self._addTier0Info(runlist, allStreams, runnrlist)

        # Done
        duration = time() - start
        if self.streams:
            print(" ==> %i runs found (%.2f sec)" % (len(runlist), duration))
        else:
            print(" ==> Done (%g sec)" % duration)
        return runlist

    def _fillStreamDetails(self, run, streams, strevents, overlapall, lbinfoall, neventsall):
        """Store per-LB info and stream-overlap fractions of ``run`` in ``run.stats``.

        Writes ``run.stats['STR:<stream>']['LBRecInfo']`` (the per-LB list from
        ``neventsall``, or None when there is no LB info) and
        ``run.stats['STR:<stream>']['StrOverlap']`` (list of
        ``(other stream, percentage of this stream's events also in it)``).
        """
        from CoolRunQuery.utils.AtlRunQueryUtils import Matrix

        nst = len(streams)
        stovmat = Matrix(nst, nst)  # overlap matrix
        for i, s in enumerate(streams):
            run.stats['STR:'+s] = {}

            # fill overlaps into matrix
            for j, s2 in enumerate(streams):
                try:
                    eventsij = overlapall[run.runNr][s][s2]
                except KeyError:
                    eventsij = 0
                    # no overlap record of a stream with itself: use its own
                    # event count (the diagonal is the fraction denominator)
                    if i == j and strevents[i]:
                        eventsij = strevents[i]
                stovmat.setitem(i, j, float(eventsij))
                stovmat.setitem(j, i, float(eventsij))  # symmetrise matrix

            # read the LB range of this stream
            try:
                minlb, maxlb, _ = lbinfoall[run.runNr][s]
            except KeyError:
                minlb, maxlb = 0, 0

            # if minlb==maxlb==0 -> no file closure at LB boundary
            if minlb == 0 and maxlb == 0:
                run.stats['STR:'+s]['LBRecInfo'] = None
                continue
            try:
                run.stats['STR:'+s]['LBRecInfo'] = neventsall[run.runNr][s]  # [(lb, nev), ...]
            except KeyError:
                run.stats['STR:'+s]['LBRecInfo'] = None

        # add overlap information to the run stats
        for i in range(nst):
            statkey = 'STR:'+streams[i]
            run.stats[statkey]['StrOverlap'] = []
            denom = stovmat.getitem(i, i)
            if denom == 0:
                continue
            for j in range(nst):
                overlap = stovmat.getitem(i, j)
                if i == j or overlap == 0:
                    continue
                fraction = 100
                if overlap != denom:
                    fraction = float(overlap)/float(denom)*100.0
                run.stats[statkey]['StrOverlap'].append((streams[j], fraction))

    def _addTier0Info(self, runlist, allStreams, runnrlist):
        """Attach Tier-0 dataset types, AMI tags and CAF flags to the selected streams.

        Only reached when ``select`` enables Tier-0 access (currently disabled).
        """
        from CoolRunQuery.AtlRunQueryTier0 import GetTier0_datasetsAndTypes
        tier0connection = coolDbConn.GetTier0DBConnection()
        cursor = tier0connection.cursor()
        cursor.arraysize = 1000
        tier0retdico = GetTier0_datasetsAndTypes(cursor, runnrlist)

        for run in runlist:
            for s in allStreams:
                if run.result[s] == 'n.a.':
                    continue
                stats = run.stats[s]
                stats['StrTier0TypesRAW'] = {}
                stats['StrTier0TypesESD'] = {}
                stats['StrTier0AMI'] = {}
                if run.runNr not in tier0retdico:
                    continue
                streamname = s.replace('STR:', '')
                for dsname, t, pstates in tier0retdico[run.runNr]:
                    if streamname not in dsname:
                        continue
                    if '.RAW' in dsname:
                        prodstep = 'StrTier0TypesESD' if '.merge' in dsname else 'StrTier0TypesRAW'
                        t = '(RAW)'
                    elif '.recon.' in dsname:
                        prodstep = 'StrTier0TypesRAW'
                    else:
                        prodstep = 'StrTier0TypesESD'
                    if prodstep not in stats['StrTier0AMI']:
                        dsnamesplit = dsname.split('.')
                        if len(dsnamesplit) > 5:
                            stats['StrTier0AMI'][prodstep] = dsnamesplit[5]
                    # check if on CAF
                    oncaf = bool(pstates and 'replicate:done' in pstates)
                    # fill the run stats
                    if t not in stats[prodstep]:
                        stats[prodstep][t] = oncaf

    @staticmethod
    def _toInt(number):
        """Convert an event-count string such as '500', '10k' or '2M' to an int."""
        multipliers = {'k': 1000, 'K': 1000, 'M': 1000000}
        if number and number[-1] in multipliers:
            return int(round(float(number[:-1]) * multipliers[number[-1]]))
        return int(number)

    def passes(self, streamevents, key):
        """Check whether a run's streams satisfy all selection patterns.

        :param streamevents: iterable of (stream name, number of events) pairs,
            e.g. ``[('physics_L1Calo', 87274), ('debug_hlterror', 151), ...]``.
        :param key: not used here.
        :return: True if every pattern in ``self.selstreamspatterns`` passes.

        A pattern with an event requirement ``N`` passes if the summed events of
        all matching streams is > N; ``N+`` also requires > N and ``N-``
        requires < N. ``N`` may carry a ``k``/``K`` (1e3) or ``M`` (1e6)
        suffix. A pattern without a requirement passes if a matching stream
        exists or, if negated with '!', if none exists.
        """
        # scanned once per pattern, so a one-shot iterator must be materialised
        streamevents = list(streamevents)
        for streampattern, neventreq, negate, _ in self.selstreamspatterns:
            matching = [se[1] for se in streamevents if streampattern.match(se[0])]
            if neventreq:
                nevents = sum(matching)
                if neventreq[-1] == '-':
                    passreq = nevents < self._toInt(neventreq[:-1])
                elif neventreq[-1] == '+':
                    passreq = nevents > self._toInt(neventreq[:-1])
                else:
                    passreq = nevents > self._toInt(neventreq)
            elif negate:
                passreq = not matching
            else:
                passreq = bool(matching)
            if not passreq:
                return False
        return True
318
std::string replace(std::string s, const std::string &s2, const std::string &s3)
Definition hcg.cxx:310
std::vector< std::string > split(const std::string &s, const std::string &t=":")
Definition hcg.cxx:177