ATLAS Offline Software
DQM_Tier0Wrapper_trf.py
#!/usr/bin/env python

# Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration


import sys, string, os.path, os, pickle, time, pprint
from xmlrpc import client as xmlrpclib
#sami
import hashlib
from subprocess import getstatusoutput


# Utility function

def getFileMap(fname, dsname, nevts=0) :
    if os.path.isfile(fname) :
        sz = os.path.getsize(fname)
        map = { 'lfn': fname,
                'dataset' : dsname,
                'size' : sz,
                'events' : nevts
              }
    else :
        map = {}
    return map
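
# Illustrative sketch (hypothetical names and sizes): for an existing 2500-byte
# file, getFileMap returns
#   getFileMap('HIST._0001.root', 'data_test.1234.HIST', nevts=100)
#   -> {'lfn': 'HIST._0001.root', 'dataset': 'data_test.1234.HIST',
#       'size': 2500, 'events': 100}
# and {} if the file does not exist.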

def publish_success_to_mq(run, ptag, stream, incr, ami, procpass, hcfg, isprod):
    import stomp, json, ssl
    from DataQualityUtils import stompconfig
    dest='/topic/atlas.dqm.progress'
    conn=stomp.Connection([('atlas-mb.cern.ch', 61013)])
    conn.connect(wait=True, **stompconfig.config())

    body = {
        'run': run,
        'project_tag': ptag,
        'stream': stream,
        'ami': ami,
        'pass': procpass,
        'hcfg': hcfg,
    }
    headers = {
        'MsgClass':'DQ',
        'MsgType': (('' if isprod else 'Development') +
                    ('WebDisplayRunComplete' if not incr else 'WebDisplayIncremental')),
        'type':'textMessage',
        'persistent': 'true',
        'destination': dest,
    }
    conn.send(body=json.dumps(body), destination=dest, headers=headers, ack='auto')
    conn.disconnect()
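
# Sketch of the published message (values hypothetical, for illustration):
# a production, non-incremental job sends roughly
#   headers: {'MsgClass': 'DQ', 'MsgType': 'WebDisplayRunComplete',
#             'type': 'textMessage', 'persistent': 'true',
#             'destination': '/topic/atlas.dqm.progress'}
#   body:    {"run": 431810, "project_tag": "data22_13p6TeV",
#             "stream": "physics_Main", "ami": "x123", "pass": 1, "hcfg": null}
# while a development (isprod=False) incremental job carries the MsgType
# 'DevelopmentWebDisplayIncremental'.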


def genmd5sum(filename):
    md5summer=hashlib.md5()
    if os.path.isfile(filename):
        with open(filename,'rb') as infil:
            while True:
                fs=infil.read(8192)
                if not fs:
                    break
                md5summer.update(fs)
    print("md5 sum of the \"%s\" is %s"%(filename,md5summer.hexdigest()))
    return

def dq_combined_trf(picklefile):

    tstart = time.time()

    print("\n##################################################################")
    print("## ATLAS Tier-0 Offline DQM Processing ##")
    print("##################################################################\n")

    print("\n##################################################################")
    print("## STEP 1: creating file with list of root files ...")
    print("##################################################################\n")

    # extract parameters from pickle file (binary mode is required for pickle)
    print("Using pickled file ", picklefile, " for input parameters")
    f = open(picklefile, 'rb')
    parmap = pickle.load(f)
    f.close()

    print("\nFull Tier-0 run options:\n")
    pprint.pprint(parmap)
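
    # For orientation, a minimal (hypothetical) parmap might look like this;
    # the full set of recognised keys is documented in the usage text at the
    # bottom of this file:
    #
    #   {'inputHistFiles': ['data_test.1234.HIST#HIST._0001.root',
    #                       'data_test.1234.HIST#HIST._0002.root'],
    #    'outputHistFile': 'data_test.1234.HIST#data_test.1234567890.physics_Main.merge.HIST.x123._0001.1',
    #    'incrementalMode': 'False'}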

    inputfilelist = parmap.get('inputHistFiles', [])
    nfiles = len(inputfilelist)
    histMergeCompressionLevel=parmap.get('histMergeCompressionLevel', 1)
    histMergeDebugLevel=parmap.get('histMergeDebugLevel', 0)

    if not nfiles : # problem with job definition or reading pickle file
        dt = int(time.time() - tstart)
        retcode = 1
        acronym = 'TRF_NOINPUT'
        txt = 'empty input file list'
        reportmap = { 'prodsys': { 'trfCode': retcode,
                                   'trfAcronym': acronym,
                                   'jobOutputs': [],
                                   'jobInputs': [],
                                   'nevents': 0,
                                   'more': { 'num1': 0, 'num2': dt, 'txt1': txt }
                                 }
                    }

    else :
        histtmpflist = []
        nevts = 0

        if isinstance(inputfilelist[0], str) :
            histtmpdsname = (inputfilelist[0]).split('#')[0]
            for val in inputfilelist :
                histtmpflist.append(val.split('#')[1])

        elif isinstance(inputfilelist[0], dict) :
            histtmpdsname = inputfilelist[0]['dsn']
            for fdict in inputfilelist :
                histtmpflist.append(fdict['lfn'])
                nevt = fdict.get('events', 0)
                if nevt is None:
                    nevt = 0
                    print("WARNING Can't get number of events from input pickle file")
                nevts += nevt

        f = open('hist_merge.list', 'w')
        txtstr = ""
        for hf in histtmpflist :
            txtstr += "%s\n" % hf
        f.write(txtstr)
        f.close()

        cmd = "cat hist_merge.list"
        (s,o) = getstatusoutput(cmd)
        print("\nContents of file hist_merge.list:\n")
        print(o)


        print("\n##################################################################")
        print("## STEP 2: determining job parameters...")
        print("##################################################################\n")

        # output file
        histdsname = (parmap['outputHistFile']).split('#')[0]
        histfile = (parmap['outputHistFile']).split('#')[1]
        amitag = histfile.split('.')[5]


        # incremental mode on/off
        incr = parmap.get('incrementalMode', 'False')

        # post-processing on/off
        postproc = parmap.get('postProcessing', 'True')

        # database uploading on/off
        allowCOOLUpload = parmap.get('allowCOOLUpload', 'True')

        # do web display
        doWebDisplay = parmap.get('doWebDisplay', 'True')

        # production mode
        productionMode = parmap.get('productionMode', 'True')
        if productionMode != 'True' and incr == 'True':
            print("Production mode is not True, turning off incremental mode")
            incr = 'False'

        # get file paths, put into environment vars
        filepaths = parmap.get('filepaths', None)
        if filepaths and isinstance(filepaths, dict):
            if 'basename' not in filepaths:
                print("Improperly formed 'filepaths' (no 'basename')")
            else:
                for evtclass in ('Collisions', 'Cosmics', 'HeavyIons'):
                    if evtclass not in filepaths:
                        print("Improperly formed 'filepaths' (no '%s')" % evtclass)
                    else:
                        clinfo = filepaths[evtclass]
                        for timeclass in ('run', 'minutes10', 'minutes30'):
                            if timeclass not in clinfo:
                                print("Improperly formed 'filepaths[%s]' (no '%s')" % (evtclass, timeclass))
                            else:
                                dqcenvvar = 'DQC_HCFG_%s_%s' % (evtclass.upper(), timeclass.upper())
                                fpath = os.path.join(filepaths['basename'], clinfo[timeclass])
                                print("Setting %s = %s" % (dqcenvvar, fpath))
                                os.environ[dqcenvvar] = fpath
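
        # For illustration (all values hypothetical): a well-formed filepaths
        # dict such as
        #   {'basename': '/some/han/config/dir',
        #    'Collisions': {'run': 'collisions_run.hcfg',
        #                   'minutes10': 'collisions_minutes10.hcfg',
        #                   'minutes30': 'collisions_minutes30.hcfg'},
        #    'Cosmics': {...}, 'HeavyIons': {...}}
        # results in environment variables like
        #   DQC_HCFG_COLLISIONS_RUN=/some/han/config/dir/collisions_run.hcfg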

        # extract info from dataset name
        # AMI project name
        # override if tag has been specified in parmap
        try :
            dqproject = histdsname.split('.')[0]
        except Exception :
            dqproject = 'data_test'
        dqproject = parmap.get('projectTag', dqproject)

        # run number
        if 'runNumber' in parmap :
            runnr = parmap['runNumber']
        else :
            try :
                runnr = int(histdsname.split('.')[1])
            except Exception :
                runnr = 1234567890

        # stream name
        if 'streamName' in parmap :
            stream = parmap['streamName']
        else :
            try :
                stream = histdsname.split('.')[2]
            except Exception :
                stream = 'test_dummy'

        # processing pass number
        MAX_XMLRPC_TRIES = 5
        if 'procNumber' in parmap :
            procnumber = parmap['procNumber']
        else :
            n_xmlrpc_tries = 1
            while n_xmlrpc_tries <= MAX_XMLRPC_TRIES :
                procnumber = 99
                try :
                    xmlrpcserver = xmlrpclib.ServerProxy('http://atlasdqm.cern.ch:8888')
                    procnumber = xmlrpcserver.get_next_proc_pass(runnr, stream, 'tier0')
                    break
                except Exception :
                    print('Web service connection failed, attempt', n_xmlrpc_tries, 'of', MAX_XMLRPC_TRIES)
                    n_xmlrpc_tries += 1
                    if n_xmlrpc_tries <= MAX_XMLRPC_TRIES:
                        time.sleep(20*2**n_xmlrpc_tries)
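
        # Derived from the loop above, for illustration: the sleeps between
        # the five attempts are 20*2**n seconds for n = 2..5, i.e. 80, 160,
        # 320 and 640 s, so the retry phase can last up to ~20 minutes before
        # the default pass number 99 is used.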

        print("Job parameters:\n")
        print("  Run number:       ", runnr)
        print("  Stream name:      ", stream)
        print("  Processing pass:  ", procnumber)
        print("  Incremental mode: ", incr)
        print("  Post-processing:  ", postproc)
        print("  COOL uploads:     ", allowCOOLUpload)
        print("  Production mode:  ", productionMode)


        print("\n##################################################################")
        print("## STEP 3: running histogram merging procedure ...")
        print("##################################################################\n")

        # environment setting
        os.environ['DQPRODUCTION'] = '1' if productionMode == 'True' else '0'
        os.environ['DQ_STREAM'] = stream
        print("Setting env variable DQPRODUCTION to %s\n" % os.environ['DQPRODUCTION'])
        os.environ['COOLUPLOADS'] = '1' if allowCOOLUpload == 'True' and productionMode == 'True' else '0'
        print("Setting env variable COOLUPLOADS to %s\n" % os.environ['COOLUPLOADS'])

        if postproc == 'True' :
            if incr == 'True':
                cmd = "python -u `which DQHistogramMerge.py` hist_merge.list %s 1 1 %d %d" % (histfile,histMergeCompressionLevel,histMergeDebugLevel)
            else:
                cmd = "python -u `which DQHistogramMerge.py` hist_merge.list %s 1 0 %d %d" % (histfile,histMergeCompressionLevel,histMergeDebugLevel)
        else :
            cmd = "python -u `which DQHistogramMerge.py` hist_merge.list %s 0 0 %d %d" % (histfile,histMergeCompressionLevel,histMergeDebugLevel)
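
        # Resolved example (hypothetical histfile name): with postproc and
        # incr both 'True' and the default compression/debug levels, cmd
        # expands to
        #   python -u `which DQHistogramMerge.py` hist_merge.list \
        #       data_test.1234567890.physics_Main.merge.HIST.x123._0001.1 1 1 1 0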

        print("Histogram merging command:\n")
        print(cmd)
        print("\n##################################################################\n")

        print("## ... logfile from DQHistogramMerge.py: ")
        print("--------------------------------------------------------------------------------")
        # execute command
        retcode1 = os.system(cmd)
        print("--------------------------------------------------------------------------------")
        t1 = time.time()
        dt1 = int(t1 - tstart)

        print("\n## DQHistogramMerge.py finished with retcode = %s" % retcode1)
        print("## ... elapsed time: ", dt1, " sec")

        if retcode1 == 0 :
            if postproc == 'True' and incr == 'False':
                print("\n##################################################################")
                print("## STEP 3b: copying postprocessing output to AFS ...")
                print("##################################################################\n")

                cmd = "python -u `which DQFileMove.py` %s %s_%s_%s" % (dqproject, runnr, stream, procnumber)

                print("File move command:\n")
                print(cmd)
                print("\n##################################################################\n")

                print("## ... logfile from DQFileMove.py: ")
                print("--------------------------------------------------------------------------------")
                # execute command
                retcode1b = os.system(cmd)
                print("--------------------------------------------------------------------------------")
                t1b = time.time()
                dt1b = int(t1b - t1)
                t1 = t1b

                print("\n## DQFileMove.py finished with retcode = %s" % retcode1b)
                print("## ... elapsed time: ", dt1b, " sec")

            if doWebDisplay == 'True':
                print("\n##################################################################")
                print("## STEP 4: running web-display creation procedure ...")
                print("##################################################################\n")

                cmd = "python -u `which DQWebDisplay.py` %s %s %s %s stream=%s" % (histfile, dqproject, procnumber, incr, stream)

                print("Web display creation command:\n")
                print(cmd)
                print("\n##################################################################\n")

                print("## ... logfile from DQWebDisplay.py: ")
                print("--------------------------------------------------------------------------------")
                # execute command
                retcode2 = os.system(cmd)
                print('DO NOT REPORT "Error in TH1: cannot merge histograms" ERRORS! THESE ARE IRRELEVANT!')
                print("--------------------------------------------------------------------------------")
                t2 = time.time()
                dt2 = int(t2 - t1)

                print("\n## DQWebDisplay.py finished with retcode = %s" % retcode2)
                print("## ... elapsed time: ", dt2, " sec")
            else:
                print("\n##################################################################")
                print("## WEB DISPLAY CREATION SKIPPED BY USER REQUEST")
                print("##################################################################\n")
                retcode2 = 0
                dt2 = 0

        print("\n##################################################################")
        print("## STEP 5: finishing the job ...")
        print("##################################################################\n")

        # assemble report gpickle file
        outfiles = []
        infiles = []

        retcode = 0
        acronym = 'OK'
        txt = 'trf finished OK'

        # get info for report gpickle file
        if retcode1 == 0 :
            dt = dt1
            if (retcode2 >> 8) in (0, 5) :
                # if success, or if unable to acquire cache lock
                histmap = getFileMap(histfile, histdsname, nevts=nevts)
                outfiles = [histmap]
                dt += dt2
                if doWebDisplay == 'True':
                    print('Publishing to message service')
                    publish_success_to_mq(runnr, dqproject, stream, incr=(incr=='True'), ami=amitag, procpass=procnumber, hcfg=filepaths, isprod=(productionMode=='True'))
                else:
                    print('Web display off, not publishing to message service')
            else :
                txt = 'DQWebDisplay.py execution problem'
                print("ERROR: DQWebDisplay.py execution problem!")
                retcode = retcode2
                acronym = 'TRF_DQMDISPLAY_EXE'
                try:
                    infilelist = open('hist_merge.list','r')
                    for infname in infilelist:
                        genmd5sum(infname.rstrip(os.linesep))
                finally:
                    infilelist.close()
                genmd5sum(histfile)
        else :
            print("ERROR: DQHistogramMerge.py execution problem!")
            retcode = retcode1
            acronym = 'TRF_DQMHISTMERGE_EXE'
            dt = 0
            txt = 'DQHistogramMerge.py execution problem'
            try:
                infilelist = open('hist_merge.list','r')
                for infname in infilelist:
                    genmd5sum(infname.rstrip(os.linesep))
            finally:
                infilelist.close()
            genmd5sum(histfile)
            DQResFile = "DQResourceUtilization.txt"
            if os.path.exists(DQResFile):
                print("dumping resource utilization log")
                with open(DQResFile) as resfile:
                    for resline in resfile:
                        print(resline, end=' ')

        # assemble job report map
        reportmap = { 'prodsys': { 'trfCode': retcode,
                                   'trfAcronym': acronym,
                                   'jobOutputs': outfiles,
                                   'jobInputs': infiles,
                                   'nevents': int(nevts),
                                   'more': { 'num1': int(nevts), 'num2': int(dt), 'txt1': txt }
                                 }
                    }

    # pickle report map (binary mode is required for pickle)
    f = open('jobReport.gpickle', 'wb')
    pickle.dump(reportmap, f)
    f.close()

    print("\n## ... job finished with retcode : %s" % reportmap['prodsys']['trfCode'])
    print("## ... error acronym: ", reportmap['prodsys']['trfAcronym'])
    print("## ... elapsed time: ", reportmap['prodsys']['more']['num2'], "sec")
    print("##")
    print("##################################################################")
    print("## End of job.")
    print("##################################################################\n")


if __name__ == "__main__":

    # reject the call unless exactly one --argdict=... argument is given
    if (len(sys.argv) != 2) or (not sys.argv[1].startswith('--argdict=')) :
        print("Input format wrong --- use ")
        print("   --argdict=<pickled-dictionary containing input info> ")
        print("   with key/value pairs: ")
        print("     1) 'inputHistFiles': python list ")
        print("          ['datasetname#filename1', 'datasetname#filename2',...] (input dataset + file names) ")
        print("        or list of file dictionaries ")
        print("          [{'lfn':'fname1', 'checksum':'cks1', 'dsn':'dsn1', 'size':sz1, 'guid':'guid1', 'events':nevts1, ...}, ")
        print("           {'lfn':'fname2', 'checksum':'cks2', 'dsn':'dsn2', 'size':sz2, 'guid':'guid2', 'events':nevts2, ...}, ...] ")
        print("     2) 'outputHistFile': string 'datasetname#filename' ")
        print("        (HIST output dataset name + file) ")
        print("   optional parameters: ")
        print("     3) 'incrementalMode': string ('True'/'False') ")
        print("        ('True': do incremental update of DQM webpages on top of existing statistics; ")
        print("         'False': create final DQM webpages, replace temporary ones) ")
        print("     4) 'postProcessing': string ('True'/'False', default: 'True') ")
        print("        ('False': run histogram merging and DQ assessment only; ")
        print("         'True': run additional post-processing step (fitting, etc.)) ")
        print("     5) 'procNumber': int (number of processing pass, e.g. 1,2, ...) ")
        print("     6) 'runNumber': int ")
        print("     7) 'streamName': string (e.g., physics_IDCosmic, physics_Express, ...) ")
        print("     8) 'projectTag': string (e.g., data10_7TeV, TrigDisplay)")
        print("     9) 'allowCOOLUpload': string ('True'/'False', default: 'True')")
        print("        ('True': allow upload of defects to database; ")
        print("         'False': do not upload defects to database)")
        sys.exit(-1)

    else :
        picklefile = sys.argv[1][len('--argdict='):]
        dq_combined_trf(picklefile)
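
# Example of preparing and invoking the transform by hand (a sketch, not an
# official recipe; file names are hypothetical):
#
#   import pickle
#   parmap = {'inputHistFiles': ['data_test.1234.HIST#HIST._0001.root'],
#             'outputHistFile': 'data_test.1234.HIST#data_test.1234567890.physics_Main.merge.HIST.x123._0001.1'}
#   with open('argdict.pickle', 'wb') as f:
#       pickle.dump(parmap, f)
#
#   DQM_Tier0Wrapper_trf.py --argdict=argdict.pickle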