142 print(
"\n##################################################################")
143 print(
"## STEP 1: creating file with list of root files ...")
144 print(
"##################################################################\n")
150 print(
"Using json file ", jsonfile,
" for input parameters")
151 f =
open(jsonfile,
'r')
152 parmap = json.load(f)
155 print(
"\nFull Tier-0 run options:\n")
156 pprint.pprint(parmap)
158 inputfilelist = parmap.get(
'inputHistFiles', [])
159 nfiles = len(inputfilelist)
160 histMergeCompressionLevel=parmap.get(
'histMergeCompressionLevel', 1)
161 histMergeDebugLevel=parmap.get(
'histMergeDebugLevel', 0)
163 outmap[
'exitCode'] = 101
164 outmap[
'exitAcronym'] =
'TRF_NOINPUT'
165 outmap[
'exitMsg'] =
'Trouble reading json input dict.'
166 traceback.print_exc()
170 outmap[
'exitCode'] = 102
171 outmap[
'exitAcronym'] =
'TRF_NOINPUT'
172 outmap[
'exitMsg'] =
'Empty input file list.'
180 if isinstance(inputfilelist[0], six.text_type) :
181 histtmpdsname = (inputfilelist[0]).
split(
'#')[0]
182 for val
in inputfilelist :
183 histtmpflist.append(val.split(
'#')[1])
185 elif isinstance(inputfilelist[0], dict) :
186 histtmpdsname = inputfilelist[0][
'dsn']
187 for fdict
in inputfilelist :
188 histtmpflist.append(fdict[
'lfn'])
189 nevt = fdict.get(
'events', 0)
192 print(
"WARNING Can't get number of events from input json file")
195 f =
open(
'hist_merge.list',
'w')
197 for hf
in histtmpflist :
198 txtstr +=
"%s\n" % hf
202 cmd =
"cat hist_merge.list"
203 (s,o) = getstatusoutput(cmd)
204 print(
"\nContents of file hist_merge.list:\n")
207 outmap[
'exitCode'] = 103
208 outmap[
'exitAcronym'] =
'TRF_INPUTINFO'
209 outmap[
'exitMsg'] =
'ERROR: crash in assembling input file list (STEP 1)'
210 traceback.print_exc()
214 print(
"\n##################################################################")
215 print(
"## STEP 2: determining job parameters...")
216 print(
"##################################################################\n")
218 skipMerge = parmap.get(
'skipMerge',
'False')
219 if skipMerge ==
'True':
221 print(
"ERROR: skipMerge specified but something other than one input file specified")
222 outmap[
'exitCode'] = 108
223 outmap[
'exitAcronym'] =
'TRF_INPUTINFO'
224 outmap[
'exitMsg'] =
'ERROR: skipMerge specified but something other than one input file specified (STEP 1)'
226 histdsname = histtmpdsname
227 histfile = histtmpflist[0]
230 histdsname = (parmap[
'outputHistFile']).
split(
'#')[0]
231 histfile = (parmap[
'outputHistFile']).
split(
'#')[1]
232 amitag = histdsname.split(
'.')[5]
236 incr = parmap.get(
'incrementalMode',
'False')
239 postproc = parmap.get(
'postProcessing',
'True')
242 allowCOOLUpload = parmap.get(
'allowCOOLUpload',
'True')
245 doWebDisplay = parmap.get(
'doWebDisplay',
'True')
248 mergeParams = parmap.get(
'mergeParams',
'')
251 productionMode = parmap.get(
'productionMode',
'True')
252 if productionMode !=
'True' and incr ==
'True':
253 print(
"Production mode is not True, turning off incremental mode")
257 servers = parmap.get(
'servers',
'False')
258 os.environ[
'DQC_SERVERS'] = servers
261 filepaths = parmap.get(
'filepaths',
None)
262 if filepaths
and isinstance(filepaths, dict):
263 if 'basename' not in filepaths:
264 print(
"Improperly formed 'filepaths' (no 'basename')")
266 for evtclass
in (
'Collisions',
'Cosmics',
'HeavyIons'):
267 if evtclass
not in filepaths:
268 print(
"Improperly formed 'filepaths' (no '%s')" % evtclass)
270 clinfo = filepaths[evtclass]
271 for timeclass
in (
'run',
'minutes10',
'minutes30'):
272 if timeclass
not in clinfo:
273 print(
"Improperly formed 'filepaths[%s]' (no '%s')" % (evtclass, timeclass))
275 dqcenvvar =
'DQC_HCFG_%s_%s' % (evtclass.upper(), timeclass.upper())
276 fpath = os.path.join(filepaths[
'basename'], clinfo[timeclass])
277 print(
"Setting %s = %s" % (dqcenvvar, fpath))
278 os.environ[dqcenvvar] = fpath
284 dqproject = histdsname.split(
'.')[0]
286 dqproject =
'data_test'
287 dqproject = parmap.get(
'projectTag', dqproject)
290 if 'runNumber' in parmap :
291 runnr = parmap[
'runNumber']
294 runnr =
int(histdsname.split(
'.')[1])
299 if 'streamName' in parmap :
300 stream = parmap[
'streamName']
303 stream = histdsname.split(
'.')[2]
305 stream =
'test_dummy'
310 if 'procNumber' in parmap :
311 procnumber = parmap[
'procNumber']
314 while n_xmlrpc_tries <= MAX_XMLRPC_TRIES :
317 xmlrpcserver = xmlrpclib.ServerProxy(
'http://proc-pass-atlasdqm.app.cern.ch')
319 procnumber = xmlrpcserver.get_next_proc_pass(runnr, stream,
'tier0')
322 print(
'Web service connection failed, attempt', n_xmlrpc_tries,
'of', MAX_XMLRPC_TRIES)
324 if n_xmlrpc_tries <= MAX_XMLRPC_TRIES:
325 time.sleep(20*2**n_xmlrpc_tries)
328 print(
"Job parameters:\n")
329 print(
" Run number: ", runnr)
330 print(
" Stream name: ", stream)
331 print(
" Processing pass: ", procnumber)
332 print(
" Incremental mode:", incr)
333 print(
" Post-processing: ", postproc)
334 print(
" COOL uploads: ", allowCOOLUpload)
335 print(
" Production mode: ", productionMode)
336 print(
" Skip merge: ", skipMerge)
337 print(
" Merge parameters:", mergeParams)
339 print(
" EOS only: True")
342 outmap[
'exitCode'] = 104
343 outmap[
'exitAcronym'] =
'TRF_JOBPARS'
344 outmap[
'exitMsg'] =
'Error in determining job parameters (STEP 2).'
345 traceback.print_exc()
349 os.environ[
'DQPRODUCTION'] =
'1' if productionMode ==
'True' else '0'
350 print(
"Setting env variable DQPRODUCTION to %s\n" % os.environ[
'DQPRODUCTION'])
351 os.environ[
'DQ_STREAM'] = stream
352 print(
"Setting env variable DQ_STREAM to %s\n" % os.environ[
'DQ_STREAM'])
353 os.environ[
'COOLUPLOADS'] =
'1' if allowCOOLUpload ==
'True' and productionMode ==
'True' else '0'
354 print(
"Setting env variable COOLUPLOADS to %s\n" % os.environ[
'COOLUPLOADS'])
356 if skipMerge !=
'True':
358 print(
"\n##################################################################")
359 print(
"## STEP 3: running histogram merging procedure ...")
360 print(
"##################################################################\n")
363 if postproc ==
'True' :
365 cmd =
"python -u `which DQHistogramMerge.py` hist_merge.list %s 1 1 %d %d " % (histfile,histMergeCompressionLevel,histMergeDebugLevel)
367 cmd =
"python -u `which DQHistogramMerge.py` hist_merge.list %s 1 0 %d %d" % (histfile,histMergeCompressionLevel,histMergeDebugLevel)
369 cmd =
"python -u `which DQHistogramMerge.py` hist_merge.list %s 0 0 %d %d" % (histfile,histMergeCompressionLevel,histMergeDebugLevel)
371 cmd += (f
' {mergeParams}' if mergeParams
else '')
373 print(
"Histogram merging command:\n")
375 print(
"\n##################################################################\n")
377 print(
"## ... logfile from DQHistogramMerge.py: ")
378 print(
"--------------------------------------------------------------------------------")
381 retcode1 = os.system(cmd)
382 print(
"--------------------------------------------------------------------------------")
384 dt1 =
int(t1 - tstart)
386 print(
"\n## DQHistogramMerge.py finished with retcode = %s" % retcode1)
387 print(
"## ... elapsed time: ", dt1,
" sec")
390 outmap[
'exitCode'] = retcode1
391 outmap[
'exitAcronym'] =
'TRF_DQMHISTMERGE_EXE'
392 outmap[
'exitMsg'] =
'ERROR: DQHistogramMerge.py execution problem! (STEP 3).'
393 print(
"ERROR: DQHistogramMerge.py execution problem!")
395 txt =
'DQHistogramMerge.py execution problem'
398 with open(
'hist_merge.list',
'r')
as infilelist:
399 for infname
in infilelist:
404 DQResFile=
"DQResourceUtilization.txt"
405 if os.path.exists(DQResFile):
406 print(
"dumping resource utilization log")
407 with open(DQResFile)
as resfile:
408 for resline
in resfile:
409 print(resline, end=
' ')
411 outmap[
'exitMsg'] =
'ERROR: DQHistogramMerge.py execution problem + problem dumping DQResourceUtilization! (STEP 3).'
412 traceback.print_exc()
413 print(
"ERROR: DQHistogramMerge.py execution problem + problem dumping DQResourceUtilization!")
416 if postproc ==
'True' and incr ==
'False':
417 print(
"\n##################################################################")
418 print(
"## STEP 3b: copying postprocessing output to AFS ...")
419 print(
"##################################################################\n")
421 cmd =
"python -u `which DQFileMove.py` %s %s_%s_%s" % (dqproject, runnr, stream, procnumber)
423 print(
"File move command:\n")
425 print(
"\n##################################################################\n")
427 print(
"## ... logfile from DQFileMove.py: ")
428 print(
"--------------------------------------------------------------------------------")
430 retcode1b = os.system(cmd)
431 print(
"--------------------------------------------------------------------------------")
436 print(
"\n## DQFileMove.py finished with retcode = %s" % retcode1b)
437 print(
"## ... elapsed time: ", dt1b,
" sec")
439 outmap[
'exitCode'] = 105
440 outmap[
'exitAcronym'] =
'TRF_DQMHISTMERGE_EXE'
441 outmap[
'exitMsg'] =
'ERROR: Failure in histogram merging or copying postprocessing output to AFS (STEP 3/3b).'
442 traceback.print_exc()
445 print(
"\n##################################################################")
446 print(
"## HISTOGRAM MERGE/POSTPROCESSING SKIPPED BY USER REQUEST")
447 print(
"##################################################################\n")
453 if doWebDisplay ==
'True':
454 print(
"\n##################################################################")
455 print(
"## STEP 4: running web-display creation procedure ...")
456 print(
"##################################################################\n")
458 cmd =
"python -u `which DQWebDisplay.py` %s %s %s %s stream=%s" % (histfile, dqproject, procnumber, incr, stream)
460 print(
"Web display creation command:\n")
462 print(
"\n##################################################################\n")
464 print(
"## ... logfile from DQWebDisplay.py: ")
465 print(
"--------------------------------------------------------------------------------")
467 retcode2 = os.system(cmd)
468 print(
'DO NOT REPORT "Error in TH1: cannot merge histograms" ERRORS! THESE ARE IRRELEVANT!')
469 print(
"--------------------------------------------------------------------------------")
473 print(
"\n## DQWebDisplay.py finished with retcode = %s" % retcode2)
474 print(
"## ... elapsed time: ", dt2,
" sec")
475 if not (retcode2 >> 8)
in (0, 5) :
476 print(
"ERROR: DQWebDisplay.py execution problem!")
477 outmap[
'exitCode'] = retcode2
478 outmap[
'exitAcronym'] =
'TRF_DQMDISPLAY_EXE'
479 outmap[
'exitMsg'] =
'ERROR: DQWebDisplay.py execution problem! (STEP 4).'
481 with open(
'hist_merge.list',
'r')
as infilelist:
482 for infname
in infilelist:
488 if productionMode ==
'True':
490 print(
'Publishing to message service')
491 publish_success_to_mq(runnr, dqproject, stream, incr=(incr==
'True'), ami=amitag, procpass=procnumber, hcfg=filepaths, isprod=(productionMode==
'True'), parmap=parmap)
493 outmap[
'exitCode'] = 106
494 outmap[
'exitAcronym'] =
'TRF_DQMDISPLAY_EXE'
495 outmap[
'exitMsg'] =
'ERROR: Failure in publishing info to messaging service (STEP 4).'
496 traceback.print_exc()
499 print(
"\n##################################################################")
500 print(
"## WEB DISPLAY CREATION SKIPPED BY USER REQUEST")
501 print(
"##################################################################\n")
502 print(
'Web display off, not publishing to message service')
504 outmap[
'exitCode'] = 106
505 outmap[
'exitAcronym'] =
'TRF_DQMDISPLAY_EXE'
506 outmap[
'exitMsg'] =
'ERROR: Failure in web-display creation procedure (STEP 4).'
507 print(
'ERROR: Failure in web-display creation procedure (STEP 4).')
508 traceback.print_exc()
511 print(
"\n##################################################################")
512 print(
"## STEP 5: finishing the job ...")
513 print(
"##################################################################\n")
519 outmap[
'files'][
'output'][0][
'dataset'] = histdsname
520 outmap[
'files'][
'output'][0][
'subFiles'] = outfiles
521 outmap[
'resource'][
'transform'][
'processedEvents'] =
int(nevts)
524 outmap[
'exitCode'] = 107
525 outmap[
'exitAcronym'] =
'TRF_JOBREPORT'
526 outmap[
'exitMsg'] =
'ERROR: in job report creation (STEP 5)'
527 print(
"ERROR: in job report creation (STEP 5) !")
528 traceback.print_exc()