def _dq_banner(title):
    """Print *title* framed by two rows of '#' characters."""
    print("\n##################################################################")
    print(title)
    print("##################################################################\n")


def _dq_set_exit(outmap, code, acronym, msg):
    """Record a failure (exit code, acronym and message) in *outmap*."""
    outmap['exitCode'] = code
    outmap['exitAcronym'] = acronym
    outmap['exitMsg'] = msg


def _dq_md5sum_inputs():
    """Call genmd5sum on every file named in hist_merge.list (best effort)."""
    try:
        with open('hist_merge.list', 'r') as infilelist:
            for infname in infilelist:
                genmd5sum(infname.rstrip(os.linesep))
    except Exception:
        # Diagnostics only: a failure here must not mask the error that
        # triggered the dump, so report it and carry on.
        print("WARNING: could not run genmd5sum on the files in hist_merge.list")


def dq_combined_trf(jsonfile, outmap):
    """Run the combined DQ merge / web-display job described by *jsonfile*.

    The job proceeds through the steps announced in its log output:

    1. read the JSON parameter dict and write the input file names to
       ``hist_merge.list`` in the current directory;
    2. determine the job parameters (dataset/file names, run number, stream,
       processing pass, mode flags) and export the ``DQC_*``, ``DQPRODUCTION``,
       ``DQ_STREAM`` and ``COOLUPLOADS`` environment variables;
    3. unless ``skipMerge`` is ``'True'``, run ``DQHistogramMerge.py`` and, for
       non-incremental post-processing, ``DQFileMove.py``;
    4. if ``doWebDisplay`` is ``'True'``, run ``DQWebDisplay.py`` and, in
       production mode, call ``publish_success_to_mq``;
    5. fill the job report entries of *outmap*.

    Boolean-like options in the JSON dict are compared against the strings
    ``'True'`` / ``'False'``.

    Args:
        jsonfile: path of the JSON file holding the job parameter dict.
        outmap: dict updated in place.  On failure ``exitCode``,
            ``exitAcronym`` and ``exitMsg`` are set.  On success
            ``outmap['files']['output'][0]`` and
            ``outmap['resource']['transform']`` are updated, so those nested
            entries must already exist.

    Returns:
        None; the outcome is reported through *outmap*.
    """
    dash_rule = "--------------------------------------------------------------------------------"
    hash_rule = "\n##################################################################\n"

    _dq_banner("## STEP 1: creating file with list of root files ...")

    try:
        print("Using json file ", jsonfile, " for input parameters")
        with open(jsonfile, 'r') as f:
            parmap = json.load(f)

        print("\nFull Tier-0 run options:\n")
        pprint.pprint(parmap)

        inputfilelist = parmap.get('inputHistFiles', [])
        nfiles = len(inputfilelist)
        histMergeCompressionLevel = parmap.get('histMergeCompressionLevel', 1)
        histMergeDebugLevel = parmap.get('histMergeDebugLevel', 0)
    except Exception:
        _dq_set_exit(outmap, 101, 'TRF_NOINPUT', 'Trouble reading json input dict.')
        traceback.print_exc()
        return

    if not nfiles:
        _dq_set_exit(outmap, 102, 'TRF_NOINPUT', 'Empty input file list.')
        return

    histtmpflist = []
    histtmpdsname = ''
    nevts = 0

    try:
        first = inputfilelist[0]
        if isinstance(first, str):
            # string entries have the form "<dataset>#<file>"
            histtmpdsname = first.split('#')[0]
            histtmpflist = [val.split('#')[1] for val in inputfilelist]
        elif isinstance(first, dict):
            histtmpdsname = first['dsn']
            for fdict in inputfilelist:
                histtmpflist.append(fdict['lfn'])
                nevt = fdict.get('events', 0)
                if nevt is None:
                    nevt = 0
                    print("WARNING Can't get number of events from input json file")
                nevts += nevt

        listing = "".join("%s\n" % hf for hf in histtmpflist)
        with open('hist_merge.list', 'w') as f:
            f.write(listing)

        print("\nContents of file hist_merge.list:\n")
        # Show what was just written; no need to spawn `cat` for that.
        print(listing, end='')
    except Exception:
        _dq_set_exit(outmap, 103, 'TRF_INPUTINFO',
                     'ERROR: crash in assembling input file list (STEP 1)')
        traceback.print_exc()
        return

    try:
        _dq_banner("## STEP 2: determining job parameters...")

        skipMerge = parmap.get('skipMerge', 'False')
        if skipMerge == 'True':
            if nfiles != 1:
                print("ERROR: skipMerge specified but something other than one input file specified")
                _dq_set_exit(outmap, 108, 'TRF_INPUTINFO',
                             'ERROR: skipMerge specified but something other than one input file specified (STEP 1)')
                return
            histdsname = histtmpdsname
            histfile = histtmpflist[0]
        else:
            # output file is given as "<dataset>#<file>"
            outparts = parmap['outputHistFile'].split('#')
            histdsname = outparts[0]
            histfile = outparts[1]
        # Computed for both branches so the STEP 4 publish call always has it.
        # NOTE(review): takes the 6th dot-separated field of the dataset name;
        # shorter names end the job with exit code 104.
        amitag = histdsname.split('.')[5]

        incr = parmap.get('incrementalMode', 'False')
        postproc = parmap.get('postProcessing', 'True')
        allowCOOLUpload = parmap.get('allowCOOLUpload', 'True')
        doWebDisplay = parmap.get('doWebDisplay', 'True')
        mergeParams = parmap.get('mergeParams', '')

        productionMode = parmap.get('productionMode', 'True')
        if productionMode != 'True' and incr == 'True':
            print("Production mode is not True, turning off incremental mode")
            incr = 'False'

        servers = parmap.get('servers', 'False')
        # NOTE(review): os.environ only accepts str values, so a list-valued
        # 'servers' (which the `servers == []` check further down seems to
        # expect) would raise here and end the job with exit code 104 --
        # confirm the intended encoding.
        os.environ['DQC_SERVERS'] = servers

        # Export one DQC_HCFG_<EVTCLASS>_<TIMECLASS> variable per configured path.
        filepaths = parmap.get('filepaths', None)
        if filepaths and isinstance(filepaths, dict):
            if 'basename' not in filepaths:
                print("Improperly formed 'filepaths' (no 'basename')")
            else:
                for evtclass in ('Collisions', 'Cosmics', 'HeavyIons'):
                    if evtclass not in filepaths:
                        print("Improperly formed 'filepaths' (no '%s')" % evtclass)
                        continue
                    clinfo = filepaths[evtclass]
                    for timeclass in ('run', 'minutes10', 'minutes30'):
                        if timeclass not in clinfo:
                            print("Improperly formed 'filepaths[%s]' (no '%s')" % (evtclass, timeclass))
                            continue
                        dqcenvvar = 'DQC_HCFG_%s_%s' % (evtclass.upper(), timeclass.upper())
                        fpath = os.path.join(filepaths['basename'], clinfo[timeclass])
                        print("Setting %s = %s" % (dqcenvvar, fpath))
                        os.environ[dqcenvvar] = fpath

        # Project, run number and stream default to fields of the dataset
        # name; explicit JSON entries take precedence.
        try:
            dqproject = histdsname.split('.')[0]
        except Exception:
            dqproject = 'data_test'
        dqproject = parmap.get('projectTag', dqproject)

        if 'runNumber' in parmap:
            runnr = parmap['runNumber']
        else:
            try:
                runnr = int(histdsname.split('.')[1])
            except Exception:
                runnr = 1234567890

        if 'streamName' in parmap:
            stream = parmap['streamName']
        else:
            try:
                stream = histdsname.split('.')[2]
            except Exception:
                stream = 'test_dummy'

        # Processing pass: taken from the JSON dict, else requested from the
        # web service; stays at 99 if every attempt fails.
        procnumber = 99
        MAX_XMLRPC_TRIES = 5
        if 'procNumber' in parmap:
            procnumber = parmap['procNumber']
        else:
            for attempt in range(1, MAX_XMLRPC_TRIES + 1):
                procnumber = 99
                try:
                    xmlrpcserver = xmlrpclib.ServerProxy('http://proc-pass-atlasdqm.app.cern.ch')
                    procnumber = xmlrpcserver.get_next_proc_pass(runnr, stream, 'tier0')
                    break
                except Exception:
                    print('Web service connection failed, attempt', attempt, 'of', MAX_XMLRPC_TRIES)
                    if attempt < MAX_XMLRPC_TRIES:
                        # exponential back-off before the next attempt
                        time.sleep(20 * 2 ** (attempt + 1))

        print("Job parameters:\n")
        print(" Run number: ", runnr)
        print(" Stream name: ", stream)
        print(" Processing pass: ", procnumber)
        print(" Incremental mode:", incr)
        print(" Post-processing: ", postproc)
        print(" COOL uploads: ", allowCOOLUpload)
        print(" Production mode: ", productionMode)
        print(" Skip merge: ", skipMerge)
        print(" Merge parameters:", mergeParams)
        if servers == []:
            print(" EOS only: True")

    except Exception:
        _dq_set_exit(outmap, 104, 'TRF_JOBPARS', 'Error in determining job parameters (STEP 2).')
        traceback.print_exc()
        return

    os.environ['DQPRODUCTION'] = '1' if productionMode == 'True' else '0'
    print("Setting env variable DQPRODUCTION to %s\n" % os.environ['DQPRODUCTION'])
    os.environ['DQ_STREAM'] = stream
    print("Setting env variable DQ_STREAM to %s\n" % os.environ['DQ_STREAM'])
    os.environ['COOLUPLOADS'] = '1' if allowCOOLUpload == 'True' and productionMode == 'True' else '0'
    print("Setting env variable COOLUPLOADS to %s\n" % os.environ['COOLUPLOADS'])

    if skipMerge != 'True':
        try:
            _dq_banner("## STEP 3: running histogram merging procedure ...")

            # The two numeric flags after the output file are 1/1 for
            # incremental post-processing, 1/0 for plain post-processing and
            # 0/0 when post-processing is off.
            if postproc == 'True':
                if incr == 'True':
                    cmd = "python -u `which DQHistogramMerge.py` hist_merge.list %s 1 1 %d %d " % (histfile, histMergeCompressionLevel, histMergeDebugLevel)
                else:
                    cmd = "python -u `which DQHistogramMerge.py` hist_merge.list %s 1 0 %d %d" % (histfile, histMergeCompressionLevel, histMergeDebugLevel)
            else:
                cmd = "python -u `which DQHistogramMerge.py` hist_merge.list %s 0 0 %d %d" % (histfile, histMergeCompressionLevel, histMergeDebugLevel)

            cmd += (f' {mergeParams}' if mergeParams else '')

            print("Histogram merging command:\n")
            print(cmd)
            print(hash_rule)

            print("## ... logfile from DQHistogramMerge.py: ")
            print(dash_rule)
            tstart = time.time()

            retcode1 = os.system(cmd)
            print(dash_rule)
            t1 = time.time()
            dt1 = int(t1 - tstart)

            print("\n## DQHistogramMerge.py finished with retcode = %s" % retcode1)
            print("## ... elapsed time: ", dt1, " sec")

            if retcode1 != 0:
                _dq_set_exit(outmap, retcode1, 'TRF_DQMHISTMERGE_EXE',
                             'ERROR: DQHistogramMerge.py execution problem! (STEP 3).')
                print("ERROR: DQHistogramMerge.py execution problem!")
                # Dump diagnostics before giving up.
                try:
                    _dq_md5sum_inputs()
                    genmd5sum(histfile)
                    DQResFile = "DQResourceUtilization.txt"
                    if os.path.exists(DQResFile):
                        print("dumping resource utilization log")
                        with open(DQResFile) as resfile:
                            for resline in resfile:
                                print(resline, end=' ')
                except Exception:
                    outmap['exitMsg'] = 'ERROR: DQHistogramMerge.py execution problem + problem dumping DQResourceUtilization! (STEP 3).'
                    traceback.print_exc()
                    print("ERROR: DQHistogramMerge.py execution problem + problem dumping DQResourceUtilization!")
                return

            if postproc == 'True' and incr == 'False':
                _dq_banner("## STEP 3b: copying postprocessing output to AFS ...")

                cmd = "python -u `which DQFileMove.py` %s %s_%s_%s" % (dqproject, runnr, stream, procnumber)

                print("File move command:\n")
                print(cmd)
                print(hash_rule)

                print("## ... logfile from DQFileMove.py: ")
                print(dash_rule)

                # The return code is only logged; it does not fail the job.
                retcode1b = os.system(cmd)
                print(dash_rule)
                t1b = time.time()
                dt1b = int(t1b - t1)
                t1 = t1b

                print("\n## DQFileMove.py finished with retcode = %s" % retcode1b)
                print("## ... elapsed time: ", dt1b, " sec")
        except Exception:
            _dq_set_exit(outmap, 105, 'TRF_DQMHISTMERGE_EXE',
                         'ERROR: Failure in histogram merging or copying postprocessing output to AFS (STEP 3/3b).')
            traceback.print_exc()
            return
    else:
        _dq_banner("## HISTOGRAM MERGE/POSTPROCESSING SKIPPED BY USER REQUEST")
        t1 = time.time()

    try:
        if doWebDisplay == 'True':
            _dq_banner("## STEP 4: running web-display creation procedure ...")

            cmd = "python -u `which DQWebDisplay.py` %s %s %s %s stream=%s" % (histfile, dqproject, procnumber, incr, stream)

            print("Web display creation command:\n")
            print(cmd)
            print(hash_rule)

            print("## ... logfile from DQWebDisplay.py: ")
            print(dash_rule)

            retcode2 = os.system(cmd)
            print('DO NOT REPORT "Error in TH1: cannot merge histograms" ERRORS! THESE ARE IRRELEVANT!')
            print(dash_rule)
            t2 = time.time()
            dt2 = int(t2 - t1)

            print("\n## DQWebDisplay.py finished with retcode = %s" % retcode2)
            print("## ... elapsed time: ", dt2, " sec")
            # The status from os.system is shifted right by 8 bits before the
            # comparison; shifted values 0 and 5 are treated as success.
            if (retcode2 >> 8) not in (0, 5):
                print("ERROR: DQWebDisplay.py execution problem!")
                _dq_set_exit(outmap, retcode2, 'TRF_DQMDISPLAY_EXE',
                             'ERROR: DQWebDisplay.py execution problem! (STEP 4).')
                _dq_md5sum_inputs()
                genmd5sum(histfile)
                return
            if productionMode == 'True':
                try:
                    print('Publishing to message service')
                    publish_success_to_mq(runnr, dqproject, stream, incr=(incr == 'True'), ami=amitag, procpass=procnumber, hcfg=filepaths, isprod=(productionMode == 'True'), parmap=parmap)
                except Exception:
                    _dq_set_exit(outmap, 106, 'TRF_DQMDISPLAY_EXE',
                                 'ERROR: Failure in publishing info to messaging service (STEP 4).')
                    traceback.print_exc()
                    return
        else:
            _dq_banner("## WEB DISPLAY CREATION SKIPPED BY USER REQUEST")
            print('Web display off, not publishing to message service')
    except Exception:
        _dq_set_exit(outmap, 106, 'TRF_DQMDISPLAY_EXE',
                     'ERROR: Failure in web-display creation procedure (STEP 4).')
        print('ERROR: Failure in web-display creation procedure (STEP 4).')
        traceback.print_exc()
        return

    _dq_banner("## STEP 5: finishing the job ...")

    try:
        outfiles = [getSubFileMap(histfile, nevts=nevts)]

        outmap['files']['output'][0]['dataset'] = histdsname
        outmap['files']['output'][0]['subFiles'] = outfiles
        outmap['resource']['transform']['processedEvents'] = int(nevts)
        return
    except Exception:
        _dq_set_exit(outmap, 107, 'TRF_JOBREPORT', 'ERROR: in job report creation (STEP 5)')
        print("ERROR: in job report creation (STEP 5) !")
        traceback.print_exc()
        return

void print(char *figname, TCanvas *c1)
std::vector< std::string > split(const std::string &s, const std::string &t=":")