ATLAS Offline Software
DQM_Tier0Wrapper_tf.py
#!/usr/bin/env python

# Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration


import sys, os.path, os, json, time, pprint, traceback
from xmlrpc import client as xmlrpclib
from subprocess import getstatusoutput
#sami
import hashlib


# Utility function

def getSubFileMap(fname, nevts=0) :
    if os.path.isfile(fname) :
        sz = os.path.getsize(fname)
        map = { 'name': fname,
                'file_size' : sz,
                'nentries' : nevts,
              }
    else :
        map = {}
    return map

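# Example (illustrative values only): for an existing merged HIST file of about
# 1.2 GB built from inputs totalling 5000 events, getSubFileMap('HIST.root', nevts=5000)
# would return {'name': 'HIST.root', 'file_size': 1234567890, 'nentries': 5000};
# this dict is placed into the 'subFiles' list of the job report assembled below.
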
def publish_success_to_mq(run, ptag, stream, incr, ami, procpass, hcfg, isprod, parmap):
    import stomp, json
    from DataQualityUtils import stompconfig
    dest='/topic/atlas.dqm.progress'
    conn=stomp.Connection([('atlas-mb.cern.ch', 61013)])
    conn.connect(wait=True, **stompconfig.config())

    eos_only = False

    servers = parmap.get('servers', 'False')
    if servers == [] or servers == '':
        eos_only = True

    body = {
        'run': run,
        'project_tag': ptag,
        'stream': stream,
        'ami': ami,
        'pass': procpass,
        'hcfg': hcfg,
        'eos_only': eos_only,
        }
    headers = {
        'MsgClass':'DQ',
        'MsgType': (('' if isprod else 'Development') +
                    ('WebDisplayRunComplete' if not incr else 'WebDisplayIncremental')),
        'type':'textMessage',
        'persistent': 'true',
        'destination': dest,
        }
    conn.send(body=json.dumps(body), destination=dest, headers=headers, ack='auto')
    conn.disconnect()


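# Illustrative message (made-up values) as it would appear on the
# /topic/atlas.dqm.progress topic for a completed, non-incremental web display:
#   body:    {"run": 123456, "project_tag": "data25_13p6TeV", "stream": "physics_Main",
#             "ami": "x999", "pass": 1, "hcfg": {...}, "eos_only": false}
#   headers: MsgClass=DQ, MsgType=WebDisplayRunComplete
#            (prefixed with 'Development' when isprod is False)
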
def genmd5sum(filename):
    md5summer=hashlib.md5()
    if os.path.isfile(filename):
        try:
            with open(filename,'rb') as infil:
                while True:
                    fs=infil.read(8192)
                    if not fs:
                        break
                    md5summer.update(fs)
        except:
            pass
    print("md5 sum of the \"%s\" is %s"%(filename,md5summer.hexdigest()))
    return

def dq_combined_trf(jsonfile, outmap):

    print("\n##################################################################")
    print("## STEP 1: creating file with list of root files ...")
    print("##################################################################\n")

    nfiles=0

    try:
        # extract parameters from json file
        print("Using json file ", jsonfile, " for input parameters")
        f = open(jsonfile, 'r')
        parmap = json.load(f)
        f.close()

        print("\nFull Tier-0 run options:\n")
        pprint.pprint(parmap)

        inputfilelist = parmap.get('inputHistFiles', [])
        nfiles = len(inputfilelist)
        histMergeCompressionLevel=parmap.get('histMergeCompressionLevel', 1)
        histMergeDebugLevel=parmap.get('histMergeDebugLevel', 0)
    except:
        outmap['exitCode'] = 101
        outmap['exitAcronym'] = 'TRF_NOINPUT'
        outmap['exitMsg'] = 'Trouble reading json input dict.'
        traceback.print_exc()
        return

    if not nfiles : # problem with job definition or reading json file
        outmap['exitCode'] = 102
        outmap['exitAcronym'] = 'TRF_NOINPUT'
        outmap['exitMsg'] = 'Empty input file list.'
        return

    histtmpflist = []
    histtmpdsname = ''
    nevts = 0

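    # 'inputHistFiles' may be either a list of 'dataset#filename' strings, e.g.
    #   ["data25_13p6TeV.00123456.physics_Main.merge.HIST_TMP.x1#file1.root", ...]
    # or a list of file dictionaries, e.g.
    #   [{'dsn': 'data25_...', 'lfn': 'file1.root', 'events': 1000, ...}, ...]
    # (dataset and file names above are illustrative only); both forms are handled below.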
    try:
        if isinstance(inputfilelist[0], str) :
            histtmpdsname = (inputfilelist[0]).split('#')[0]
            for val in inputfilelist :
                histtmpflist.append(val.split('#')[1])

        elif isinstance(inputfilelist[0], dict) :
            histtmpdsname = inputfilelist[0]['dsn']
            for fdict in inputfilelist :
                histtmpflist.append(fdict['lfn'])
                nevt = fdict.get('events', 0)
                if nevt is None:
                    nevt=0
                    print("WARNING Can't get number of events from input json file")
                nevts+=nevt

        f = open('hist_merge.list', 'w')
        txtstr = ""
        for hf in histtmpflist :
            txtstr += "%s\n" % hf
        f.write(txtstr)
        f.close()

        cmd = "cat hist_merge.list"
        (s,o) = getstatusoutput(cmd)
        print("\nContents of file hist_merge.list:\n")
        print(o)
    except:
        outmap['exitCode'] = 103
        outmap['exitAcronym'] = 'TRF_INPUTINFO'
        outmap['exitMsg'] = 'ERROR: crash in assembling input file list (STEP 1)'
        traceback.print_exc()
        return

    try:
        print("\n##################################################################")
        print("## STEP 2: determining job parameters...")
        print("##################################################################\n")

        skipMerge = parmap.get('skipMerge', 'False')
        if skipMerge == 'True':
            if nfiles != 1:
                print("ERROR: skipMerge specified but something other than one input file specified")
                outmap['exitCode'] = 108
                outmap['exitAcronym'] = 'TRF_INPUTINFO'
                outmap['exitMsg'] = 'ERROR: skipMerge specified but something other than one input file specified (STEP 1)'
                return
            histdsname = histtmpdsname
            histfile = histtmpflist[0]
        else:
            # normal output file determination
            histdsname = (parmap['outputHistFile']).split('#')[0]
            histfile = (parmap['outputHistFile']).split('#')[1]
        amitag = histdsname.split('.')[5]


        # incremental mode on/off
        incr = parmap.get('incrementalMode', 'False')

        # post-processing on/off
        postproc = parmap.get('postProcessing', 'True')

        # database uploading on/off
        allowCOOLUpload = parmap.get('allowCOOLUpload', 'True')

        # do web display
        doWebDisplay = parmap.get('doWebDisplay', 'True')

        # extra parameters for merge
        mergeParams = parmap.get('mergeParams', '')

        # production mode
        productionMode = parmap.get('productionMode', 'True')
        if productionMode != 'True' and incr == 'True':
            print("Production mode is not True, turning off incremental mode")
            incr = 'False'

        # server list
        servers = parmap.get('servers', 'False')
        os.environ['DQC_SERVERS'] = servers

        # get file paths, put into environment vars
        filepaths = parmap.get('filepaths', None)
        if filepaths and isinstance(filepaths, dict):
            if 'basename' not in filepaths:
                print("Improperly formed 'filepaths' (no 'basename')")
            else:
                for evtclass in ('Collisions', 'Cosmics', 'HeavyIons'):
                    if evtclass not in filepaths:
                        print("Improperly formed 'filepaths' (no '%s')" % evtclass)
                    else:
                        clinfo = filepaths[evtclass]
                        for timeclass in ('run', 'minutes10', 'minutes30'):
                            if timeclass not in clinfo:
                                print("Improperly formed 'filepaths[%s]' (no '%s')" % (evtclass, timeclass))
                            else:
                                dqcenvvar = 'DQC_HCFG_%s_%s' % (evtclass.upper(), timeclass.upper())
                                fpath = os.path.join(filepaths['basename'], clinfo[timeclass])
                                print("Setting %s = %s" % (dqcenvvar, fpath))
                                os.environ[dqcenvvar] = fpath

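        # Illustrative 'filepaths' value (paths are made up):
        #   {'basename': '/eos/atlas/atlascerngroupdisk/data-dqm/han_configs',
        #    'Collisions': {'run': 'collisions_run.hcfg',
        #                   'minutes10': 'collisions_minutes10.hcfg',
        #                   'minutes30': 'collisions_minutes30.hcfg'},
        #    'Cosmics': {...}, 'HeavyIons': {...}}
        # would, for example, set DQC_HCFG_COLLISIONS_RUN to
        # /eos/atlas/atlascerngroupdisk/data-dqm/han_configs/collisions_run.hcfg.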
        # extract info from dataset name
        # AMI project name
        # override if tag has been specified in parmap
        try :
            dqproject = histdsname.split('.')[0]
        except :
            dqproject = 'data_test'
        dqproject = parmap.get('projectTag', dqproject)

        # run number
        if 'runNumber' in parmap :
            runnr = parmap['runNumber']
        else :
            try :
                runnr = int(histdsname.split('.')[1])
            except :
                runnr = 1234567890

        # stream name
        if 'streamName' in parmap :
            stream = parmap['streamName']
        else :
            try :
                stream = histdsname.split('.')[2]
            except :
                stream = 'test_dummy'

        # processing pass number
        procnumber = 99
        MAX_XMLRPC_TRIES = 5
        if 'procNumber' in parmap :
            procnumber = parmap['procNumber']
        else :
            n_xmlrpc_tries = 1
            while n_xmlrpc_tries <= MAX_XMLRPC_TRIES :
                procnumber = 99
                try :
                    xmlrpcserver = xmlrpclib.ServerProxy('http://proc-pass-atlasdqm.app.cern.ch')
                    #xmlrpcserver = xmlrpclib.ServerProxy('http://atlasdqm.cern.ch:8888') ##old server
                    procnumber = xmlrpcserver.get_next_proc_pass(runnr, stream, 'tier0')
                    break
                except :
                    print('Web service connection failed, attempt', n_xmlrpc_tries, 'of', MAX_XMLRPC_TRIES)
                    n_xmlrpc_tries += 1
                    if n_xmlrpc_tries <= MAX_XMLRPC_TRIES:
                        time.sleep(20*2**n_xmlrpc_tries)

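        # With MAX_XMLRPC_TRIES = 5 the retry loop above backs off exponentially,
        # sleeping 20*2**n seconds after each failed attempt (80 s, 160 s, 320 s, 640 s)
        # before giving up and leaving procnumber at its default of 99.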
323 print("Job parameters:\n")
324 print(" Run number: ", runnr)
325 print(" Stream name: ", stream)
326 print(" Processing pass: ", procnumber)
327 print(" Incremental mode:", incr)
328 print(" Post-processing: ", postproc)
329 print(" COOL uploads: ", allowCOOLUpload)
330 print(" Production mode: ", productionMode)
331 print(" Skip merge: ", skipMerge)
332 print(" Merge parameters:", mergeParams)
333 if servers == []:
334 print(" EOS only: True")
335
336 except:
337 outmap['exitCode'] = 104
338 outmap['exitAcronym'] = 'TRF_JOBPARS'
339 outmap['exitMsg'] = 'Error in determining job parameters (STEP 2).'
340 traceback.print_exc()
341 return
342
343 # environment setting
344 os.environ['DQPRODUCTION'] = '1' if productionMode == 'True' else '0'
345 print("Setting env variable DQPRODUCTION to %s\n" % os.environ['DQPRODUCTION'])
346 os.environ['DQ_STREAM'] = stream
347 print("Setting env variable DQ_STREAM to %s\n" % os.environ['DQ_STREAM'])
348 os.environ['COOLUPLOADS'] = '1' if allowCOOLUpload == 'True' and productionMode == 'True' else '0'
349 print("Setting env variable COOLUPLOADS to %s\n" % os.environ['COOLUPLOADS'])
350
    if skipMerge != 'True':
        try:
            print("\n##################################################################")
            print("## STEP 3: running histogram merging procedure ...")
            print("##################################################################\n")

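            # As used below, the arguments passed to DQHistogramMerge.py are:
            # the input file list, the output HIST file, a run-post-processing flag
            # (1 when postProcessing is 'True'), an incremental-merge flag (1 when
            # incrementalMode is 'True'), the compression level, and the debug level.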
            if postproc == 'True' :
                if incr == 'True':
                    cmd = "python -u `which DQHistogramMerge.py` hist_merge.list %s 1 1 %d %d " % (histfile,histMergeCompressionLevel,histMergeDebugLevel)
                else:
                    cmd = "python -u `which DQHistogramMerge.py` hist_merge.list %s 1 0 %d %d" % (histfile,histMergeCompressionLevel,histMergeDebugLevel)
            else :
                cmd = "python -u `which DQHistogramMerge.py` hist_merge.list %s 0 0 %d %d" % (histfile,histMergeCompressionLevel,histMergeDebugLevel)

            cmd += (f' {mergeParams}' if mergeParams else '')

            print("Histogram merging command:\n")
            print(cmd)
            print("\n##################################################################\n")

            print("## ... logfile from DQHistogramMerge.py: ")
            print("--------------------------------------------------------------------------------")
            tstart = time.time()
            # execute command
            retcode1 = os.system(cmd)
            print("--------------------------------------------------------------------------------")
            t1 = time.time()
            dt1 = int(t1 - tstart)

            print("\n## DQHistogramMerge.py finished with retcode = %s" % retcode1)
            print("## ... elapsed time: ", dt1, " sec")

            if retcode1 != 0 :
                outmap['exitCode'] = retcode1
                outmap['exitAcronym'] = 'TRF_DQMHISTMERGE_EXE'
                outmap['exitMsg'] = 'ERROR: DQHistogramMerge.py execution problem! (STEP 3).'
                print("ERROR: DQHistogramMerge.py execution problem!")
                retcode = retcode1
                txt = 'DQHistogramMerge.py execution problem'
                try:
                    try:
                        with open('hist_merge.list','r') as infilelist:
                            for infname in infilelist:
                                genmd5sum(infname.rstrip(os.linesep))
                    except:
                        pass
                    genmd5sum(histfile)
                    DQResFile="DQResourceUtilization.txt"
                    if os.path.exists(DQResFile):
                        print("dumping resource utilization log")
                        with open(DQResFile) as resfile:
                            for resline in resfile:
                                print(resline, end=' ')
                except:
                    outmap['exitMsg'] = 'ERROR: DQHistogramMerge.py execution problem + problem dumping DQResourceUtilization! (STEP 3).'
                    traceback.print_exc()
                    print("ERROR: DQHistogramMerge.py execution problem + problem dumping DQResourceUtilization!")
                return

            if postproc == 'True' and incr == 'False':
                print("\n##################################################################")
                print("## STEP 3b: copying postprocessing output to AFS ...")
                print("##################################################################\n")

                cmd = "python -u `which DQFileMove.py` %s %s_%s_%s" % (dqproject, runnr, stream, procnumber)

                print("File move command:\n")
                print(cmd)
                print("\n##################################################################\n")

                print("## ... logfile from DQFileMove.py: ")
                print("--------------------------------------------------------------------------------")
                # execute command
                retcode1b = os.system(cmd)
                print("--------------------------------------------------------------------------------")
                t1b = time.time()
                dt1b = int(t1b - t1)
                t1 = t1b

                print("\n## DQFileMove.py finished with retcode = %s" % retcode1b)
                print("## ... elapsed time: ", dt1b, " sec")
        except:
            outmap['exitCode'] = 105
            outmap['exitAcronym'] = 'TRF_DQMHISTMERGE_EXE'
            outmap['exitMsg'] = 'ERROR: Failure in histogram merging or copying postprocessing output to AFS (STEP 3/3b).'
            traceback.print_exc()
            return
    else:
        print("\n##################################################################")
        print("## HISTOGRAM MERGE/POSTPROCESSING SKIPPED BY USER REQUEST")
        print("##################################################################\n")
        t1 = time.time()

    try:
        retcode2 = 0
        dt2 = 0
        if doWebDisplay == 'True':
            print("\n##################################################################")
            print("## STEP 4: running web-display creation procedure ...")
            print("##################################################################\n")

            cmd = "python -u `which DQWebDisplay.py` %s %s %s %s stream=%s" % (histfile, dqproject, procnumber, incr, stream)

            print("Web display creation command:\n")
            print(cmd)
            print("\n##################################################################\n")

            print("## ... logfile from DQWebDisplay.py: ")
            print("--------------------------------------------------------------------------------")
            # execute command
            retcode2 = os.system(cmd)
            print('DO NOT REPORT "Error in TH1: cannot merge histograms" ERRORS! THESE ARE IRRELEVANT!')
            print("--------------------------------------------------------------------------------")
            t2 = time.time()
            dt2 = int(t2 - t1)

            print("\n## DQWebDisplay.py finished with retcode = %s" % retcode2)
            print("## ... elapsed time: ", dt2, " sec")
            if (retcode2 >> 8) not in (0, 5) :
                print("ERROR: DQWebDisplay.py execution problem!")
                outmap['exitCode'] = retcode2
                outmap['exitAcronym'] = 'TRF_DQMDISPLAY_EXE'
                outmap['exitMsg'] = 'ERROR: DQWebDisplay.py execution problem! (STEP 4).'
                try:
                    with open('hist_merge.list','r') as infilelist:
                        for infname in infilelist:
                            genmd5sum(infname.rstrip(os.linesep))
                except:
                    pass
                genmd5sum(histfile)
                return
            if productionMode == 'True':
                try:
                    print('Publishing to message service')
                    publish_success_to_mq(runnr, dqproject, stream, incr=(incr=='True'), ami=amitag, procpass=procnumber, hcfg=filepaths, isprod=(productionMode=='True'), parmap=parmap)
                except:
                    outmap['exitCode'] = 106
                    outmap['exitAcronym'] = 'TRF_DQMDISPLAY_EXE'
                    outmap['exitMsg'] = 'ERROR: Failure in publishing info to messaging service (STEP 4).'
                    traceback.print_exc()
                    return
        else:
            print("\n##################################################################")
            print("## WEB DISPLAY CREATION SKIPPED BY USER REQUEST")
            print("##################################################################\n")
            print('Web display off, not publishing to message service')
    except:
        outmap['exitCode'] = 106
        outmap['exitAcronym'] = 'TRF_DQMDISPLAY_EXE'
        outmap['exitMsg'] = 'ERROR: Failure in web-display creation procedure (STEP 4).'
        print('ERROR: Failure in web-display creation procedure (STEP 4).')
        traceback.print_exc()
        return

    print("\n##################################################################")
    print("## STEP 5: finishing the job ...")
    print("##################################################################\n")

    # get info for report json file
    try:
        outfiles = [getSubFileMap(histfile, nevts=nevts)]
        # assemble job report map
        outmap['files']['output'][0]['dataset'] = histdsname
        outmap['files']['output'][0]['subFiles'] = outfiles
        outmap['resource']['transform']['processedEvents'] = int(nevts)
        return
    except:
        outmap['exitCode'] = 107
        outmap['exitAcronym'] = 'TRF_JOBREPORT'
        outmap['exitMsg'] = 'ERROR: in job report creation (STEP 5)'
        print("ERROR: in job report creation (STEP 5) !")
        traceback.print_exc()
        return

def dq_trf_wrapper(jsonfile):
    print("\n##################################################################")
    print("## ATLAS Tier-0 Offline DQM Processing ##")
    print("##################################################################\n")

    outmap = { 'exitAcronym' : 'OK',
               'exitCode' : 0,
               'exitMsg' : 'trf finished OK',
               'files' : { 'output' : [{ 'dataset' : '',
                                         'subFiles' : [ {},
                                                      ]}
                                      ] },
               'resource' : { 'transform' : { 'processedEvents' : 0 } }
             }

    # dq_combined_trf will update outmap
    tstart = time.time()
    dq_combined_trf(jsonfile, outmap)
    outmap['resource']['transform']['wallTime'] = int(time.time() - tstart)

    # dump json report map
    f = open('jobReport.json', 'w')
    json.dump(outmap, f)
    f.close()

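    # Illustrative jobReport.json content for a successful job (values made up):
    #   {"exitAcronym": "OK", "exitCode": 0, "exitMsg": "trf finished OK",
    #    "files": {"output": [{"dataset": "data25_...HIST.x999_p1",
    #                          "subFiles": [{"name": "HIST.pool.root.1",
    #                                        "file_size": 1234567890, "nentries": 5000}]}]},
    #    "resource": {"transform": {"processedEvents": 5000, "wallTime": 1800}}}
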
    # summarize status
    print("\n## ... job finished with retcode : %s" % outmap['exitCode'])
    print("## ... error acronym: ", outmap['exitAcronym'])
    print("## ... job status message: ", outmap['exitMsg'])
    print("## ... elapsed time: ", outmap['resource']['transform']['wallTime'], "sec")
    print("##")
    print("##################################################################")
    print("## End of job.")
    print("##################################################################\n")


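# Illustrative invocation (dataset and file names are made up):
#   DQM_Tier0Wrapper_tf.py --argJSON=dqm_args.json
# where dqm_args.json could contain, for example:
#   { "inputHistFiles": ["data25_13p6TeV.00123456.physics_Main.merge.HIST_TMP.x999#HIST_TMP.root.1"],
#     "outputHistFile": "data25_13p6TeV.00123456.physics_Main.merge.HIST.x999_p1#HIST.root.1",
#     "incrementalMode": "False", "postProcessing": "True" }
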
if __name__ == "__main__":

    if len(sys.argv) != 2 or not sys.argv[1].startswith('--argJSON='):
        print("Input format wrong --- use ")
        print("   --argJSON=<json-dictionary containing input info> ")
        print("   with key/value pairs: ")
        print("     1) 'inputHistFiles': python list ")
        print("          ['datasetname#filename1', 'datasetname#filename2', ...] (input dataset + file names) ")
        print("        or list of file dictionaries ")
        print("          [{'lfn':'fname1', 'checksum':'cks1', 'dsn':'dsn1', 'size':sz1, 'guid':'guid1', 'events':nevts1, ...}, ")
        print("           {'lfn':'fname2', 'checksum':'cks2', 'dsn':'dsn2', 'size':sz2, 'guid':'guid2', 'events':nevts2, ...}, ...] ")
        print("     2) 'outputHistFile': string 'datasetname#filename' ")
        print("        (HIST output dataset name + file) ")
        print("   optional parameters: ")
        print("     3) 'incrementalMode': string ('True'/'False') ")
        print("        ('True': do incremental update of DQM webpages on top of existing statistics; ")
        print("         'False': create final DQM webpages, replace temporary ones) ")
        print("     4) 'postProcessing': string ('True'/'False', default: 'True') ")
        print("        ('False': run histogram merging and DQ assessment only; ")
        print("         'True': run additional post-processing step (fitting, etc.)) ")
        print("     5) 'procNumber': int (number of processing pass, e.g. 1, 2, ...) ")
        print("     6) 'runNumber': int ")
        print("     7) 'streamName': string (e.g., physics_IDCosmic, physics_Express, ...) ")
        print("     8) 'projectTag': string (e.g., data10_7TeV, TrigDisplay)")
        print("     9) 'allowCOOLUpload': string ('True'/'False', default: 'True')")
        print("        ('True': allow upload of defects to database; ")
        print("         'False': do not upload defects to database)")
        sys.exit(-1)

    else:
        jsonfile = sys.argv[1][len('--argJSON='):]
        dq_trf_wrapper(jsonfile)