63 import sys, os.path, os, json, time, pprint, traceback
64 from xmlrpc
import client
as xmlrpclib
66 from subprocess
import getstatusoutput
# Fragment: builds a metadata dict for file `fname` (starting with 'name': fname).
# NOTE(review): non-contiguous excerpt -- the enclosing def and the remainder of
# the dict literal are not visible here.
75 if os.path.isfile(fname) :
# Size is only read when the file exists on disk.
76 sz = os.path.getsize(fname)
# NOTE(review): `map` shadows the builtin map(); consider renaming.
77 map = {
'name': fname,
# Fragment of the message-broker publisher (presumably publish_success_to_mq,
# which is called in STEP 4). Opens a STOMP connection and sends a JSON body.
# NOTE(review): non-contiguous excerpt -- def header and several lines missing.
87 from DataQualityUtils
import stompconfig
# Broker topic that receives the DQM progress message.
88 dest=
'/topic/atlas.dqm.progress'
89 conn=stomp.Connection([(
'atlas-mb.cern.ch', 61013)])
# Extra connect() keyword arguments come from stompconfig.config().
90 conn.connect(wait=
True, **stompconfig.config())
# NOTE(review): the default is the string 'False', but the check below compares
# against [] and '' -- a missing 'servers' key never takes that branch. Confirm
# the intended default.
94 servers = parmap.get(
'servers',
'False')
95 if servers == []
or servers ==
'':
# (dict entries -- the dict's opening line is not visible here)
106 'eos_only': eos_only,
# Message type: 'Development' prefix unless isprod; the suffix selects
# RunComplete vs Incremental based on `incr`.
110 'MsgType': ((
'' if isprod
else 'Development') +
111 (
'WebDisplayRunComplete' if not incr
else 'WebDisplayIncremental')),
# Presumably message headers (type / persistent) -- the dict header is not visible.
112 'type':
'textMessage',
113 'persistent':
'true',
# Publish the JSON-encoded body to `dest`.
116 conn.send(body=json.dumps(body), destination=dest,headers=headers,ack=
'auto')
# Fragment: computes and prints the MD5 digest of `filename` if it exists.
# The chunked read/update loop is not visible in this excerpt.
# NOTE(review): hashlib is not among the imports visible at the top of this
# excerpt -- presumably imported on a line not shown; confirm.
122 md5summer=hashlib.md5()
123 if os.path.isfile(filename):
# Opened in binary mode so the digest is over the raw bytes.
125 with open(filename,
'rb')
as infil:
133 print(
"md5 sum of the \"%s\" is %s"%(filename,md5summer.hexdigest()))
# STEP 1 (excerpt): load the JSON job parameters into `parmap` and write the
# list of input histogram files to hist_merge.list.
# NOTE(review): non-contiguous excerpt -- try/except/return lines are not visible.
138 print(
"\n##################################################################")
139 print(
"## STEP 1: creating file with list of root files ...")
140 print(
"##################################################################\n")
146 print(
"Using json file ", jsonfile,
" for input parameters")
# NOTE(review): opened without `with`; the matching close() is not visible here.
147 f =
open(jsonfile,
'r')
148 parmap = json.load(f)
151 print(
"\nFull Tier-0 run options:\n")
152 pprint.pprint(parmap)
# Input files plus merge tuning knobs (defaults: compression level 1, debug level 0).
154 inputfilelist = parmap.get(
'inputHistFiles', [])
155 nfiles = len(inputfilelist)
156 histMergeCompressionLevel=parmap.get(
'histMergeCompressionLevel', 1)
157 histMergeDebugLevel=parmap.get(
'histMergeDebugLevel', 0)
# Error path for failing to read the JSON dict -> exit code 101.
159 outmap[
'exitCode'] = 101
160 outmap[
'exitAcronym'] =
'TRF_NOINPUT'
161 outmap[
'exitMsg'] =
'Trouble reading json input dict.'
162 traceback.print_exc()
# Empty input list -> exit code 102.
166 outmap[
'exitCode'] = 102
167 outmap[
'exitAcronym'] =
'TRF_NOINPUT'
168 outmap[
'exitMsg'] =
'Empty input file list.'
# inputHistFiles entries are either 'dataset#file' strings or dicts with
# 'dsn'/'lfn'/'events'. The dataset name is taken from the first entry.
176 if isinstance(inputfilelist[0], str) :
177 histtmpdsname = (inputfilelist[0]).
split(
'#')[0]
178 for val
in inputfilelist :
# NOTE(review): raises IndexError if an entry contains no '#'.
179 histtmpflist.append(val.split(
'#')[1])
181 elif isinstance(inputfilelist[0], dict) :
182 histtmpdsname = inputfilelist[0][
'dsn']
183 for fdict
in inputfilelist :
184 histtmpflist.append(fdict[
'lfn'])
# Per-file event count; how it is accumulated is not visible in this excerpt.
185 nevt = fdict.get(
'events', 0)
188 print(
"WARNING Can't get number of events from input json file")
# Write one file name per line; the merge command passes hist_merge.list to
# DQHistogramMerge.py.
191 f =
open(
'hist_merge.list',
'w')
193 for hf
in histtmpflist :
# NOTE(review): string += in a loop; "".join(...) would be the idiomatic form.
194 txtstr +=
"%s\n" % hf
# Echo the written list back into the log.
198 cmd =
"cat hist_merge.list"
199 (s,o) = getstatusoutput(cmd)
200 print(
"\nContents of file hist_merge.list:\n")
# Any failure while assembling the list -> exit code 103.
203 outmap[
'exitCode'] = 103
204 outmap[
'exitAcronym'] =
'TRF_INPUTINFO'
205 outmap[
'exitMsg'] =
'ERROR: crash in assembling input file list (STEP 1)'
206 traceback.print_exc()
# STEP 2 (excerpt): derive the output names and the job option flags from parmap.
# Boolean-like options are the strings 'True'/'False' and are compared as strings.
# NOTE(review): non-contiguous excerpt -- if/else and try/except lines missing.
210 print(
"\n##################################################################")
211 print(
"## STEP 2: determining job parameters...")
212 print(
"##################################################################\n")
214 skipMerge = parmap.get(
'skipMerge',
'False')
# With skipMerge the single input file is used directly as the output (per the
# error message, exactly one input file is required; violation -> exit code 108).
215 if skipMerge ==
'True':
217 print(
"ERROR: skipMerge specified but something other than one input file specified")
218 outmap[
'exitCode'] = 108
219 outmap[
'exitAcronym'] =
'TRF_INPUTINFO'
220 outmap[
'exitMsg'] =
'ERROR: skipMerge specified but something other than one input file specified (STEP 1)'
222 histdsname = histtmpdsname
223 histfile = histtmpflist[0]
# Otherwise the output dataset and file come from 'outputHistFile' = 'dataset#file'.
226 histdsname = (parmap[
'outputHistFile']).
split(
'#')[0]
227 histfile = (parmap[
'outputHistFile']).
split(
'#')[1]
# AMI tag = 6th '.'-separated field of the dataset name.
# NOTE(review): IndexError if the dataset name has fewer than 6 fields.
228 amitag = histdsname.split(
'.')[5]
232 incr = parmap.get(
'incrementalMode',
'False')
235 postproc = parmap.get(
'postProcessing',
'True')
238 allowCOOLUpload = parmap.get(
'allowCOOLUpload',
'True')
241 doWebDisplay = parmap.get(
'doWebDisplay',
'True')
244 mergeParams = parmap.get(
'mergeParams',
'')
247 productionMode = parmap.get(
'productionMode',
'True')
# Incremental mode is switched off outside production mode (per the message; the
# reassignment itself is not visible in this excerpt).
248 if productionMode !=
'True' and incr ==
'True':
249 print(
"Production mode is not True, turning off incremental mode")
# Export server and histogram-config locations as environment variables.
# NOTE(review): os.environ values must be str. The default here is the string
# 'False', but a non-string JSON value (e.g. a list) would raise TypeError --
# confirm callers always pass a string.
253 servers = parmap.get(
'servers',
'False')
254 os.environ[
'DQC_SERVERS'] = servers
# Optional 'filepaths' dict has the shape
#   {'basename': ..., '<EvtClass>': {'run'|'minutes10'|'minutes30': relpath}}.
# Each entry is exported as DQC_HCFG_<EVTCLASS>_<TIMECLASS> = basename/relpath.
# What happens after each "Improperly formed" message is not visible here.
257 filepaths = parmap.get(
'filepaths',
None)
258 if filepaths
and isinstance(filepaths, dict):
259 if 'basename' not in filepaths:
260 print(
"Improperly formed 'filepaths' (no 'basename')")
262 for evtclass
in (
'Collisions',
'Cosmics',
'HeavyIons'):
263 if evtclass
not in filepaths:
264 print(
"Improperly formed 'filepaths' (no '%s')" % evtclass)
266 clinfo = filepaths[evtclass]
267 for timeclass
in (
'run',
'minutes10',
'minutes30'):
268 if timeclass
not in clinfo:
269 print(
"Improperly formed 'filepaths[%s]' (no '%s')" % (evtclass, timeclass))
271 dqcenvvar =
'DQC_HCFG_%s_%s' % (evtclass.upper(), timeclass.upper())
272 fpath = os.path.join(filepaths[
'basename'], clinfo[timeclass])
273 print(
"Setting %s = %s" % (dqcenvvar, fpath))
274 os.environ[dqcenvvar] = fpath
# Project tag: first '.'-field of the dataset name. 'data_test' is presumably
# the fallback on a branch not visible here. An explicit 'projectTag' overrides both.
280 dqproject = histdsname.split(
'.')[0]
282 dqproject =
'data_test'
283 dqproject = parmap.get(
'projectTag', dqproject)
# Run number: from parmap, else the 2nd '.'-field of the dataset name as int.
286 if 'runNumber' in parmap :
287 runnr = parmap[
'runNumber']
290 runnr =
int(histdsname.split(
'.')[1])
# Stream: from parmap, else the 3rd '.'-field ('test_dummy' presumably on failure).
295 if 'streamName' in parmap :
296 stream = parmap[
'streamName']
299 stream = histdsname.split(
'.')[2]
301 stream =
'test_dummy'
# Processing pass: from parmap, else requested from the XML-RPC service
# (get_next_proc_pass), retried up to MAX_XMLRPC_TRIES times.
306 if 'procNumber' in parmap :
307 procnumber = parmap[
'procNumber']
310 while n_xmlrpc_tries <= MAX_XMLRPC_TRIES :
313 xmlrpcserver = xmlrpclib.ServerProxy(
'http://proc-pass-atlasdqm.app.cern.ch')
315 procnumber = xmlrpcserver.get_next_proc_pass(runnr, stream,
'tier0')
318 print(
'Web service connection failed, attempt', n_xmlrpc_tries,
'of', MAX_XMLRPC_TRIES)
# Exponential backoff: sleeps 20*2**n seconds before the next attempt.
320 if n_xmlrpc_tries <= MAX_XMLRPC_TRIES:
321 time.sleep(20*2**n_xmlrpc_tries)
# Log the resolved job parameters. Any failure in STEP 2 -> exit code 104.
324 print(
"Job parameters:\n")
325 print(
" Run number: ", runnr)
326 print(
" Stream name: ", stream)
327 print(
" Processing pass: ", procnumber)
328 print(
" Incremental mode:", incr)
329 print(
" Post-processing: ", postproc)
330 print(
" COOL uploads: ", allowCOOLUpload)
331 print(
" Production mode: ", productionMode)
332 print(
" Skip merge: ", skipMerge)
333 print(
" Merge parameters:", mergeParams)
335 print(
" EOS only: True")
338 outmap[
'exitCode'] = 104
339 outmap[
'exitAcronym'] =
'TRF_JOBPARS'
340 outmap[
'exitMsg'] =
'Error in determining job parameters (STEP 2).'
341 traceback.print_exc()
# Export job flags as '1'/'0' environment variables. Commands later launched via
# os.system inherit them. COOL uploads require both allowCOOLUpload and
# production mode.
345 os.environ[
'DQPRODUCTION'] =
'1' if productionMode ==
'True' else '0'
346 print(
"Setting env variable DQPRODUCTION to %s\n" % os.environ[
'DQPRODUCTION'])
347 os.environ[
'DQ_STREAM'] = stream
348 print(
"Setting env variable DQ_STREAM to %s\n" % os.environ[
'DQ_STREAM'])
349 os.environ[
'COOLUPLOADS'] =
'1' if allowCOOLUpload ==
'True' and productionMode ==
'True' else '0'
350 print(
"Setting env variable COOLUPLOADS to %s\n" % os.environ[
'COOLUPLOADS'])
# STEP 3: merge the input histograms with DQHistogramMerge.py unless
# skipMerge == 'True'.
352 if skipMerge !=
'True':
354 print(
"\n##################################################################")
355 print(
"## STEP 3: running histogram merging procedure ...")
356 print(
"##################################################################\n")
# The three command variants differ only in the two flags after the output file
# (1 1 when postproc == 'True'; 1 0 / 0 0 otherwise -- the condition selecting
# between those two is not visible in this excerpt).
# NOTE(review): shell string built from JSON values (histfile, mergeParams)
# without quoting -- word-splitting/injection risk if the input is untrusted.
359 if postproc ==
'True' :
361 cmd =
"python -u `which DQHistogramMerge.py` hist_merge.list %s 1 1 %d %d " % (histfile,histMergeCompressionLevel,histMergeDebugLevel)
363 cmd =
"python -u `which DQHistogramMerge.py` hist_merge.list %s 1 0 %d %d" % (histfile,histMergeCompressionLevel,histMergeDebugLevel)
365 cmd =
"python -u `which DQHistogramMerge.py` hist_merge.list %s 0 0 %d %d" % (histfile,histMergeCompressionLevel,histMergeDebugLevel)
# Extra user-supplied merge arguments are appended verbatim.
367 cmd += (f
' {mergeParams}' if mergeParams
else '')
369 print(
"Histogram merging command:\n")
371 print(
"\n##################################################################\n")
373 print(
"## ... logfile from DQHistogramMerge.py: ")
374 print(
"--------------------------------------------------------------------------------")
# os.system returns the raw wait status (on POSIX the exit code is in the high byte).
377 retcode1 = os.system(cmd)
378 print(
"--------------------------------------------------------------------------------")
# t1 and tstart are assigned on lines not visible in this excerpt.
380 dt1 =
int(t1 - tstart)
382 print(
"\n## DQHistogramMerge.py finished with retcode = %s" % retcode1)
383 print(
"## ... elapsed time: ", dt1,
" sec")
# Error path. NOTE(review): stores the raw wait status, not the exit code
# (retcode1 >> 8), in outmap['exitCode'].
386 outmap[
'exitCode'] = retcode1
387 outmap[
'exitAcronym'] =
'TRF_DQMHISTMERGE_EXE'
388 outmap[
'exitMsg'] =
'ERROR: DQHistogramMerge.py execution problem! (STEP 3).'
389 print(
"ERROR: DQHistogramMerge.py execution problem!")
391 txt =
'DQHistogramMerge.py execution problem'
# Loop body over the input files is not visible in this excerpt.
394 with open(
'hist_merge.list',
'r')
as infilelist:
395 for infname
in infilelist:
# Dump the resource-utilization log to stdout if the merge step produced one.
400 DQResFile=
"DQResourceUtilization.txt"
401 if os.path.exists(DQResFile):
402 print(
"dumping resource utilization log")
403 with open(DQResFile)
as resfile:
404 for resline
in resfile:
405 print(resline, end=
' ')
407 outmap[
'exitMsg'] =
'ERROR: DQHistogramMerge.py execution problem + problem dumping DQResourceUtilization! (STEP 3).'
408 traceback.print_exc()
409 print(
"ERROR: DQHistogramMerge.py execution problem + problem dumping DQResourceUtilization!")
# STEP 3b: for a post-processed, non-incremental run, move the post-processing
# output with DQFileMove.py (the banner says "to AFS"). Target name:
# <run>_<stream>_<procpass>.
412 if postproc ==
'True' and incr ==
'False':
413 print(
"\n##################################################################")
414 print(
"## STEP 3b: copying postprocessing output to AFS ...")
415 print(
"##################################################################\n")
417 cmd =
"python -u `which DQFileMove.py` %s %s_%s_%s" % (dqproject, runnr, stream, procnumber)
419 print(
"File move command:\n")
421 print(
"\n##################################################################\n")
423 print(
"## ... logfile from DQFileMove.py: ")
424 print(
"--------------------------------------------------------------------------------")
426 retcode1b = os.system(cmd)
427 print(
"--------------------------------------------------------------------------------")
# dt1b is computed on a line not visible in this excerpt.
432 print(
"\n## DQFileMove.py finished with retcode = %s" % retcode1b)
433 print(
"## ... elapsed time: ", dt1b,
" sec")
# Error path for STEP 3/3b -> exit code 105.
435 outmap[
'exitCode'] = 105
436 outmap[
'exitAcronym'] =
'TRF_DQMHISTMERGE_EXE'
437 outmap[
'exitMsg'] =
'ERROR: Failure in histogram merging or copying postprocessing output to AFS (STEP 3/3b).'
438 traceback.print_exc()
# Banner for the skipMerge == 'True' branch.
441 print(
"\n##################################################################")
442 print(
"## HISTOGRAM MERGE/POSTPROCESSING SKIPPED BY USER REQUEST")
443 print(
"##################################################################\n")
# STEP 4: build the web display with DQWebDisplay.py when doWebDisplay == 'True',
# then (in production mode) publish a message to the broker.
449 if doWebDisplay ==
'True':
450 print(
"\n##################################################################")
451 print(
"## STEP 4: running web-display creation procedure ...")
452 print(
"##################################################################\n")
# NOTE(review): shell string with unquoted values taken from the JSON input.
454 cmd =
"python -u `which DQWebDisplay.py` %s %s %s %s stream=%s" % (histfile, dqproject, procnumber, incr, stream)
456 print(
"Web display creation command:\n")
458 print(
"\n##################################################################\n")
460 print(
"## ... logfile from DQWebDisplay.py: ")
461 print(
"--------------------------------------------------------------------------------")
463 retcode2 = os.system(cmd)
464 print(
'DO NOT REPORT "Error in TH1: cannot merge histograms" ERRORS! THESE ARE IRRELEVANT!')
465 print(
"--------------------------------------------------------------------------------")
# dt2 is computed on a line not visible in this excerpt.
469 print(
"\n## DQWebDisplay.py finished with retcode = %s" % retcode2)
470 print(
"## ... elapsed time: ", dt2,
" sec")
# Exit code (high byte of the wait status) 0 or 5 counts as success.
# NOTE(review): outmap['exitCode'] below stores the raw wait status retcode2,
# not retcode2 >> 8 -- confirm what the job report expects.
471 if not (retcode2 >> 8)
in (0, 5) :
472 print(
"ERROR: DQWebDisplay.py execution problem!")
473 outmap[
'exitCode'] = retcode2
474 outmap[
'exitAcronym'] =
'TRF_DQMDISPLAY_EXE'
475 outmap[
'exitMsg'] =
'ERROR: DQWebDisplay.py execution problem! (STEP 4).'
# Loop body over the input files is not visible in this excerpt.
477 with open(
'hist_merge.list',
'r')
as infilelist:
478 for infname
in infilelist:
# Announce success on the message broker; a failure here -> exit code 106.
484 if productionMode ==
'True':
486 print(
'Publishing to message service')
487 publish_success_to_mq(runnr, dqproject, stream, incr=(incr==
'True'), ami=amitag, procpass=procnumber, hcfg=filepaths, isprod=(productionMode==
'True'), parmap=parmap)
489 outmap[
'exitCode'] = 106
490 outmap[
'exitAcronym'] =
'TRF_DQMDISPLAY_EXE'
491 outmap[
'exitMsg'] =
'ERROR: Failure in publishing info to messaging service (STEP 4).'
492 traceback.print_exc()
# Branch for doWebDisplay != 'True'.
495 print(
"\n##################################################################")
496 print(
"## WEB DISPLAY CREATION SKIPPED BY USER REQUEST")
497 print(
"##################################################################\n")
498 print(
'Web display off, not publishing to message service')
# General STEP 4 failure. NOTE(review): reuses code 106 / TRF_DQMDISPLAY_EXE, the
# same as the publish failure above, so the two cannot be told apart by code.
500 outmap[
'exitCode'] = 106
501 outmap[
'exitAcronym'] =
'TRF_DQMDISPLAY_EXE'
502 outmap[
'exitMsg'] =
'ERROR: Failure in web-display creation procedure (STEP 4).'
503 print(
'ERROR: Failure in web-display creation procedure (STEP 4).')
504 traceback.print_exc()
# STEP 5: fill the output dataset, sub-file list and processed-event count into
# the job report dict. `outfiles` and `nevts` are built on lines not visible here.
# Failure -> exit code 107.
507 print(
"\n##################################################################")
508 print(
"## STEP 5: finishing the job ...")
509 print(
"##################################################################\n")
515 outmap[
'files'][
'output'][0][
'dataset'] = histdsname
516 outmap[
'files'][
'output'][0][
'subFiles'] = outfiles
517 outmap[
'resource'][
'transform'][
'processedEvents'] =
int(nevts)
520 outmap[
'exitCode'] = 107
521 outmap[
'exitAcronym'] =
'TRF_JOBREPORT'
522 outmap[
'exitMsg'] =
'ERROR: in job report creation (STEP 5)'
523 print(
"ERROR: in job report creation (STEP 5) !")
524 traceback.print_exc()
# Wrapper fragment: initialise the job report `outmap` with success values (the
# steps overwrite exitCode/exitAcronym/exitMsg on failure), record the wall time,
# write jobReport.json and print a summary.
528 print(
"\n##################################################################")
529 print(
"## ATLAS Tier-0 Offline DQM Processing ##")
530 print(
"##################################################################\n")
# The initial 'exitCode' entry sits on a line not visible in this excerpt.
532 outmap = {
'exitAcronym' :
'OK',
534 'exitMsg' :
'trf finished OK',
535 'files' : {
'output' : [{
'dataset' :
'',
539 'resource' : {
'transform' : {
'processedEvents' : 0 } }
# tstart is assigned on a line not visible in this excerpt.
545 outmap[
'resource'][
'transform'][
'wallTime'] =
int(time.time() - tstart)
# NOTE(review): opened without `with`; the write/close lines are not visible here.
548 f =
open(
'jobReport.json',
'w')
553 print(
"\n## ... job finished with retcode : %s" % outmap[
'exitCode'])
554 print(
"## ... error acronym: ", outmap[
'exitAcronym'])
555 print(
"## ... job status message: ", outmap[
'exitMsg'])
556 print(
"## ... elapsed time: ", outmap[
'resource'][
'transform'][
'wallTime'],
"sec")
558 print(
"##################################################################")
559 print(
"## End of job.")
560 print(
"##################################################################\n")
# Script entry point: expects a single argument --argJSON=<path to JSON parameter file>.
567 if __name__ ==
"__main__":
# NOTE(review): this guard should use `or`. With `and`, running with no arguments
# evaluates sys.argv[1] and raises IndexError. A single argument lacking the
# '--argJSON=' prefix is also not rejected.
569 if (len(sys.argv) != 2)
and (
not sys.argv[1].startswith(
'--argJSON=')) :
570 print(
"Input format wrong --- use ")
571 print(
" --argJSON=<json-dictionary containing input info> ")
572 print(
" with key/value pairs: ")
573 print(
" 1) 'inputHistFiles': python list ")
574 print(
" ['datasetname#filename1', 'datasetname#filename2',...] (input dataset + file names) ")
575 print(
" or list of file dictionaries ")
576 print(
" [{'lfn':'fname1', 'checksum':'cks1', 'dsn':'dsn1', 'size':sz1, 'guid':'guid1', 'events':nevts1, ...}, ")
577 print(
" {'lfn':'fname2', 'checksum':'cks2', 'dsn':'dsn2', 'size':sz2, 'guid':'guid2', 'events':nevts2, ...}, ...] ")
578 print(
" 2) 'outputHistFile': string 'datasetname#filename' ")
579 print(
" (HIST output dataset name + file) ")
580 print(
" optional parameters: ")
581 print(
" 3) 'incrementalMode': string ('True'/'False') ")
582 print(
" ('True': do incremental update of DQM webpages on top of existing statistics; ")
583 print(
" 'False': create final DQM webpages, replace temporary ones) ")
584 print(
" 4) 'postProcessing': string ('True'/'False', default: 'True') ")
585 print(
" ('False': run histogram merging and DQ assessment only; ")
586 print(
" 'True': run additional post-processing step (fitting, etc.)) ")
587 print(
" 5) 'procNumber': int (number of processing pass, e.g. 1,2, ...) ")
588 print(
" 6) 'runNumber': int ")
589 print(
" 7) 'streamName': string (e.g., physics_IDCosmic, physics_Express, ...) ")
590 print(
" 8) 'projectTag': string (e.g., data10_7TeV, TrigDisplay)")
591 print(
" 9) 'allowCOOLUpload': string ('True'/'False', default: 'True')")
592 print(
" ('True': allow upload of defects to database; ")
593 print(
" 'False': do not upload defects to database)")
# Strip the '--argJSON=' prefix; the remainder is the JSON parameter file path.
597 jsonfile = sys.argv[1][len(
'--argJSON='):]