ATLAS Offline Software
AtlRunQuerySFO.py
Go to the documentation of this file.
1 #!/bin/env python
2 
3 # Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration
4 #
5 # ----------------------------------------------------------------
6 # Script : AtlRunQuerySFO.py
7 # Project: AtlRunQuery
8 # Purpose: Utility to retrieve information from SFO DB
9 # Authors: Andreas Hoecker (CERN), Joerg Stelzer (DESY)
10 # Created: Nov 13, 2008
11 # ----------------------------------------------------------------
12 #
13 # SFO DB schema:
14 # ==============================================
15 #
16 # Run Table
17 #
18 # "SFOID" "VARCHAR2(32 Bytes)"
19 # "RUNNR" "NUMBER(10,0)"
20 # "STREAMTYPE" "VARCHAR2(32 Bytes)"
21 # "STREAM" "VARCHAR2(32 Bytes)"
22 # "STATE" "VARCHAR2(32 Bytes)" [OPENED,CLOSED,TRANSFERRED]
23 # "TZSTATE" "VARCHAR2(32 Bytes)" filled by TZ
24 # "DSNAME" "VARCHAR2(256 Bytes)"
25 # "T_STAMP" "TIMESTAMP(0)" "RUN_SEQNUM" "NUMBER(10,0)"
26 #
27 # Lumiblock Table
28 #
29 # "SFOID" "VARCHAR2(32 Bytes)"
30 # "RUNNR" "NUMBER(10,0)"
31 # "LUMIBLOCKNR" "NUMBER(10,0)"
32 # "STREAMTYPE" "VARCHAR2(32 Bytes)"
33 # "STREAM" "VARCHAR2(32 Bytes)"
34 # "STATE" "VARCHAR2(32 Bytes)" [OPENED,CLOSED,TRANSFERRED]
35 # "TZSTATE" "VARCHAR2(32 Bytes)" filled by TZ
36 # "T_STAMP" "TIMESTAMP(0)" "LUMIBLOCK_SEQNUM" "NUMBER(10,0)"
37 #
38 # File Table
39 #
40 # "LFN" "VARCHAR2(256 Bytes)" filename
41 # "FILENR" "NUMBER(10,0)"
42 # "SFOID" "VARCHAR2(32 Bytes)"
43 # "RUNNR" "NUMBER(10,0)"
44 # "LUMIBLOCKNR" "NUMBER(10,0)"
45 # "STREAMTYPE" "VARCHAR2(32 Bytes)"
46 # "STREAM" "VARCHAR2(32 Bytes)"
47 # "SFOHOST" "VARCHAR2(64 Bytes)"
48 # "TRANSFERSTATE" "VARCHAR2(32 Bytes)" [ONDISK,TRANSFERRED]
49 # "FILESTATE" "VARCHAR2(32 Bytes)" [OPENED,CLOSED,DELETED]
50 # "FILEHEALTH" "VARCHAR2(32 Bytes)" [SANE,TRUNCATED]
51 # "TZSTATE" "VARCHAR2(32 Bytes)" filled by TZ
52 # "FILESIZE" "NUMBER(10,0)"
53 # "GUID" "VARCHAR2(64 Bytes)"
54 # "CHECKSUM" "VARCHAR2(64 Bytes)" Adler-32
55 # "PFN" "VARCHAR2(1024 Bytes)" /path/filename on Castor
56 # "SFOPFN" "VARCHAR2(1024 Bytes)" /path/filename on SFO
57 # "NREVENTS" "NUMBER(10,0)"
58 # "T_STAMP" "TIMESTAMP(0)" "ENDTIME" "TIMESTAMP(0)" UTC time
59 # "FILE_SEQNUM" "NUMBER(10,0)"
60 #
61 # Index Table
62 #
63 # "SEQNAME" "VARCHAR2(64 Bytes)"
64 # "SEQINDEX" "NUMBER(10,0)"
65 # "LASTMOD" "TIMESTAMP(0)"
66 #
67 # Schema of new OVERLAP table:
68 #
69 # "SFOID" VARCHAR2(32 BYTE)
70 # "RUNNR" NUMBER(10,0)
71 # "REFERENCE_STREAM" VARCHAR2(128 BYTE)
72 # "OVERLAP_STREAM" VARCHAR2(128 BYTE)
73 # "OVERLAP_EVENTS" NUMBER(10,0)
74 # "OVERLAP_RATIO" BINARY_FLOAT
75 # "TYPE" VARCHAR2(32 BYTE) [RAW, FILE]
76 # "T_STAMP" TIMESTAMP (0)
77 # "OVERLAP_SEQNUM" NUMBER(10,0)
78 #
79 #
80 # Each row provides the overlap for a stream pair (in a given run and
81 # SFO). The pair is composed of a reference stream and an overlap stream.
82 # We can distinguish between two cases:
83 #
84 # a) reference stream = overlap stream
85 # The row provides the total number of events in the stream
86 # (OVERLAP_EVENTS) and OVERLAP_RATIO is 1
87 #
88 # b) reference stream != overlap stream
89 # The row provides the overlap between the two streams (OVERLAP_EVENTS).
90 # OVERLAP_RATIO is defined as OVERLAP_EVENTS/#(events for reference stream).
91 #
92 # The attached screenshot is an example of the expected result (with two
93 # streams only).
94 #
95 #
96 # The TYPE column is needed to distinguish the source level of the counters:
97 #
98 # RAW: raw streams at the input of the SFO
99 # FILE: resulting streams after the SFO stream processing
100 # (inclusive/exclusive, late, ...)
101 #
102 
103 from __future__ import with_statement, print_function
104 from CoolRunQuery.utils.AtlRunQueryCache import Cache
105 from CoolRunQuery.AtlRunQueryRun import Run
106 
107 from time import time
108 from collections import defaultdict
109 
@Cache("cache/sfo","%s_run_%i")
def executeCachedQuery(run,cu,q,cachekey):
    """Run query *q* for one run on cursor *cu* and return all rows.

    The call is wrapped by the ``Cache`` decorator (configured with the
    "cache/sfo" location and the "%s_run_%i" name pattern).
    NOTE(review): presumably the decorator uses ``cachekey`` to name the
    cache entry -- confirm in AtlRunQueryCache; this body never reads it.

    run      -- run number, bound to the ``:run`` placeholder of the query
    cu       -- database cursor providing ``execute`` and ``fetchall``
    q        -- SQL text containing a ``:run`` bind variable
    cachekey -- not used by this body
    """
    cu.execute(q, run=run)
    rows = cu.fetchall()
    return rows
114 
def executeQuery(run,cu,q,cachekey):
    """Run query *q* for one run on cursor *cu* and return all rows, uncached.

    Same signature as executeCachedQuery so that callers can pick either one;
    ``cachekey`` is accepted for that reason only and is ignored here.

    run      -- run number, bound to the ``:run`` placeholder of the query
    cu       -- database cursor providing ``execute`` and ``fetchall``
    q        -- SQL text containing a ``:run`` bind variable
    cachekey -- ignored
    """
    cu.execute(q, run=run)
    rows = cu.fetchall()
    return rows
118 
119 
120 
121 
122 
123 # The following 5 methods are used in the stream selector
124 
125 
126 
def GetSFO_streamsAll( cursor, runlist ):
    """Collect the distinct streams listed in SFO_TZ_RUN for each run.

    cursor  -- database cursor providing prepare/execute/fetchall
    runlist -- iterable of run numbers

    Returns a defaultdict(list) mapping run number to a list of
    "<STREAMTYPE>_<STREAM>" names.  A run whose query yields no rows gets
    no entry in the result.
    """
    sql = "SELECT DISTINCT STREAMTYPE, STREAM FROM SFO_TZ_RUN WHERE RUNNR=:run"
    cursor.prepare(sql)

    streams = defaultdict(list)
    for runnr in runlist:
        cursor.execute(sql, run=runnr)
        for row in cursor.fetchall():
            name = "%s_%s" % (row[0], row[1])
            streams[runnr].append(name)

    return streams
139 
140 
def GetSFO_filesAll( cursor, runlist ):
    """Return per-stream file statistics (nfiles, fsize, nevents) for each run.

    cursor  -- database cursor providing prepare/execute/fetchall
    runlist -- iterable of run numbers

    Returns a defaultdict(dict): result[run]["<STREAMTYPE>_<STREAM>"] holds
    the COUNT(FILESIZE), SUM(FILESIZE) and SUM(NREVENTS) columns of the
    SFO_TZ_File query for that run and stream.
    """
    sql = "SELECT STREAMTYPE, STREAM, COUNT(FILESIZE), SUM(FILESIZE), SUM(NREVENTS) FROM SFO_TZ_File WHERE RUNNR = :run GROUP BY STREAM, STREAMTYPE"
    cursor.prepare(sql)

    stats = defaultdict(dict)
    for runnr in runlist:
        # The run equal to Run.runnropen is always queried directly; every
        # other run goes through the cached query.
        fetch = executeQuery if runnr==Run.runnropen else executeCachedQuery
        rows = fetch(runnr, cursor, sql, cachekey=("filesAll",runnr))
        for row in rows:
            stream = "%s_%s" % (row[0],row[1])
            stats[runnr][stream] = row[2:5] # #files, filesize, nevents
    return stats
156 
157 
158 
def GetSFO_LBsAll( cursor, runlist ):
    """Return the lumiblock range recorded per stream for each run.

    cursor  -- database cursor providing prepare/execute/fetchall
    runlist -- iterable of run numbers

    Returns a defaultdict(dict): result[run]["<STREAMTYPE>_<STREAM>"] holds
    the MIN(LUMIBLOCKNR), MAX(LUMIBLOCKNR) and COUNT(DISTINCT LUMIBLOCKNR)
    columns of the SFO_TZ_Lumiblock query.
    """
    sql = "SELECT STREAMTYPE, STREAM, MIN(LUMIBLOCKNR), MAX(LUMIBLOCKNR), COUNT(DISTINCT LUMIBLOCKNR) FROM SFO_TZ_Lumiblock WHERE RUNNR=:run GROUP BY STREAM, STREAMTYPE"
    cursor.prepare(sql)

    lbinfo = defaultdict(dict)
    for runnr in runlist:
        # The run equal to Run.runnropen is always queried directly; every
        # other run goes through the cached query.
        fetch = executeQuery if runnr==Run.runnropen else executeCachedQuery
        rows = fetch(runnr, cursor, sql, cachekey=("LBsAll",runnr))
        for row in rows:
            stream = "%s_%s" % (row[0],row[1])
            lbinfo[runnr][stream] = row[2:5] # minlb,maxlb,nlb
    return lbinfo
173 
174 
175 
def GetSFO_NeventsAll( cursor, runlist ):
    """Return the number of events per lumiblock and stream for each run.

    cursor  -- database cursor providing prepare/execute/fetchall
    runlist -- iterable of run numbers

    Returns a dict: result[run] is a defaultdict(list) mapping
    "<STREAMTYPE>_<STREAM>" to a list of (LUMIBLOCKNR, SUM(NREVENTS)) row
    slices, in the LUMIBLOCKNR order delivered by the query.  Every run in
    runlist gets an entry, even when the query yields no rows.
    """
    # due to the amount of information oracle access is faster than file access
    sql = "SELECT STREAMTYPE, STREAM, LUMIBLOCKNR, SUM(NREVENTS) FROM SFO_TZ_File WHERE RUNNR=:run GROUP BY STREAMTYPE, STREAM, LUMIBLOCKNR ORDER BY LUMIBLOCKNR"
    cursor.prepare(sql)

    nevents = {}
    for runnr in runlist:
        # The run equal to Run.runnropen is always queried directly; every
        # other run goes through the cached query.
        fetch = executeQuery if runnr==Run.runnropen else executeCachedQuery
        rows = fetch(runnr, cursor, sql, cachekey=("NeventsAll",runnr))

        per_stream = defaultdict(list)
        nevents[runnr] = per_stream
        for row in rows:
            stream = "%s_%s" % (row[0],row[1])
            per_stream[stream].append(row[2:4]) # lbNr, NrEv

    return nevents
193 
194 
def GetSFO_overlapAll( cursor, runlist ):
    """Return the number of overlap events between stream pairs for each run.

    cursor  -- database cursor providing prepare/execute/fetchall
    runlist -- iterable of run numbers

    Only SFO_TZ_Overlap rows with TYPE='FILE' are summed.  Returns a dict:
    result[run][REFERENCE_STREAM][OVERLAP_STREAM] = SUM(OVERLAP_EVENTS),
    where result[run] is a defaultdict(dict).  Every run in runlist gets an
    entry, even when the query yields no rows.
    """
    sql = "SELECT REFERENCE_STREAM, OVERLAP_STREAM, SUM(OVERLAP_EVENTS) FROM SFO_TZ_Overlap WHERE TYPE='FILE' and RUNNR=:run GROUP BY REFERENCE_STREAM, OVERLAP_STREAM"
    cursor.prepare(sql)

    overlaps = {}
    for runnr in runlist:
        # The run equal to Run.runnropen is always queried directly; every
        # other run goes through the cached query.
        fetch = executeQuery if runnr==Run.runnropen else executeCachedQuery
        rows = fetch(runnr, cursor, sql, cachekey=("overlapAll",runnr))

        per_reference = defaultdict(dict)
        overlaps[runnr] = per_reference
        for row in rows:
            per_reference[row[0]][row[1]] = row[2] # overlap

    return overlaps
212 
213 
214 
215 
216 
217 
218 # The following function is used by the XML file maker
219 
220 # we store only information about physics streams
221 
222 
223 
224 
def GetSFO_NeventsAllPhysics( cursor, runlist ):
    """Return events per lumiblock for the physics streams of each run.

    cursor  -- database cursor providing prepare/execute/fetchall
    runlist -- iterable of run numbers

    The query is restricted to STREAMTYPE='physics'.  Returns a dict:
    result[run] is a defaultdict(list) mapping "physics_<STREAM>" to a list
    of (LUMIBLOCKNR, SUM(NREVENTS)) row slices, in the LUMIBLOCKNR order
    delivered by the query.  Every run in runlist gets an entry.
    """
    sql = "SELECT STREAM, LUMIBLOCKNR, SUM(NREVENTS) FROM SFO_TZ_File WHERE RUNNR=:run AND STREAMTYPE='physics' GROUP BY STREAM, LUMIBLOCKNR ORDER BY LUMIBLOCKNR"
    cursor.prepare(sql)

    nevents = {}
    for runnr in runlist:
        # The run equal to Run.runnropen is always queried directly; every
        # other run goes through the cached query.
        fetch = executeQuery if runnr==Run.runnropen else executeCachedQuery
        rows = fetch(runnr, cursor, sql, cachekey=("NeventsAllPhysics",runnr))

        per_stream = defaultdict(list)
        nevents[runnr] = per_stream
        for row in rows:
            stream = "physics_%s" % row[0]
            per_stream[stream].append(row[1:3]) # lbNr, NrEv

    return nevents
241 
242 
243 
244 
245 
246 # The following function is used by the EventSelector
247 
248 
249 
def GetSFO_NphysicseventsAll( cursor, runlist ):
    """Return the summed physics event count for each run.

    cursor  -- database cursor providing prepare/execute/fetchall
    runlist -- iterable of run numbers

    Sums OVERLAP_EVENTS over SFO_TZ_OVERLAP rows with TYPE='EVENTCOUNT' and
    REFERENCE_STREAM='physics_EventCount'.  Returns a dict run -> sum; runs
    for which the sum is NULL (None) are left out of the result.
    """
    sql = "SELECT SUM(OVERLAP_EVENTS) from SFO_TZ_OVERLAP WHERE RUNNR=:run AND TYPE='EVENTCOUNT' AND REFERENCE_STREAM='physics_EventCount'"
    cursor.prepare(sql)

    # Always truthy, so the cached branch below is never taken.
    bypass_cache = 1 # FIXME problem in cache

    totals = {}
    for runnr in runlist:
        if runnr==Run.runnropen or bypass_cache:
            rows = executeQuery(runnr, cursor, sql, cachekey=("SumOverlap",runnr))
        else:
            rows = executeCachedQuery(runnr, cursor, sql, cachekey=("SumOverlap",runnr))
        for row in rows:
            if row[0] is None:
                continue
            totals[runnr] = row[0]
    return totals
263 
264 
265 
266 
def GetSFO_lastNruns( cursor, nruns ):
    """Return the most recent (RUNNR, STATE) rows from SFO_TZ_RUN.

    Only rows whose STREAMTYPE is not 'calibration' and whose RUNNR is below
    999999 are considered.  They are ordered by descending RUNNR and cut
    off after 'nruns' rows.

    cursor -- database cursor providing execute/fetchall
    nruns  -- maximum number of rows to return (bound to :arg_1)
    """
    sql = "SELECT RUNNR,STATE FROM (SELECT UNIQUE RUNNR,STATE FROM SFO_TZ_RUN WHERE STREAMTYPE!='calibration' AND RUNNR < 999999 ORDER BY RUNNR DESC) SFO_TZ_RUN2 WHERE rownum<=:arg_1"
    cursor.execute( sql, arg_1=nruns )
    last_runs = cursor.fetchall()
    return last_runs
271 
272 
273 
274 
279 
def SetOKSLinks( runlist ):
    """Attach the OKS configuration info from the run-number DB to each run.

    For every run object in runlist one row is read from
    ATLAS_RUN_NUMBER.RunNumber and stored on the run via
    run.addResult('oks', row).  Runs with lhcRun < 3 read the
    (ConfigSchema, ConfigData) columns, later runs read
    (ConfigVersion, Release).

    runlist -- iterable of run objects providing lhcRun, runNr and addResult

    Raises IndexError if the database has no row for a run.
    """
    from CoolRunQuery.utils.AtlRunQueryUtils import coolDbConn
    conn = coolDbConn.GetAtlasRunDBConnection()
    cursor = conn.cursor()
    query = [
        "select ConfigSchema,ConfigData from ATLAS_RUN_NUMBER.RunNumber where RunNumber=:run", # Run 1+2
        "select ConfigVersion,Release from ATLAS_RUN_NUMBER.RunNumber where RunNumber=:run" # Run 3
        ]
    try:
        for run in runlist:
            cursor.execute(query[0 if run.lhcRun<3 else 1], run=run.runNr)
            rows = cursor.fetchall()
            # NOTE(review): a run missing from the table still raises
            # IndexError here, as before -- confirm whether callers would
            # rather have such runs skipped.
            run.addResult('oks',rows[0])
    finally:
        # Release the cursor even when a query fails; the connection itself
        # is left open, as before.
        cursor.close()
292 
def main():
    """Developer smoke test for the helpers in this module.

    Two hard-coded switches select what is exercised:

    test_oks -- call SetOKSLinks for one Run 2 and one Run 3 run
    test_sfo -- time the GetSFO_* stream queries against the SFO DB for a
                list of runs (run numbers given on the command line replace
                the built-in default list)
    """
    test_oks = True
    test_sfo = False

    if test_oks:
        #from os import environ as env
        #from CoolRunQuery.AtlRunQueryUtils import coolDbConn
        #coolDbConn.get_auth('oracle://atlr/rn_r') # only in /afs/cern.ch/atlas/project/tdaq/databases/.coral/authentication.xml
        #print (coolDbConn.get_auth('oracle://ATLAS_COOLPROD/ATLAS_COOLOFL_TRIGGER'))
        from CoolRunQuery.AtlRunQuerySFO import SetOKSLinks
        SetOKSLinks([Run(178211)]) # Run 2
        SetOKSLinks([Run(405396)]) # Run 3

    if test_sfo:
        import sys

        # Default run list used when no run number is given on the command line.
        runno = [140541, 140571, 140579, 140592, 140616, 140620,
                 140622, 140638, 140670, 140682, 140704, 140737, 140747,
                 140748, 140754, 140762, 140765, 140769, 140772, 140776,
                 140790, 140794, 140822, 140836, 140842, 140929, 140953,
                 140955, 140974, 140975, 141046, 141059, 141066, 141079,
                 141109, 141150, 141189, 141192, 141194, 141203, 141209,
                 141226, 141234, 141236, 141237, 141238, 141266, 141270,
                 141359, 141374, 141387, 141398, 141401, 141403, 141461,
                 141473, 141474, 141525, 141527, 141529, 141533, 141534,
                 141561, 141562, 141563, 141565, 141599, 141624, 141655,
                 141667, 141670, 141688, 141689, 141691, 141695, 141700,
                 141702, 141704, 141705, 141706, 141707, 141718, 141721,
                 141730, 141746, 141748, 141749, 141755, 141769, 141807,
                 141811, 141818, 141841, 141915, 141928, 141976, 141979,
                 141994, 141998, 141999, 142042, 142065, 142081, 142091,
                 142094, 142111, 142123, 142125, 142128, 142133, 142144,
                 142149, 142154, 142155, 142157, 142159, 142161, 142165,
                 142166, 142171, 142174, 142183, 142185, 142187, 142189,
                 142190, 142191, 142192, 142193, 142194, 142195, 142199,
                 142203, 142205, 142210, 142214, 142216, 142240, 142258,
                 142259, 142265, 142291, 142301, 142308, 142309, 142310,
                 142319, 142356, 142368, 142383, 142390, 142391, 142392,
                 142394, 142395, 142397, 142400, 142401, 142402, 142403,
                 142404, 142405, 142406, 143019, 143023, 143027, 143033,
                 143034, 143131, 143136, 143143, 143163, 143169, 143171,
                 143178, 143182, 143185, 143190, 143192, 143198, 143203,
                 143204, 143205, 143207, 143210, 143218, 143222, 143225,
                 143236, 143242]

        #runno = range(141000,143000)

        # Previously the command-line run number was parsed and then
        # overwritten by the list above; now it takes precedence.  The
        # GetSFO_* helpers iterate over their run argument, so it has to be
        # a list.
        if len(sys.argv)>1:
            runno = [int(arg) for arg in sys.argv[1:]]

        from CoolRunQuery.utils.AtlRunQueryUtils import coolDbConn
        connection = coolDbConn.GetSFODBConnection()
        cursor = connection.cursor()

        show_last_runs = False   # flip to list the last N runs and exit
        time_queries   = True

        try:
            # find last N runs
            if show_last_runs:
                n = 10
                runs = GetSFO_lastNruns( cursor, n )
                print ('Last %i runs:' % n)
                for r in runs:
                    # each row is (RUNNR, STATE); print the run number only
                    print ('... %i' % r[0])
                sys.exit()

            total_start = time()

            # retrieve streams
            if time_queries:
                timed_queries = (
                    ("streams", GetSFO_streamsAll),
                    ("lbs",     GetSFO_LBsAll),
                    ("events",  GetSFO_NeventsAll),
                    ("files",   GetSFO_filesAll),
                    ("overlap", GetSFO_overlapAll),
                    )
                for label, query in timed_queries:
                    start = time()
                    query( cursor, runno )
                    print (label,time()-start)

            # Report the total over all queries, not just the last one.
            print ("Query execution time: %f sec" % (time()-total_start))
        finally:
            # Also reached on sys.exit() or on a failing query.
            cursor.close()
            connection.close()
381 
382 
383 
# Run the developer smoke test only when executed as a script, not on import.
if __name__ == '__main__':
    main()
CaloCellPos2Ntuple.int
int
Definition: CaloCellPos2Ntuple.py:24
python.AtlRunQuerySFO.GetSFO_streamsAll
def GetSFO_streamsAll(cursor, runlist)
Definition: AtlRunQuerySFO.py:127
dumpHVPathFromNtuple.append
bool append
Definition: dumpHVPathFromNtuple.py:91
python.AtlRunQuerySFO.GetSFO_overlapAll
def GetSFO_overlapAll(cursor, runlist)
Definition: AtlRunQuerySFO.py:195
python.AtlRunQuerySFO.GetSFO_NphysicseventsAll
def GetSFO_NphysicseventsAll(cursor, runlist)
Definition: AtlRunQuerySFO.py:250
python.AtlRunQuerySFO.GetSFO_LBsAll
def GetSFO_LBsAll(cursor, runlist)
Definition: AtlRunQuerySFO.py:159
python.AtlRunQuerySFO.executeCachedQuery
def executeCachedQuery(run, cu, q, cachekey)
Definition: AtlRunQuerySFO.py:111
python.AtlRunQuerySFO.SetOKSLinks
def SetOKSLinks(runlist)
The following function is used to get OKS info.
Definition: AtlRunQuerySFO.py:280
python.AtlRunQuerySFO.main
def main()
Definition: AtlRunQuerySFO.py:293
python.AtlRunQuerySFO.GetSFO_lastNruns
def GetSFO_lastNruns(cursor, nruns)
Definition: AtlRunQuerySFO.py:267
python.AtlRunQuerySFO.GetSFO_NeventsAll
def GetSFO_NeventsAll(cursor, runlist)
Definition: AtlRunQuerySFO.py:176
CaloSwCorrections.time
def time(flags, cells_name, *args, **kw)
Definition: CaloSwCorrections.py:242
python.AtlRunQuerySFO.GetSFO_NeventsAllPhysics
def GetSFO_NeventsAllPhysics(cursor, runlist)
Definition: AtlRunQuerySFO.py:225
python.AtlRunQuerySFO.executeQuery
def executeQuery(run, cu, q, cachekey)
Definition: AtlRunQuerySFO.py:115
CaloLCWConfig.Run
Run
Definition: CaloLCWConfig.py:39
python.AtlRunQuerySFO.GetSFO_filesAll
def GetSFO_filesAll(cursor, runlist)
Definition: AtlRunQuerySFO.py:141