import sys, string, os.path, os, pickle, time, pprint
import hashlib   # needed by the md5-checksum helper below
from xmlrpc import client as xmlrpclib
from subprocess import getstatusoutput
if os.path.isfile(fname) :
    sz = os.path.getsize(fname)
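# Notify the ATLAS message broker (via STOMP) that a web-display update is available.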
import stomp, json, ssl
from DataQualityUtils import stompconfig
dest = '/topic/atlas.dqm.progress'
conn = stomp.Connection([('atlas-mb.cern.ch', 61013)])
conn.connect(wait=True, **stompconfig.config())
'MsgType': (('' if isprod else 'Development') +
            ('WebDisplayRunComplete' if not incr else 'WebDisplayIncremental')),
conn.send(body=json.dumps(body), destination=dest, headers=headers, ack='auto')
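# Helper that computes an MD5 checksum of a file and prints it to the job log.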
md5summer = hashlib.md5()
if os.path.isfile(filename):
    infil = open(filename, 'rb')
print("md5 sum of the \"%s\" is %s" % (filename, md5summer.hexdigest()))
print("\n##################################################################")
print("## ATLAS Tier-0 Offline DQM Processing                          ##")
print("##################################################################\n")

print("\n##################################################################")
print("## STEP 1: creating file with list of root files ...")
print("##################################################################\n")
print("Using pickled file ", picklefile, " for input parameters")
f = open(picklefile, 'rb')
parmap = pickle.load(f)

print("\nFull Tier-0 run options:\n")
pprint.pprint(parmap)
inputfilelist = parmap.get('inputHistFiles', [])
nfiles = len(inputfilelist)
histMergeCompressionLevel = parmap.get('histMergeCompressionLevel', 1)
histMergeDebugLevel = parmap.get('histMergeDebugLevel', 0)
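# Empty input file list: fill a minimal job report with the TRF_NOINPUT error acronym.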
dt = int(time.time() - tstart)
acronym = 'TRF_NOINPUT'
txt = 'empty input file list'
reportmap = { 'prodsys': { 'trfCode': retcode,
                           'trfAcronym': acronym,
                           'more': { 'num1': 0, 'num2': dt, 'txt1': txt }
                         }
            }
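# Input entries are either 'datasetname#filename' strings or dictionaries with 'dsn', 'lfn' and 'events' keys.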
if isinstance(inputfilelist[0], str) :
    histtmpdsname = (inputfilelist[0]).split('#')[0]
    for val in inputfilelist :
        histtmpflist.append(val.split('#')[1])

elif isinstance(inputfilelist[0], dict) :
    histtmpdsname = inputfilelist[0]['dsn']
    for fdict in inputfilelist :
        histtmpflist.append(fdict['lfn'])
        nevt = fdict.get('events', 0)
        print("WARNING Can't get number of events from input pickle file")
f = open('hist_merge.list', 'w')
for hf in histtmpflist :
    txtstr += "%s\n" % hf
cmd = "cat hist_merge.list"
(s, o) = getstatusoutput(cmd)
print("\nContents of file hist_merge.list:\n")
print("\n##################################################################")
print("## STEP 2: determining job parameters...")
print("##################################################################\n")
histdsname = (parmap['outputHistFile']).split('#')[0]
histfile = (parmap['outputHistFile']).split('#')[1]
amitag = histfile.split('.')[5]
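# Optional flags arrive from the Tier-0 argdict as the strings 'True'/'False' and are compared as strings below.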
incr = parmap.get('incrementalMode', 'False')
postproc = parmap.get('postProcessing', 'True')
allowCOOLUpload = parmap.get('allowCOOLUpload', 'True')
doWebDisplay = parmap.get('doWebDisplay', 'True')
productionMode = parmap.get('productionMode', 'True')
if productionMode != 'True' and incr == 'True':
    print("Production mode is not True, turning off incremental mode")
    incr = 'False'
filepaths = parmap.get('filepaths', None)
if filepaths and isinstance(filepaths, dict):
    if 'basename' not in filepaths:
        print("Improperly formed 'filepaths' (no 'basename')")
    else:
        for evtclass in ('Collisions', 'Cosmics', 'HeavyIons'):
            if evtclass not in filepaths:
                print("Improperly formed 'filepaths' (no '%s')" % evtclass)
            else:
                clinfo = filepaths[evtclass]
                for timeclass in ('run', 'minutes10', 'minutes30'):
                    if timeclass not in clinfo:
                        print("Improperly formed 'filepaths[%s]' (no '%s')" % (evtclass, timeclass))
                    else:
                        dqcenvvar = 'DQC_HCFG_%s_%s' % (evtclass.upper(), timeclass.upper())
                        fpath = os.path.join(filepaths['basename'], clinfo[timeclass])
                        print("Setting %s = %s" % (dqcenvvar, fpath))
                        os.environ[dqcenvvar] = fpath
dqproject = histdsname.split('.')[0]
dqproject = 'data_test'
dqproject = parmap.get('projectTag', dqproject)
if 'runNumber' in parmap :
    runnr = parmap['runNumber']
runnr = int(histdsname.split('.')[1])

if 'streamName' in parmap :
    stream = parmap['streamName']
stream = histdsname.split('.')[2]
stream = 'test_dummy'

if 'procNumber' in parmap :
    procnumber = parmap['procNumber']
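# No processing pass number given: query the atlasdqm XML-RPC service, retrying with an exponential back-off.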
while n_xmlrpc_tries <= MAX_XMLRPC_TRIES :
    xmlrpcserver = xmlrpclib.ServerProxy('http://atlasdqm.cern.ch:8888')
    procnumber = xmlrpcserver.get_next_proc_pass(runnr, stream, 'tier0')
    print('Web service connection failed, attempt', n_xmlrpc_tries, 'of', MAX_XMLRPC_TRIES)
    if n_xmlrpc_tries <= MAX_XMLRPC_TRIES:
        time.sleep(20*2**n_xmlrpc_tries)
print("Job parameters:\n")
print("  Run number:      ", runnr)
print("  Stream name:     ", stream)
print("  Processing pass: ", procnumber)
print("  Incremental mode:", incr)
print("  Post-processing: ", postproc)
print("  COOL uploads:    ", allowCOOLUpload)
print("  Production mode: ", productionMode)
print("\n##################################################################")
print("## STEP 3: running histogram merging procedure ...")
print("##################################################################\n")
os.environ['DQPRODUCTION'] = '1' if productionMode == 'True' else '0'
os.environ['DQ_STREAM'] = stream
print("Setting env variable DQPRODUCTION to %s\n" % os.environ['DQPRODUCTION'])
os.environ['COOLUPLOADS'] = '1' if allowCOOLUpload == 'True' and productionMode == 'True' else '0'
print("Setting env variable COOLUPLOADS to %s\n" % os.environ['COOLUPLOADS'])
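# Build the DQHistogramMerge.py command line; the numeric flag arguments differ depending on whether post-processing and incremental merging are requested.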
if postproc == 'True' :
    if incr == 'True':
        cmd = "python -u `which DQHistogramMerge.py` hist_merge.list %s 1 1 %d %d " % (histfile, histMergeCompressionLevel, histMergeDebugLevel)
    else:
        cmd = "python -u `which DQHistogramMerge.py` hist_merge.list %s 1 0 %d %d" % (histfile, histMergeCompressionLevel, histMergeDebugLevel)
else :
    cmd = "python -u `which DQHistogramMerge.py` hist_merge.list %s 0 0 %d %d" % (histfile, histMergeCompressionLevel, histMergeDebugLevel)
print("Histogram merging command:\n")
print("\n##################################################################\n")
print("## ... logfile from DQHistogramMerge.py: ")
print("--------------------------------------------------------------------------------")
retcode1 = os.system(cmd)
print("--------------------------------------------------------------------------------")
dt1 = int(t1 - tstart)
print("\n## DQHistogramMerge.py finished with retcode = %s" % retcode1)
print("## ... elapsed time: ", dt1, " sec")
if postproc == 'True' and incr == 'False':
    print("\n##################################################################")
    print("## STEP 3b: copying postprocessing output to AFS ...")
    print("##################################################################\n")

    cmd = "python -u `which DQFileMove.py` %s %s_%s_%s" % (dqproject, runnr, stream, procnumber)
    print("File move command:\n")
    print("\n##################################################################\n")
    print("## ... logfile from DQFileMove.py: ")
    print("--------------------------------------------------------------------------------")
    retcode1b = os.system(cmd)
    print("--------------------------------------------------------------------------------")
    print("\n## DQFileMove.py finished with retcode = %s" % retcode1b)
    print("## ... elapsed time: ", dt1b, " sec")
if doWebDisplay == 'True':
    print("\n##################################################################")
    print("## STEP 4: running web-display creation procedure ...")
    print("##################################################################\n")

    cmd = "python -u `which DQWebDisplay.py` %s %s %s %s stream=%s" % (histfile, dqproject, procnumber, incr, stream)
    print("Web display creation command:\n")
    print("\n##################################################################\n")
    print("## ... logfile from DQWebDisplay.py: ")
    print("--------------------------------------------------------------------------------")
    retcode2 = os.system(cmd)
    print('DO NOT REPORT "Error in TH1: cannot merge histograms" ERRORS! THESE ARE IRRELEVANT!')
    print("--------------------------------------------------------------------------------")
    print("\n## DQWebDisplay.py finished with retcode = %s" % retcode2)
    print("## ... elapsed time: ", dt2, " sec")
else:
    print("\n##################################################################")
    print("## WEB DISPLAY CREATION SKIPPED BY USER REQUEST")
    print("##################################################################\n")

print("\n##################################################################")
print("## STEP 5: finishing the job ...")
print("##################################################################\n")
txt = 'trf finished OK'
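# os.system returns a raw wait status; shifting right by 8 bits recovers the child exit code.
# Exit codes 0 and 5 from DQWebDisplay.py are both treated as success.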
if (retcode2 >> 8) in (0, 5) :
    histmap = getFileMap(histfile, histdsname, nevts=nevts)
    if doWebDisplay == 'True':
        print('Publishing to message service')
        publish_success_to_mq(runnr, dqproject, stream, incr=(incr == 'True'), ami=amitag,
                              procpass=procnumber, hcfg=filepaths, isprod=(productionMode == 'True'))
    else:
        print('Web display off, not publishing to message service')
else:
    txt = 'DQWebDisplay.py execution problem'
    print("ERROR: DQWebDisplay.py execution problem!")
    acronym = 'TRF_DQMDISPLAY_EXE'
    infilelist = open('hist_merge.list', 'r')
    for infname in infilelist:
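# Corresponding error handling for a failure of DQHistogramMerge.py itself.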
print("ERROR: DQHistogramMerge.py execution problem!")
acronym = 'TRF_DQMHISTMERGE_EXE'
txt = 'DQHistogramMerge.py execution problem'
infilelist = open('hist_merge.list', 'r')
for infname in infilelist:
DQResFile = "DQResourceUtilization.txt"
if os.path.exists(DQResFile):
    print("dumping resource utilization log")
    with open(DQResFile) as resfile:
        for resline in resfile:
            print(resline, end=' ')
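# Assemble the Tier-0 job report and serialise it to jobReport.gpickle.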
reportmap = { 'prodsys': { 'trfCode': retcode,
                           'trfAcronym': acronym,
                           'jobOutputs': outfiles,
                           'jobInputs': infiles,
                           'nevents': int(nevts),
                           'more': { 'num1': int(nevts), 'num2': int(dt), 'txt1': txt }
                         }
            }
f = open('jobReport.gpickle', 'wb')
pickle.dump(reportmap, f)
print("\n## ... job finished with retcode : %s" % reportmap['prodsys']['trfCode'])
print("## ... error acronym: ", reportmap['prodsys']['trfAcronym'])
print("## ... elapsed time: ", reportmap['prodsys']['more']['num2'], "sec")

print("##################################################################")
print("## End of job.")
print("##################################################################\n")
if __name__ == "__main__":

    if (len(sys.argv) != 2) or (not sys.argv[1].startswith('--argdict=')) :
        print("Input format wrong --- use ")
        print("   --argdict=<pickled-dictionary containing input info> ")
        print("   with key/value pairs: ")
        print("     1) 'inputHistFiles': python list ")
        print("         ['datasetname#filename1', 'datasetname#filename2', ...] (input dataset + file names) ")
        print("        or list of file dictionaries ")
        print("         [{'lfn':'fname1', 'checksum':'cks1', 'dsn':'dsn1', 'size':sz1, 'guid':'guid1', 'events':nevts1, ...}, ")
        print("          {'lfn':'fname2', 'checksum':'cks2', 'dsn':'dsn2', 'size':sz2, 'guid':'guid2', 'events':nevts2, ...}, ...] ")
        print("     2) 'outputHistFile': string 'datasetname#filename' ")
        print("         (HIST output dataset name + file) ")
        print("     optional parameters: ")
        print("     3) 'incrementalMode': string ('True'/'False') ")
        print("         ('True': do incremental update of DQM webpages on top of existing statistics; ")
        print("          'False': create final DQM webpages, replace temporary ones) ")
        print("     4) 'postProcessing': string ('True'/'False', default: 'True') ")
        print("         ('False': run histogram merging and DQ assessment only; ")
        print("          'True': run additional post-processing step (fitting, etc.)) ")
        print("     5) 'procNumber': int (number of processing pass, e.g. 1, 2, ...) ")
        print("     6) 'runNumber': int ")
        print("     7) 'streamName': string (e.g., physics_IDCosmic, physics_Express, ...) ")
        print("     8) 'projectTag': string (e.g., data10_7TeV, TrigDisplay)")
        print("     9) 'allowCOOLUpload': string ('True'/'False', default: 'True')")
        print("         ('True': allow upload of defects to database; ")
        print("          'False': do not upload defects to database)")
    picklefile = sys.argv[1][len('--argdict='):]