ATLAS Offline Software
AtlRunQuerySFO.py
Go to the documentation of this file.
1 #!/bin/env python
2 
3 # Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration
4 #
5 # ----------------------------------------------------------------
6 # Script : AtlRunQuerySFO.py
7 # Project: AtlRunQuery
8 # Purpose: Utility to retrieve information from SFO DB
9 # Authors: Andreas Hoecker (CERN), Joerg Stelzer (DESY)
10 # Created: Nov 13, 2008
11 # ----------------------------------------------------------------
12 #
13 # SFO DB schema:
14 # ==============================================
15 #
16 # Run Table
17 #
18 # "SFOID" "VARCHAR2(32 Bytes)"
19 # "RUNNR" "NUMBER(10,0)"
20 # "STREAMTYPE" "VARCHAR2(32 Bytes)"
21 # "STREAM" "VARCHAR2(32 Bytes)"
22 # "STATE" "VARCHAR2(32 Bytes)" [OPENED,CLOSED,TRANSFERRED]
23 # "TZSTATE" "VARCHAR2(32 Bytes)" filled by TZ
24 # "DSNAME" "VARCHAR2(256 Bytes)"
25 # "T_STAMP" "TIMESTAMP(0)" "RUN_SEQNUM" "NUMBER(10,0)"
26 #
27 # Lumiblock Table
28 #
29 # "SFOID" "VARCHAR2(32 Bytes)"
30 # "RUNNR" "NUMBER(10,0)"
31 # "LUMIBLOCKNR" "NUMBER(10,0)"
32 # "STREAMTYPE" "VARCHAR2(32 Bytes)"
33 # "STREAM" "VARCHAR2(32 Bytes)"
34 # "STATE" "VARCHAR2(32 Bytes)" [OPENED,CLOSED,TRANSFERRED]
35 # "TZSTATE" "VARCHAR2(32 Bytes)" filled by TZ
36 # "T_STAMP" "TIMESTAMP(0)" "LUMIBLOCK_SEQNUM" "NUMBER(10,0)"
37 #
38 # File Table
39 #
40 # "LFN" "VARCHAR2(256 Bytes)" filename
41 # "FILENR" "NUMBER(10,0)"
42 # "SFOID" "VARCHAR2(32 Bytes)"
43 # "RUNNR" "NUMBER(10,0)"
44 # "LUMIBLOCKNR" "NUMBER(10,0)"
45 # "STREAMTYPE" "VARCHAR2(32 Bytes)"
46 # "STREAM" "VARCHAR2(32 Bytes)"
47 # "SFOHOST" "VARCHAR2(64 Bytes)"
48 # "TRANSFERSTATE" "VARCHAR2(32 Bytes)" [ONDISK,TRANSFERRED]
49 # "FILESTATE" "VARCHAR2(32 Bytes)" [OPENED,CLOSED,DELETED]
50 # "FILEHEALTH" "VARCHAR2(32 Bytes)" [SANE,TRUNCATED]
51 # "TZSTATE" "VARCHAR2(32 Bytes)" filled by TZ
52 # "FILESIZE" "NUMBER(10,0)"
53 # "GUID" "VARCHAR2(64 Bytes)"
54 # "CHECKSUM" "VARCHAR2(64 Bytes)" Adler-32
55 # "PFN" "VARCHAR2(1024 Bytes)" /path/filename on Castor
56 # "SFOPFN" "VARCHAR2(1024 Bytes)" /path/filename on SFO
57 # "NREVENTS" "NUMBER(10,0)"
58 # "T_STAMP" "TIMESTAMP(0)" "ENDTIME" "TIMESTAMP(0)" UTC time
59 # "FILE_SEQNUM" "NUMBER(10,0)"
60 #
61 # Index Table
62 #
63 # "SEQNAME" "VARCHAR2(64 Bytes)"
64 # "SEQINDEX" "NUMBER(10,0)"
65 # "LASTMOD" "TIMESTAMP(0)"
66 #
67 # Schema of new OVERLAP table:
68 #
69 # "SFOID" VARCHAR2(32 BYTE)
70 # "RUNNR" NUMBER(10,0)
71 # "REFERENCE_STREAM" VARCHAR2(128 BYTE)
72 # "OVERLAP_STREAM" VARCHAR2(128 BYTE)
73 # "OVERLAP_EVENTS" NUMBER(10,0)
74 # "OVERLAP_RATIO" BINARY_FLOAT
75 # "TYPE" VARCHAR2(32 BYTE) [RAW, FILE]
76 # "T_STAMP" TIMESTAMP (0)
77 # "OVERLAP_SEQNUM" NUMBER(10,0)
78 #
79 #
80 # Each row provides the overlap for a stream pair (in a given run and
81 # SFO). The pair is composed by a reference stream and an overlap stream.
82 # We can distinguish between two cases:
83 #
84 # a) reference stream = overlap stream
85 # The row provides the total number of events in the stream
86 # (OVERLAP_EVENTS) and OVERLAP_RATIO is 1
87 #
88 # b) reference stream != overlap stream
89 # The row provides the overlap between the two streams (OVERLAP_EVENTS).
90 # OVERLAP_RATIO is defined as OVERLAP_EVENTS/#(events for reference stream).
91 #
92 # The attached screenshot is an example of the expected result (with two
93 # streams only).
94 #
95 #
96 # The TYPE column is needed to distinguish the source level of the counters:
97 #
98 # RAW: raw streams at the input of the SFO
99 # FILE: resulting streams after the SFO stream processing
100 # (inclusive/exclusive, late, ...)
101 #
102 
103 from CoolRunQuery.utils.AtlRunQueryCache import Cache
104 from CoolRunQuery.AtlRunQueryRun import Run
105 
106 from time import time
107 from collections import defaultdict
108 
@Cache("cache/sfo","%s_run_%i")
def executeCachedQuery(run,cu,q,cachekey):
    """Execute *q* for *run* on cursor *cu* and return all fetched rows.

    The function is wrapped by the ``Cache`` decorator; ``cachekey`` is not
    read in the body itself -- presumably the decorator consumes it to name
    the cache entry (confirm against AtlRunQueryCache).

    Parameters:
        run: value bound to the ``:run`` placeholder of the statement.
        cu: database cursor offering ``execute`` and ``fetchall``.
        q: SQL statement containing a ``:run`` bind variable.
        cachekey: key describing this query/run combination.
    """
    cu.execute(q, run=run)
    rows = cu.fetchall()
    return rows
113 
def executeQuery(run,cu,q,cachekey):
    """Execute *q* for *run* on cursor *cu* and return all fetched rows.

    Undecorated counterpart of ``executeCachedQuery``: ``cachekey`` is
    accepted so both helpers can be called the same way, but it is ignored.

    Parameters:
        run: value bound to the ``:run`` placeholder of the statement.
        cu: database cursor offering ``execute`` and ``fetchall``.
        q: SQL statement containing a ``:run`` bind variable.
        cachekey: unused.
    """
    cu.execute(q, run=run)
    rows = cu.fetchall()
    return rows
117 
118 
119 
120 
121 
122 # The following 5 methods are used in the stream selector
123 
124 
125 
def GetSFO_streamsAll( cursor, runlist ):
    """Collect the stream names stored in SFO_TZ_RUN for each run.

    Parameters:
        cursor: database cursor offering ``prepare``, ``execute`` and
            ``fetchall``.
        runlist: iterable of run numbers.

    Returns:
        ``defaultdict(list)`` mapping run number to a list of
        ``"<STREAMTYPE>_<STREAM>"`` strings.  A run for which the query
        yields no row gets no entry.
    """
    streams_by_run = defaultdict(list)
    query = "SELECT DISTINCT STREAMTYPE, STREAM FROM SFO_TZ_RUN WHERE RUNNR=:run"
    cursor.prepare(query)

    for runnr in runlist:
        cursor.execute(query, run=runnr)
        for row in cursor.fetchall():
            stream_name = "%s_%s" % (row[0], row[1])
            streams_by_run[runnr].append(stream_name)

    return streams_by_run
138 
139 
def GetSFO_filesAll( cursor, runlist ):
    """Summarise the SFO_TZ_File entries of each run per stream.

    Parameters:
        cursor: database cursor (``prepare``/``execute``/``fetchall``).
        runlist: iterable of run numbers.

    Returns:
        ``defaultdict(dict)``: ``result[run]["<STREAMTYPE>_<STREAM>"]`` holds
        the row slice (COUNT(FILESIZE), SUM(FILESIZE), SUM(NREVENTS)),
        i.e. number of files, total file size and number of events.
    """
    files_by_run = defaultdict(dict)
    query = "SELECT STREAMTYPE, STREAM, COUNT(FILESIZE), SUM(FILESIZE), SUM(NREVENTS) FROM SFO_TZ_File WHERE RUNNR = :run GROUP BY STREAM, STREAMTYPE"
    cursor.prepare(query)

    for runnr in runlist:
        # The run equal to Run.runnropen always bypasses the cache.
        fetch = executeQuery if runnr==Run.runnropen else executeCachedQuery
        rows = fetch(runnr, cursor, query, cachekey=("filesAll",runnr))
        per_stream = files_by_run[runnr]
        for row in rows:
            stream_name = "%s_%s" % (row[0],row[1])
            per_stream[stream_name] = row[2:5] # #files, filesize, nevents
    return files_by_run
155 
156 
157 
def GetSFO_LBsAll( cursor, runlist ):
    """Return the lumiblock range and count of every stream, per run.

    Parameters:
        cursor: database cursor (``prepare``/``execute``/``fetchall``).
        runlist: iterable of run numbers.

    Returns:
        ``defaultdict(dict)``: ``result[run]["<STREAMTYPE>_<STREAM>"]`` holds
        the row slice (MIN(LUMIBLOCKNR), MAX(LUMIBLOCKNR),
        COUNT(DISTINCT LUMIBLOCKNR)) read from SFO_TZ_Lumiblock.
    """
    lbs_by_run = defaultdict(dict)
    query = "SELECT STREAMTYPE, STREAM, MIN(LUMIBLOCKNR), MAX(LUMIBLOCKNR), COUNT(DISTINCT LUMIBLOCKNR) FROM SFO_TZ_Lumiblock WHERE RUNNR=:run GROUP BY STREAM, STREAMTYPE"
    cursor.prepare(query)

    for runnr in runlist:
        # The run equal to Run.runnropen always bypasses the cache.
        fetch = executeQuery if runnr==Run.runnropen else executeCachedQuery
        rows = fetch(runnr, cursor, query, cachekey=("LBsAll",runnr))
        per_stream = lbs_by_run[runnr]
        for row in rows:
            stream_name = "%s_%s" % (row[0],row[1])
            per_stream[stream_name] = row[2:5] # minlb,maxlb,nlb
    return lbs_by_run
172 
173 
174 
def GetSFO_NeventsAll( cursor, runlist ):
    """Return the number of events per lumiblock for every stream, per run.

    Parameters:
        cursor: database cursor (``prepare``/``execute``/``fetchall``).
        runlist: iterable of run numbers.

    Returns:
        dict mapping each run number to a ``defaultdict(list)``;
        ``result[run]["<STREAMTYPE>_<STREAM>"]`` is the list of row slices
        (LUMIBLOCKNR, SUM(NREVENTS)) in the order delivered by the query
        (ORDER BY LUMIBLOCKNR).  Every run of *runlist* gets an entry, even
        when the query returns nothing for it.
    """
    events_by_run = {}
    query = "SELECT STREAMTYPE, STREAM, LUMIBLOCKNR, SUM(NREVENTS) FROM SFO_TZ_File WHERE RUNNR=:run GROUP BY STREAMTYPE, STREAM, LUMIBLOCKNR ORDER BY LUMIBLOCKNR"
    cursor.prepare(query)

    for runnr in runlist:
        # The run equal to Run.runnropen always bypasses the cache.
        fetch = executeQuery if runnr==Run.runnropen else executeCachedQuery
        rows = fetch(runnr, cursor, query, cachekey=("NeventsAll",runnr))
        per_stream = defaultdict(list)
        events_by_run[runnr] = per_stream
        for row in rows:
            stream_name = "%s_%s" % (row[0],row[1])
            per_stream[stream_name].append(row[2:4]) # lbNr, NrEv

    return events_by_run
192 
193 
def GetSFO_overlapAll( cursor, runlist ):
    """Return the summed overlap events of every stream pair, per run.

    Only SFO_TZ_Overlap rows with TYPE='FILE' are considered.

    Parameters:
        cursor: database cursor (``prepare``/``execute``/``fetchall``).
        runlist: iterable of run numbers.

    Returns:
        dict mapping each run number to a ``defaultdict(dict)`` such that
        ``result[run][REFERENCE_STREAM][OVERLAP_STREAM]`` is
        SUM(OVERLAP_EVENTS).  Every run of *runlist* gets an entry.
    """
    overlap_by_run = {}
    query = "SELECT REFERENCE_STREAM, OVERLAP_STREAM, SUM(OVERLAP_EVENTS) FROM SFO_TZ_Overlap WHERE TYPE='FILE' and RUNNR=:run GROUP BY REFERENCE_STREAM, OVERLAP_STREAM"
    cursor.prepare(query)

    for runnr in runlist:
        # The run equal to Run.runnropen always bypasses the cache.
        fetch = executeQuery if runnr==Run.runnropen else executeCachedQuery
        rows = fetch(runnr, cursor, query, cachekey=("overlapAll",runnr))

        per_reference = defaultdict(dict)
        overlap_by_run[runnr] = per_reference
        for row in rows:
            per_reference[row[0]][row[1]] = row[2] # overlap

    return overlap_by_run
211 
212 
213 
214 
215 
216 
217 # The following function is used by the XML file maker
218 
219 # we store only information about physics streams
220 
221 
222 
223 
def GetSFO_NeventsAllPhysics( cursor, runlist ):
    """Return the events per lumiblock of the physics streams, per run.

    Same layout as ``GetSFO_NeventsAll`` but restricted in SQL to
    STREAMTYPE='physics'.

    Parameters:
        cursor: database cursor (``prepare``/``execute``/``fetchall``).
        runlist: iterable of run numbers.

    Returns:
        dict mapping each run number to a ``defaultdict(list)``;
        ``result[run]["physics_<STREAM>"]`` is the list of row slices
        (LUMIBLOCKNR, SUM(NREVENTS)) in query order.  Every run of
        *runlist* gets an entry.
    """
    events_by_run = {}
    query = "SELECT STREAM, LUMIBLOCKNR, SUM(NREVENTS) FROM SFO_TZ_File WHERE RUNNR=:run AND STREAMTYPE='physics' GROUP BY STREAM, LUMIBLOCKNR ORDER BY LUMIBLOCKNR"
    cursor.prepare(query)

    for runnr in runlist:
        # The run equal to Run.runnropen always bypasses the cache.
        fetch = executeQuery if runnr==Run.runnropen else executeCachedQuery
        rows = fetch(runnr, cursor, query, cachekey=("NeventsAllPhysics",runnr))
        per_stream = defaultdict(list)
        events_by_run[runnr] = per_stream
        for row in rows:
            stream_name = "physics_%s" % row[0]
            per_stream[stream_name].append(row[1:3]) # lbNr, NrEv

    return events_by_run
240 
241 
242 
243 
244 
245 # The following function is used by the EventSelector
246 
247 
248 
def GetSFO_NphysicseventsAll( cursor, runlist ):
    """Return the summed 'physics_EventCount' overlap events per run.

    The query sums OVERLAP_EVENTS over the SFO_TZ_OVERLAP rows with
    TYPE='EVENTCOUNT' and REFERENCE_STREAM='physics_EventCount'.

    Parameters:
        cursor: database cursor (``prepare``/``execute``/``fetchall``).
        runlist: iterable of run numbers.

    Returns:
        dict mapping run number to the sum.  Runs for which the sum is NULL
        (``None``) are left out of the result.
    """
    res = {}
    q = "SELECT SUM(OVERLAP_EVENTS) from SFO_TZ_OVERLAP WHERE RUNNR=:run AND TYPE='EVENTCOUNT' AND REFERENCE_STREAM='physics_EventCount'"
    cursor.prepare(q)
    for run in runlist:
        # FIXME problem in cache: this query is deliberately never cached.
        # The former "run==Run.runnropen or 1" test was always true, so the
        # cached branch behind it could not be reached; the bypass is now
        # spelled out.  Switch to executeCachedQuery for runs other than
        # Run.runnropen once the cache problem is understood.
        qr = executeQuery(run, cursor, q, cachekey=("SumOverlap",run))
        for e in qr:
            if e[0] is not None:
                res[run] = e[0]
    return res
262 
263 
264 
265 
def GetSFO_lastNruns( cursor, nruns ):
    """Return the newest (RUNNR, STATE) rows of SFO_TZ_RUN.

    Only rows whose STREAMTYPE is not 'calibration' and whose RUNNR is below
    999999 are considered, ordered by descending RUNNR.  At most *nruns*
    rows are returned; the rows are unique (RUNNR, STATE) pairs, so one run
    number can show up more than once if it has several states.

    Parameters:
        cursor: database cursor offering ``execute`` and ``fetchall``.
        nruns: maximum number of rows to return.
    """
    query = "SELECT RUNNR,STATE FROM (SELECT UNIQUE RUNNR,STATE FROM SFO_TZ_RUN WHERE STREAMTYPE!='calibration' AND RUNNR < 999999 ORDER BY RUNNR DESC) SFO_TZ_RUN2 WHERE rownum<=:arg_1"
    cursor.execute( query, arg_1=nruns )
    rows = cursor.fetchall()
    return rows
270 
271 
272 
273 
278 
def SetOKSLinks( runlist ):
    """Attach the OKS configuration info of each run as its 'oks' result.

    For every run the ATLAS_RUN_NUMBER.RunNumber table is queried and the
    first returned row is stored with ``run.addResult('oks', row)``.  The
    selected columns depend on ``run.lhcRun``:

    * ``lhcRun < 3``: (ConfigSchema, ConfigData)
    * otherwise:      (ConfigVersion, Release)

    Parameters:
        runlist: iterable of run objects providing ``lhcRun``, ``runNr`` and
            ``addResult``.

    Raises:
        IndexError: if the table holds no row for one of the runs.
    """
    from CoolRunQuery.utils.AtlRunQueryUtils import coolDbConn
    conn = coolDbConn.GetAtlasRunDBConnection()
    cursor = conn.cursor()
    query = [
        "select ConfigSchema,ConfigData from ATLAS_RUN_NUMBER.RunNumber where RunNumber=:run", # Run 1+2
        "select ConfigVersion,Release from ATLAS_RUN_NUMBER.RunNumber where RunNumber=:run" # Run 3
    ]
    try:
        for run in runlist:
            cursor.execute(query[0 if run.lhcRun<3 else 1], run=run.runNr)
            rows = cursor.fetchall()
            run.addResult('oks',rows[0])
    finally:
        # Release the cursor even if a query or the row lookup fails.  The
        # connection is left open on purpose: it comes from coolDbConn and
        # may be shared with other callers (NOTE(review): confirm).
        cursor.close()
291 
def main():
    """Ad-hoc test driver for the OKS lookup and the SFO queries.

    Two hard-coded switches select what is exercised:

    * ``test_oks``: call ``SetOKSLinks`` for one Run 2 and one Run 3 run.
    * ``test_sfo``: time the ``GetSFO_*All`` queries on a list of runs.  Run
      numbers given on the command line replace the built-in default list.
    """
    test_oks = True
    test_sfo = False

    if test_oks:
        #from os import environ as env
        #from CoolRunQuery.AtlRunQueryUtils import coolDbConn
        #coolDbConn.get_auth('oracle://atlr/rn_r') # only in /afs/cern.ch/atlas/project/tdaq/databases/.coral/authentication.xml
        #print (coolDbConn.get_auth('oracle://ATLAS_COOLPROD/ATLAS_COOLOFL_TRIGGER'))
        from CoolRunQuery.AtlRunQuerySFO import SetOKSLinks
        SetOKSLinks([Run(178211)]) # Run 2
        SetOKSLinks([Run(405396)]) # Run 3

    if test_sfo:
        import sys

        # Default run list; the GetSFO_* functions iterate over it, so it
        # has to be a sequence even for a single run.
        runno = [140541, 140571, 140579, 140592, 140616, 140620,
                 140622, 140638, 140670, 140682, 140704, 140737, 140747,
                 140748, 140754, 140762, 140765, 140769, 140772, 140776,
                 140790, 140794, 140822, 140836, 140842, 140929, 140953,
                 140955, 140974, 140975, 141046, 141059, 141066, 141079,
                 141109, 141150, 141189, 141192, 141194, 141203, 141209,
                 141226, 141234, 141236, 141237, 141238, 141266, 141270,
                 141359, 141374, 141387, 141398, 141401, 141403, 141461,
                 141473, 141474, 141525, 141527, 141529, 141533, 141534,
                 141561, 141562, 141563, 141565, 141599, 141624, 141655,
                 141667, 141670, 141688, 141689, 141691, 141695, 141700,
                 141702, 141704, 141705, 141706, 141707, 141718, 141721,
                 141730, 141746, 141748, 141749, 141755, 141769, 141807,
                 141811, 141818, 141841, 141915, 141928, 141976, 141979,
                 141994, 141998, 141999, 142042, 142065, 142081, 142091,
                 142094, 142111, 142123, 142125, 142128, 142133, 142144,
                 142149, 142154, 142155, 142157, 142159, 142161, 142165,
                 142166, 142171, 142174, 142183, 142185, 142187, 142189,
                 142190, 142191, 142192, 142193, 142194, 142195, 142199,
                 142203, 142205, 142210, 142214, 142216, 142240, 142258,
                 142259, 142265, 142291, 142301, 142308, 142309, 142310,
                 142319, 142356, 142368, 142383, 142390, 142391, 142392,
                 142394, 142395, 142397, 142400, 142401, 142402, 142403,
                 142404, 142405, 142406, 143019, 143023, 143027, 143033,
                 143034, 143131, 143136, 143143, 143163, 143169, 143171,
                 143178, 143182, 143185, 143190, 143192, 143198, 143203,
                 143204, 143205, 143207, 143210, 143218, 143222, 143225,
                 143236, 143242]

        #runno = range(141000,143000)

        # Previously the command-line run number was parsed and then
        # overwritten by the list above; now it takes precedence.
        if len(sys.argv)>1:
            runno = [int(arg) for arg in sys.argv[1:]]

        # Flip to True to list the most recent runs and exit.
        show_last_runs = False

        from CoolRunQuery.utils.AtlRunQueryUtils import coolDbConn
        connection = coolDbConn.GetSFODBConnection()
        cursor = connection.cursor()
        try:
            # find last N runs
            if show_last_runs:
                n = 10
                runs = GetSFO_lastNruns( cursor, n )
                print ('Last %i runs:' % n)
                for r in runs:
                    # each row is (RUNNR, STATE); print the run number only
                    print ('... %i' % r[0])
                sys.exit()

            # retrieve streams, timing every query separately
            timed_queries = (
                ("streams", GetSFO_streamsAll),
                ("lbs", GetSFO_LBsAll),
                ("events", GetSFO_NeventsAll),
                ("files", GetSFO_filesAll),
                ("overlap", GetSFO_overlapAll),
            )
            total_start = time()
            for label, query_func in timed_queries:
                start = time()
                query_func( cursor, runno )
                print (label,time()-start)

            # total over all queries, not just the last one
            print ("Query execution time: %f sec" % (time()-total_start))
        finally:
            # always release the database resources, also on error/exit
            cursor.close()
            connection.close()



if __name__ == '__main__':
    main()
python.AtlRunQuerySFO.GetSFO_streamsAll
def GetSFO_streamsAll(cursor, runlist)
Definition: AtlRunQuerySFO.py:126
dumpHVPathFromNtuple.append
bool append
Definition: dumpHVPathFromNtuple.py:91
python.AtlRunQuerySFO.GetSFO_overlapAll
def GetSFO_overlapAll(cursor, runlist)
Definition: AtlRunQuerySFO.py:194
python.AtlRunQuerySFO.GetSFO_NphysicseventsAll
def GetSFO_NphysicseventsAll(cursor, runlist)
Definition: AtlRunQuerySFO.py:249
python.AtlRunQuerySFO.GetSFO_LBsAll
def GetSFO_LBsAll(cursor, runlist)
Definition: AtlRunQuerySFO.py:158
python.AtlRunQuerySFO.executeCachedQuery
def executeCachedQuery(run, cu, q, cachekey)
Definition: AtlRunQuerySFO.py:110
python.AtlRunQuerySFO.SetOKSLinks
def SetOKSLinks(runlist)
The following function is used to get OKS info.
Definition: AtlRunQuerySFO.py:279
python.AtlRunQuerySFO.main
def main()
Definition: AtlRunQuerySFO.py:292
python.AtlRunQuerySFO.GetSFO_lastNruns
def GetSFO_lastNruns(cursor, nruns)
Definition: AtlRunQuerySFO.py:266
python.AtlRunQuerySFO.GetSFO_NeventsAll
def GetSFO_NeventsAll(cursor, runlist)
Definition: AtlRunQuerySFO.py:175
python.CaloAddPedShiftConfig.int
int
Definition: CaloAddPedShiftConfig.py:45
CaloSwCorrections.time
def time(flags, cells_name, *args, **kw)
Definition: CaloSwCorrections.py:242
python.AtlRunQuerySFO.GetSFO_NeventsAllPhysics
def GetSFO_NeventsAllPhysics(cursor, runlist)
Definition: AtlRunQuerySFO.py:224
python.AtlRunQuerySFO.executeQuery
def executeQuery(run, cu, q, cachekey)
Definition: AtlRunQuerySFO.py:114
CaloLCWConfig.Run
Run
Definition: CaloLCWConfig.py:39
python.AtlRunQuerySFO.GetSFO_filesAll
def GetSFO_filesAll(cursor, runlist)
Definition: AtlRunQuerySFO.py:140