129 print(
"\n##################################################################")
130 print(
"## ATLAS Tier-0 Offline DQM Processing ##")
131 print(
"##################################################################\n")
133 print(
"\n##################################################################")
134 print(
"## STEP 1: creating file with list of root files ...")
135 print(
"##################################################################\n")
138 print(
"Using pickled file ", picklefile,
" for input parameters")
139 f =
open(picklefile,
'r')
140 parmap = pickle.load(f)
143 print(
"\nFull Tier-0 run options:\n")
144 pprint.pprint(parmap)
146 inputfilelist = parmap.get(
'inputHistFiles', [])
147 nfiles = len(inputfilelist)
148 histMergeCompressionLevel=parmap.get(
'histMergeCompressionLevel', 1)
149 histMergeDebugLevel=parmap.get(
'histMergeDebugLevel', 0)
152 dt =
int(time.time() - tstart)
154 acronym =
'TRF_NOINPUT'
155 txt =
'empty input file list'
156 reportmap = {
'prodsys': {
'trfCode': retcode,
157 'trfAcronym': acronym,
161 'more': {
'num1': 0,
'num2': dt,
'txt1': txt }
169 if isinstance(inputfilelist[0], str) :
170 histtmpdsname = (inputfilelist[0]).
split(
'#')[0]
171 for val
in inputfilelist :
172 histtmpflist.append(val.split(
'#')[1])
174 elif isinstance(inputfilelist[0], dict) :
175 histtmpdsname = inputfilelist[0][
'dsn']
176 for fdict
in inputfilelist :
177 histtmpflist.append(fdict[
'lfn'])
178 nevt = fdict.get(
'events', 0)
181 print(
"WARNING Can't get number of events from input pickle file")
184 f =
open(
'hist_merge.list',
'w')
186 for hf
in histtmpflist :
187 txtstr +=
"%s\n" % hf
191 cmd =
"cat hist_merge.list"
192 (s,o) = getstatusoutput(cmd)
193 print(
"\nContents of file hist_merge.list:\n")
197 print(
"\n##################################################################")
198 print(
"## STEP 2: determining job parameters...")
199 print(
"##################################################################\n")
202 histdsname = (parmap[
'outputHistFile']).
split(
'#')[0]
203 histfile = (parmap[
'outputHistFile']).
split(
'#')[1]
204 amitag = histfile.split(
'.')[5]
208 incr = parmap.get(
'incrementalMode',
'False')
211 postproc = parmap.get(
'postProcessing',
'True')
214 allowCOOLUpload = parmap.get(
'allowCOOLUpload',
'True')
217 doWebDisplay = parmap.get(
'doWebDisplay',
'True')
220 productionMode = parmap.get(
'productionMode',
'True')
221 if productionMode !=
'True' and incr ==
'True':
222 print(
"Production mode is not True, turning off incremental mode")
226 filepaths = parmap.get(
'filepaths',
None)
227 if filepaths
and isinstance(filepaths, dict):
228 if 'basename' not in filepaths:
229 print(
"Improperly formed 'filepaths' (no 'basename')")
231 for evtclass
in (
'Collisions',
'Cosmics',
'HeavyIons'):
232 if evtclass
not in filepaths:
233 print(
"Improperly formed 'filepaths' (no '%s')" % evtclass)
235 clinfo = filepaths[evtclass]
236 for timeclass
in (
'run',
'minutes10',
'minutes30'):
237 if timeclass
not in clinfo:
238 print(
"Improperly formed 'filepaths[%s]' (no '%s')" % (evtclass, timeclass))
240 dqcenvvar =
'DQC_HCFG_%s_%s' % (evtclass.upper(), timeclass.upper())
241 fpath = os.path.join(filepaths[
'basename'], clinfo[timeclass])
242 print(
"Setting %s = %s" % (dqcenvvar, fpath))
243 os.environ[dqcenvvar] = fpath
249 dqproject = histdsname.split(
'.')[0]
251 dqproject =
'data_test'
252 dqproject = parmap.get(
'projectTag', dqproject)
255 if 'runNumber' in parmap :
256 runnr = parmap[
'runNumber']
259 runnr =
int(histdsname.split(
'.')[1])
264 if 'streamName' in parmap :
265 stream = parmap[
'streamName']
268 stream = histdsname.split(
'.')[2]
270 stream =
'test_dummy'
274 if 'procNumber' in parmap :
275 procnumber = parmap[
'procNumber']
278 while n_xmlrpc_tries <= MAX_XMLRPC_TRIES :
281 xmlrpcserver = xmlrpclib.ServerProxy(
'http://atlasdqm.cern.ch:8888')
282 procnumber = xmlrpcserver.get_next_proc_pass(runnr, stream,
'tier0')
285 print(
'Web service connection failed, attempt', n_xmlrpc_tries,
'of', MAX_XMLRPC_TRIES)
287 if n_xmlrpc_tries <= MAX_XMLRPC_TRIES:
288 time.sleep(20*2**n_xmlrpc_tries)
290 print(
"Job parameters:\n")
291 print(
" Run number: ", runnr)
292 print(
" Stream name: ", stream)
293 print(
" Processing pass: ", procnumber)
294 print(
" Incremental mode:", incr)
295 print(
" Post-processing: ", postproc)
296 print(
" COOL uploads: ", allowCOOLUpload)
297 print(
" Production mode: ", productionMode)
300 print(
"\n##################################################################")
301 print(
"## STEP 3: running histogram merging procedure ...")
302 print(
"##################################################################\n")
305 os.environ[
'DQPRODUCTION'] =
'1' if productionMode ==
'True' else '0'
306 os.environ[
'DQ_STREAM'] = stream
307 print(
"Setting env variable DQPRODUCTION to %s\n" % os.environ[
'DQPRODUCTION'])
308 os.environ[
'COOLUPLOADS'] =
'1' if allowCOOLUpload ==
'True' and productionMode ==
'True' else '0'
309 print(
"Setting env variable COOLUPLOADS to %s\n" % os.environ[
'COOLUPLOADS'])
311 if postproc ==
'True' :
313 cmd =
"python -u `which DQHistogramMerge.py` hist_merge.list %s 1 1 %d %d " % (histfile,histMergeCompressionLevel,histMergeDebugLevel)
315 cmd =
"python -u `which DQHistogramMerge.py` hist_merge.list %s 1 0 %d %d" % (histfile,histMergeCompressionLevel,histMergeDebugLevel)
317 cmd =
"python -u `which DQHistogramMerge.py` hist_merge.list %s 0 0 %d %d" % (histfile,histMergeCompressionLevel,histMergeDebugLevel)
319 print(
"Histogram merging command:\n")
321 print(
"\n##################################################################\n")
323 print(
"## ... logfile from DQHistogramMerge.py: ")
324 print(
"--------------------------------------------------------------------------------")
326 retcode1 = os.system(cmd)
327 print(
"--------------------------------------------------------------------------------")
329 dt1 =
int(t1 - tstart)
331 print(
"\n## DQHistogramMerge.py finished with retcode = %s" % retcode1)
332 print(
"## ... elapsed time: ", dt1,
" sec")
335 if postproc ==
'True' and incr ==
'False':
336 print(
"\n##################################################################")
337 print(
"## STEP 3b: copying postprocessing output to AFS ...")
338 print(
"##################################################################\n")
340 cmd =
"python -u `which DQFileMove.py` %s %s_%s_%s" % (dqproject, runnr, stream, procnumber)
342 print(
"File move command:\n")
344 print(
"\n##################################################################\n")
346 print(
"## ... logfile from DQFileMove.py: ")
347 print(
"--------------------------------------------------------------------------------")
349 retcode1b = os.system(cmd)
350 print(
"--------------------------------------------------------------------------------")
355 print(
"\n## DQFileMove.py finished with retcode = %s" % retcode1b)
356 print(
"## ... elapsed time: ", dt1b,
" sec")
358 if doWebDisplay ==
'True':
359 print(
"\n##################################################################")
360 print(
"## STEP 4: running web-display creation procedure ...")
361 print(
"##################################################################\n")
363 cmd =
"python -u `which DQWebDisplay.py` %s %s %s %s stream=%s" % (histfile, dqproject, procnumber, incr, stream)
365 print(
"Web display creation command:\n")
367 print(
"\n##################################################################\n")
369 print(
"## ... logfile from DQWebDisplay.py: ")
370 print(
"--------------------------------------------------------------------------------")
372 retcode2 = os.system(cmd)
373 print(
'DO NOT REPORT "Error in TH1: cannot merge histograms" ERRORS! THESE ARE IRRELEVANT!')
374 print(
"--------------------------------------------------------------------------------")
378 print(
"\n## DQWebDisplay.py finished with retcode = %s" % retcode2)
379 print(
"## ... elapsed time: ", dt2,
" sec")
381 print(
"\n##################################################################")
382 print(
"## WEB DISPLAY CREATION SKIPPED BY USER REQUEST")
383 print(
"##################################################################\n")
387 print(
"\n##################################################################")
388 print(
"## STEP 5: finishing the job ...")
389 print(
"##################################################################\n")
397 txt =
'trf finished OK'
402 if (retcode2 >> 8)
in (0, 5) :
404 histmap =
getFileMap(histfile, histdsname, nevts=nevts)
407 if doWebDisplay ==
'True':
408 print(
'Publishing to message service')
409 publish_success_to_mq(runnr, dqproject, stream, incr=(incr==
'True'), ami=amitag, procpass=procnumber, hcfg=filepaths, isprod=(productionMode==
'True'))
411 print(
'Web display off, not publishing to message service')
413 txt =
'DQWebDisplay.py execution problem'
414 print(
"ERROR: DQWebDisplay.py execution problem!")
416 acronym =
'TRF_DQMDISPLAY_EXE'
418 infilelist=
open(
'hist_merge.list',
'r')
419 for infname
in infilelist:
425 print(
"ERROR: DQHistogramMerge.py execution problem!")
427 acronym =
'TRF_DQMHISTMERGE_EXE'
429 txt =
'DQHistogramMerge.py execution problem'
431 infilelist=
open(
'hist_merge.list',
'r')
432 for infname
in infilelist:
437 DQResFile=
"DQResourceUtilization.txt"
438 if os.path.exists(DQResFile):
439 print(
"dumping resource utilization log")
440 with open(DQResFile)
as resfile:
441 for resline
in resfile:
442 print(resline, end=
' ')
445 reportmap = {
'prodsys': {
'trfCode': retcode,
446 'trfAcronym': acronym,
447 'jobOutputs': outfiles,
448 'jobInputs': infiles,
449 'nevents':
int(nevts),
450 'more': {
'num1':
int(nevts),
'num2':
int(dt),
'txt1': txt }
455 f =
open(
'jobReport.gpickle',
'w')
456 pickle.dump(reportmap, f)
459 print(
"\n## ... job finished with retcode : %s" % reportmap[
'prodsys'][
'trfCode'])
460 print(
"## ... error acronym: ", reportmap[
'prodsys'][
'trfAcronym'])
461 print(
"## ... elapsed time: ", reportmap[
'prodsys'][
'more'][
'num2'],
"sec")
463 print(
"##################################################################")
464 print(
"## End of job.")
465 print(
"##################################################################\n")