ATLAS Offline Software
Loading...
Searching...
No Matches
AtlRunQuerySFO.py
Go to the documentation of this file.
1#!/bin/env python
2
3# Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration
4#
5# ----------------------------------------------------------------
6# Script : AtlRunQuerySFO.py
7# Project: AtlRunQuery
8# Purpose: Utility to retrieve information from SFO DB
9# Authors: Andreas Hoecker (CERN), Joerg Stelzer (DESY)
10# Created: Nov 13, 2008
11# ----------------------------------------------------------------
12#
13# SFO DB schema:
14# ==============================================
15#
16# Run Table
17#
18# "SFOID" "VARCHAR2(32 Bytes)"
19# "RUNNR" "NUMBER(10,0)"
20# "STREAMTYPE" "VARCHAR2(32 Bytes)"
21# "STREAM" "VARCHAR2(32 Bytes)"
22# "STATE" "VARCHAR2(32 Bytes)" [OPENED,CLOSED,TRANSFERRED]
23# "TZSTATE" "VARCHAR2(32 Bytes)" filled by TZ
24# "DSNAME" "VARCHAR2(256 Bytes)"
25# "T_STAMP" "TIMESTAMP(0)" "RUN_SEQNUM" "NUMBER(10,0)"
26#
27# Lumiblock Table
28#
29# "SFOID" "VARCHAR2(32 Bytes)"
30# "RUNNR" "NUMBER(10,0)"
31# "LUMIBLOCKNR" "NUMBER(10,0)"
32# "STREAMTYPE" "VARCHAR2(32 Bytes)"
33# "STREAM" "VARCHAR2(32 Bytes)"
34# "STATE" "VARCHAR2(32 Bytes)" [OPENED,CLOSED,TRANSFERRED]
35# "TZSTATE" "VARCHAR2(32 Bytes)" filled by TZ
36# "T_STAMP" "TIMESTAMP(0)" "LUMIBLOCK_SEQNUM" "NUMBER(10,0)"
37#
38# File Table
39#
40# "LFN" "VARCHAR2(256 Bytes)" filename
41# "FILENR" "NUMBER(10,0)"
42# "SFOID" "VARCHAR2(32 Bytes)"
43# "RUNNR" "NUMBER(10,0)"
44# "LUMIBLOCKNR" "NUMBER(10,0)"
45# "STREAMTYPE" "VARCHAR2(32 Bytes)"
46# "STREAM" "VARCHAR2(32 Bytes)"
47# "SFOHOST" "VARCHAR2(64 Bytes)"
48# "TRANSFERSTATE" "VARCHAR2(32 Bytes)" [ONDISK,TRANSFERRED]
49# "FILESTATE" "VARCHAR2(32 Bytes)" [OPENED,CLOSED,DELETED]
50# "FILEHEALTH" "VARCHAR2(32 Bytes)" [SANE,TRUNCATED]
51# "TZSTATE" "VARCHAR2(32 Bytes)" filled by TZ
52# "FILESIZE" "NUMBER(10,0)"
53# "GUID" "VARCHAR2(64 Bytes)"
54# "CHECKSUM" "VARCHAR2(64 Bytes)" Adler-32
55# "PFN" "VARCHAR2(1024 Bytes)" /path/filename on Castor
56# "SFOPFN" "VARCHAR2(1024 Bytes)" /path/filename on SFO
57# "NREVENTS" "NUMBER(10,0)"
58# "T_STAMP" "TIMESTAMP(0)" "ENDTIME" "TIMESTAMP(0)" UTC time
59# "FILE_SEQNUM" "NUMBER(10,0)"
60#
61# Index Table
62#
63# "SEQNAME" "VARCHAR2(64 Bytes)"
64# "SEQINDEX" "NUMBER(10,0)"
65# "LASTMOD" "TIMESTAMP(0)"
66#
67# Schema of new OVERLAP table:
68#
69# "SFOID" VARCHAR2(32 BYTE)
70# "RUNNR" NUMBER(10,0)
71# "REFERENCE_STREAM" VARCHAR2(128 BYTE)
72# "OVERLAP_STREAM" VARCHAR2(128 BYTE)
73# "OVERLAP_EVENTS" NUMBER(10,0)
74# "OVERLAP_RATIO" BINARY_FLOAT
75# "TYPE" VARCHAR2(32 BYTE) [RAW, FILE]
76# "T_STAMP" TIMESTAMP (0)
77# "OVERLAP_SEQNUM" NUMBER(10,0)
78#
79#
80# Each row provides the overlap for a stream pair (in a given run and
81# SFO). The pair is composed by a reference stream and an overlap stream.
82# We can distinguish between two cases:
83#
84# a) reference stream = overlap stream
85# The row provides the total number of events in the stream
86# (OVERLAP_EVENTS) and OVERLAP_RATIO is 1
87#
88# b) reference stream != overlap stream
89# The row provides the overlap between the two streams (OVERLAP_EVENTS).
90# OVERLAP_RATIO is defined as OVERLAP_EVENTS/#(events for reference stream).
91#
92# The attached screenshot is an example of the expected result (with two
93# streams only).
94#
95#
96# The TYPE column is needed to distinguish the source level of the counters:
97#
98# RAW: raw streams at the input of the SFO
99# FILE: resulting streams after the SFO stream processing
100# (inclusive/exclusive, late, ...)
101#
102
103from CoolRunQuery.utils.AtlRunQueryCache import Cache
104from CoolRunQuery.AtlRunQueryRun import Run
105
106from time import time
107from collections import defaultdict
108
109@Cache("cache/sfo","%s_run_%i")
110def executeCachedQuery(run,cu,q,cachekey):
111 cu.execute(q, run=run)
112 return cu.fetchall()
113
114def executeQuery(run,cu,q,cachekey):
115 cu.execute(q, run=run)
116 return cu.fetchall()
117
118
119
120
121
122# The following 5 methods are used in the stream selector
123
124
125
126def GetSFO_streamsAll( cursor, runlist ):
127 res = defaultdict(list)
128 q = "SELECT DISTINCT STREAMTYPE, STREAM FROM SFO_TZ_RUN WHERE RUNNR=:run"
129 cursor.prepare(q)
130
131 for run in runlist:
132 cursor.execute(q,run=run)
133 qr = cursor.fetchall()
134 for e in qr:
135 res[run].append("%s_%s" % (e[0],e[1]))
136
137 return res
138
139
140def GetSFO_filesAll( cursor, runlist ):
141 """returns nfiles, fsize, nevents"""
142 res = defaultdict(dict)
143 q = "SELECT STREAMTYPE, STREAM, COUNT(FILESIZE), SUM(FILESIZE), SUM(NREVENTS) FROM SFO_TZ_File WHERE RUNNR = :run GROUP BY STREAM, STREAMTYPE"
144 cursor.prepare(q)
145
146 for run in runlist:
147 if run==Run.runnropen:
148 qr = executeQuery(run, cursor, q, cachekey=("filesAll",run))
149 else:
150 qr = executeCachedQuery(run, cursor, q, cachekey=("filesAll",run))
151 for e in qr:
152 key = "%s_%s" % (e[0],e[1])
153 res[run][key] = e[2:5] # #files, filesize, nevents
154 return res
155
156
157
158def GetSFO_LBsAll( cursor, runlist ):
159 res = defaultdict(dict)
160 q = "SELECT STREAMTYPE, STREAM, MIN(LUMIBLOCKNR), MAX(LUMIBLOCKNR), COUNT(DISTINCT LUMIBLOCKNR) FROM SFO_TZ_Lumiblock WHERE RUNNR=:run GROUP BY STREAM, STREAMTYPE"
161 cursor.prepare(q)
162
163 for run in runlist:
164 if run==Run.runnropen:
165 qr = executeQuery(run, cursor, q, cachekey=("LBsAll",run))
166 else:
167 qr = executeCachedQuery(run, cursor, q, cachekey=("LBsAll",run))
168 for e in qr:
169 key2 = "%s_%s" % (e[0],e[1])
170 res[run][key2] = e[2:5] # minlb,maxlb,nlb
171 return res
172
173
174
175def GetSFO_NeventsAll( cursor, runlist ):
176 # due to the amount of information oracle access is faster than file access
177 res = {}
178 q = "SELECT STREAMTYPE, STREAM, LUMIBLOCKNR, SUM(NREVENTS) FROM SFO_TZ_File WHERE RUNNR=:run GROUP BY STREAMTYPE, STREAM, LUMIBLOCKNR ORDER BY LUMIBLOCKNR"
179 cursor.prepare(q)
180
181 for run in runlist:
182 if run==Run.runnropen:
183 qr = executeQuery(run, cursor, q, cachekey=("NeventsAll",run))
184 else:
185 qr = executeCachedQuery(run, cursor, q, cachekey=("NeventsAll",run))
186 res[run] = defaultdict(list)
187 for e in qr:
188 key2 = "%s_%s" % (e[0],e[1])
189 res[run][key2].append(e[2:4]) # lbNr, NrEv
190
191 return res
192
193
194def GetSFO_overlapAll( cursor, runlist ):
195 """returns number of overlap events"""
196 res = {}
197 q = "SELECT REFERENCE_STREAM, OVERLAP_STREAM, SUM(OVERLAP_EVENTS) FROM SFO_TZ_Overlap WHERE TYPE='FILE' and RUNNR=:run GROUP BY REFERENCE_STREAM, OVERLAP_STREAM"
198 cursor.prepare(q)
199
200 for run in runlist:
201 if run==Run.runnropen:
202 qr = executeQuery(run, cursor, q, cachekey=("overlapAll",run))
203 else:
204 qr = executeCachedQuery(run, cursor, q, cachekey=("overlapAll",run))
205
206 res[run] = defaultdict(dict)
207 for e in qr:
208 res[run][e[0]][e[1]] = e[2] # overlap
209
210 return res
211
212
213
214
215
216
217# The following function is used by the XML file maker
218
219# we store only information about physics streams
220
221
222
223
224def GetSFO_NeventsAllPhysics( cursor, runlist ):
225 res = {}
226 q = "SELECT STREAM, LUMIBLOCKNR, SUM(NREVENTS) FROM SFO_TZ_File WHERE RUNNR=:run AND STREAMTYPE='physics' GROUP BY STREAM, LUMIBLOCKNR ORDER BY LUMIBLOCKNR"
227 cursor.prepare(q)
228
229 for run in runlist:
230 if run==Run.runnropen:
231 qr = executeQuery(run, cursor, q, cachekey=("NeventsAllPhysics",run))
232 else:
233 qr = executeCachedQuery(run, cursor, q, cachekey=("NeventsAllPhysics",run))
234 res[run] = defaultdict(list)
235 for e in qr:
236 key2 = "physics_%s" % e[0]
237 res[run][key2].append(e[1:3]) # lbNr, NrEv
238
239 return res
240
241
242
243
244
245# The following function is used by the EventSelector
246
247
248
249def GetSFO_NphysicseventsAll( cursor, runlist ):
250 res = {}
251 q = "SELECT SUM(OVERLAP_EVENTS) from SFO_TZ_OVERLAP WHERE RUNNR=:run AND TYPE='EVENTCOUNT' AND REFERENCE_STREAM='physics_EventCount'"
252 cursor.prepare(q)
253 for run in runlist:
254 if run==Run.runnropen or 1: # FIXME problem in cache
255 qr = executeQuery(run, cursor, q, cachekey=("SumOverlap",run))
256 else:
257 qr = executeCachedQuery(run, cursor, q, cachekey=("SumOverlap",run))
258 for e in qr:
259 if e[0] is not None:
260 res[run] = e[0]
261 return res
262
263
264
265
266def GetSFO_lastNruns( cursor, nruns ):
267 """returns list of last 'nruns' run numbers, where at least one stream is not calibration"""
268 cursor.execute( "SELECT RUNNR,STATE FROM (SELECT UNIQUE RUNNR,STATE FROM SFO_TZ_RUN WHERE STREAMTYPE!='calibration' AND RUNNR < 999999 ORDER BY RUNNR DESC) SFO_TZ_RUN2 WHERE rownum<=:arg_1", arg_1=nruns )
269 return cursor.fetchall()
270
271
272
273
278
279def SetOKSLinks( runlist ):
280 from CoolRunQuery.utils.AtlRunQueryUtils import coolDbConn
281 conn = coolDbConn.GetAtlasRunDBConnection()
282 cursor = conn.cursor()
283 query = [
284 "select ConfigSchema,ConfigData from ATLAS_RUN_NUMBER.RunNumber where RunNumber=:run", # Run 1+2
285 "select ConfigVersion,Release from ATLAS_RUN_NUMBER.RunNumber where RunNumber=:run" # Run 3
286 ]
287 for run in runlist:
288 cursor.execute(query[0 if run.lhcRun<3 else 1], run=run.runNr)
289 re = cursor.fetchall()
290 run.addResult('oks',re[0])
291
292def main():
293 test_oks = True
294 test_sfo = False
295
296 if test_oks:
297 #from os import environ as env
298 #from CoolRunQuery.AtlRunQueryUtils import coolDbConn
299 #coolDbConn.get_auth('oracle://atlr/rn_r') # only in /afs/cern.ch/atlas/project/tdaq/databases/.coral/authentication.xml
300 #print (coolDbConn.get_auth('oracle://ATLAS_COOLPROD/ATLAS_COOLOFL_TRIGGER'))
301 from CoolRunQuery.AtlRunQuerySFO import SetOKSLinks
302 SetOKSLinks([Run(178211)]) # Run 2
303 SetOKSLinks([Run(405396)]) # Run 3
304
305 if test_sfo:
306
307 runno = 122050
308 import sys
309 if len(sys.argv)>1:
310 runno = int(sys.argv[1])
311
312 runno = [140541, 140571, 140579, 140592, 140616, 140620,
313 140622, 140638, 140670, 140682, 140704, 140737, 140747,
314 140748, 140754, 140762, 140765, 140769, 140772, 140776,
315 140790, 140794, 140822, 140836, 140842, 140929, 140953,
316 140955, 140974, 140975, 141046, 141059, 141066, 141079,
317 141109, 141150, 141189, 141192, 141194, 141203, 141209,
318 141226, 141234, 141236, 141237, 141238, 141266, 141270,
319 141359, 141374, 141387, 141398, 141401, 141403, 141461,
320 141473, 141474, 141525, 141527, 141529, 141533, 141534,
321 141561, 141562, 141563, 141565, 141599, 141624, 141655,
322 141667, 141670, 141688, 141689, 141691, 141695, 141700,
323 141702, 141704, 141705, 141706, 141707, 141718, 141721,
324 141730, 141746, 141748, 141749, 141755, 141769, 141807,
325 141811, 141818, 141841, 141915, 141928, 141976, 141979,
326 141994, 141998, 141999, 142042, 142065, 142081, 142091,
327 142094, 142111, 142123, 142125, 142128, 142133, 142144,
328 142149, 142154, 142155, 142157, 142159, 142161, 142165,
329 142166, 142171, 142174, 142183, 142185, 142187, 142189,
330 142190, 142191, 142192, 142193, 142194, 142195, 142199,
331 142203, 142205, 142210, 142214, 142216, 142240, 142258,
332 142259, 142265, 142291, 142301, 142308, 142309, 142310,
333 142319, 142356, 142368, 142383, 142390, 142391, 142392,
334 142394, 142395, 142397, 142400, 142401, 142402, 142403,
335 142404, 142405, 142406, 143019, 143023, 143027, 143033,
336 143034, 143131, 143136, 143143, 143163, 143169, 143171,
337 143178, 143182, 143185, 143190, 143192, 143198, 143203,
338 143204, 143205, 143207, 143210, 143218, 143222, 143225,
339 143236, 143242]
340
341 #runno = range(141000,143000)
342
343
344 from CoolRunQuery.utils.AtlRunQueryUtils import coolDbConn
345 connection = coolDbConn.GetSFODBConnection()
346 cursor = connection.cursor()
347
348 # find last N runs
349 if False:
350 n = 10
351 runs = GetSFO_lastNruns( cursor, n )
352 print ('Last %i runs:' % n)
353 for r in runs:
354 print ('... %i' % r)
355 sys.exit()
356
357 # retrieve streams
358 if True:
359 start = time()
360 GetSFO_streamsAll( cursor, runno )
361 print ("streams",time()-start)
362 start = time()
363 GetSFO_LBsAll ( cursor, runno )
364 print ("lbs",time()-start)
365 start = time()
366 GetSFO_NeventsAll( cursor, runno )
367 print ("events",time()-start)
368 start = time()
369 GetSFO_filesAll ( cursor, runno )
370 print ("files",time()-start)
371 start = time()
372 GetSFO_overlapAll( cursor, runno )
373 print ("overlap",time()-start)
374
375
376 print ("Query execution time: %f sec" % (time()-start))
377
378 cursor.close()
379 connection.close()
380
381
382
383if __name__ == '__main__':
384 main()
GetSFO_overlapAll(cursor, runlist)
executeCachedQuery(run, cu, q, cachekey)
GetSFO_NphysicseventsAll(cursor, runlist)
GetSFO_filesAll(cursor, runlist)
GetSFO_LBsAll(cursor, runlist)
GetSFO_lastNruns(cursor, nruns)
GetSFO_NeventsAllPhysics(cursor, runlist)
SetOKSLinks(runlist)
The following function is used to get OKS info.
GetSFO_NeventsAll(cursor, runlist)
GetSFO_streamsAll(cursor, runlist)
executeQuery(run, cu, q, cachekey)