def _md5sum_merge_inputs_and_output(histfile):
    """Call ``genmd5sum`` on every file named in hist_merge.list, then on *histfile*.

    Used on the failure paths of ``dq_combined_trf``.
    """
    # ``with`` closes the list file even if genmd5sum raises; the previous
    # open-inside-try / close-in-finally pattern raised NameError in
    # ``finally`` (hiding the real error) whenever open() itself failed.
    with open('hist_merge.list', 'r') as infilelist:
        for infname in infilelist:
            genmd5sum(infname.rstrip(os.linesep))
    genmd5sum(histfile)


def dq_combined_trf(picklefile):
    """Run the ATLAS Tier-0 offline DQM histogram-merge / web-display job.

    Steps:
      1. Load the job parameters from *picklefile* and write the input
         histogram file names to ``hist_merge.list``.
      2. Determine run number, stream, project and processing pass (explicit
         parameters first, then the output dataset name, and for the
         processing pass the atlasdqm XML-RPC service).
      3. Run ``DQHistogramMerge.py`` and, for non-incremental post-processing,
         ``DQFileMove.py``.
      4. Optionally run ``DQWebDisplay.py`` and publish to the message service.
      5. Pickle a prodsys job report to ``jobReport.gpickle``.

    Boolean-like options in the parameter map are compared against the
    *strings* ``'True'`` / ``'False'``, not Python booleans.

    :param picklefile: path to a pickled parameter dict (keys such as
        ``inputHistFiles``, ``outputHistFile``, ``runNumber``, ...).
    :returns: None. The outcome is written to ``jobReport.gpickle`` and stdout.
    """
    tstart = time.time()

    print("\n##################################################################")
    print("## ATLAS Tier-0 Offline DQM Processing ##")
    print("##################################################################\n")

    print("\n##################################################################")
    print("## STEP 1: creating file with list of root files ...")
    print("##################################################################\n")

    print("Using pickled file ", picklefile, " for input parameters")
    # Pickles must be read in binary mode (text mode fails under Python 3).
    # NOTE: unpickling can execute arbitrary code, so picklefile must come
    # from a trusted source (the Tier-0 job system).
    with open(picklefile, 'rb') as f:
        parmap = pickle.load(f)

    print("\nFull Tier-0 run options:\n")
    pprint.pprint(parmap)

    inputfilelist = parmap.get('inputHistFiles', [])
    nfiles = len(inputfilelist)
    histMergeCompressionLevel = parmap.get('histMergeCompressionLevel', 1)
    histMergeDebugLevel = parmap.get('histMergeDebugLevel', 0)

    if not nfiles:
        # Nothing to merge: report TRF_NOINPUT without running any step.
        dt = int(time.time() - tstart)
        retcode = 1
        acronym = 'TRF_NOINPUT'
        txt = 'empty input file list'
        reportmap = {'prodsys': {'trfCode': retcode,
                                 'trfAcronym': acronym,
                                 'jobOutputs': [],
                                 'jobInputs': [],
                                 'nevents': 0,
                                 'more': {'num1': 0, 'num2': dt, 'txt1': txt}
                                 }
                     }

    else:
        histtmpflist = []
        nevts = 0

        # Input entries are either "dataset#file" strings or dicts carrying
        # 'lfn' and, optionally, 'events'.
        if isinstance(inputfilelist[0], str):
            for val in inputfilelist:
                histtmpflist.append(val.split('#')[1])

        elif isinstance(inputfilelist[0], dict):
            for fdict in inputfilelist:
                histtmpflist.append(fdict['lfn'])
                nevt = fdict.get('events', 0)
                if nevt is None:
                    nevt = 0
                    print("WARNING Can't get number of events from input pickle file")
                nevts += nevt

        with open('hist_merge.list', 'w') as f:
            f.write("".join("%s\n" % hf for hf in histtmpflist))

        cmd = "cat hist_merge.list"
        _, o = getstatusoutput(cmd)
        print("\nContents of file hist_merge.list:\n")
        print(o)

        print("\n##################################################################")
        print("## STEP 2: determining job parameters...")
        print("##################################################################\n")

        # outputHistFile is "dataset#file"; field 5 of the '.'-separated file
        # name is used as the AMI tag.
        outputhist = parmap['outputHistFile'].split('#')
        histdsname = outputhist[0]
        histfile = outputhist[1]
        amitag = histfile.split('.')[5]

        incr = parmap.get('incrementalMode', 'False')
        postproc = parmap.get('postProcessing', 'True')
        allowCOOLUpload = parmap.get('allowCOOLUpload', 'True')
        doWebDisplay = parmap.get('doWebDisplay', 'True')
        productionMode = parmap.get('productionMode', 'True')
        if productionMode != 'True' and incr == 'True':
            print("Production mode is not True, turning off incremental mode")
            incr = 'False'

        # Optional configuration paths: export DQC_HCFG_<EVTCLASS>_<TIMECLASS>
        # for every well-formed entry; malformed entries are reported and skipped.
        filepaths = parmap.get('filepaths', None)
        if filepaths and isinstance(filepaths, dict):
            if 'basename' not in filepaths:
                print("Improperly formed 'filepaths' (no 'basename')")
            else:
                for evtclass in ('Collisions', 'Cosmics', 'HeavyIons'):
                    if evtclass not in filepaths:
                        print("Improperly formed 'filepaths' (no '%s')" % evtclass)
                        continue
                    clinfo = filepaths[evtclass]
                    for timeclass in ('run', 'minutes10', 'minutes30'):
                        if timeclass not in clinfo:
                            print("Improperly formed 'filepaths[%s]' (no '%s')" % (evtclass, timeclass))
                            continue
                        dqcenvvar = 'DQC_HCFG_%s_%s' % (evtclass.upper(), timeclass.upper())
                        fpath = os.path.join(filepaths['basename'], clinfo[timeclass])
                        print("Setting %s = %s" % (dqcenvvar, fpath))
                        os.environ[dqcenvvar] = fpath

        # Fallbacks parse the output dataset name as "<project>.<run>.<stream>...";
        # explicit parameters take precedence.
        dqproject = parmap.get('projectTag', histdsname.split('.')[0])

        if 'runNumber' in parmap:
            runnr = parmap['runNumber']
        else:
            try:
                runnr = int(histdsname.split('.')[1])
            except (IndexError, ValueError):
                runnr = 1234567890

        if 'streamName' in parmap:
            stream = parmap['streamName']
        else:
            try:
                stream = histdsname.split('.')[2]
            except IndexError:
                stream = 'test_dummy'

        MAX_XMLRPC_TRIES = 5
        if 'procNumber' in parmap:
            procnumber = parmap['procNumber']
        else:
            # Ask the atlasdqm web service for the next processing pass; keep
            # 99 if every attempt fails. Back-off between attempts is
            # 20 * 2**n seconds (80, 160, 320, 640 s).
            procnumber = 99
            n_xmlrpc_tries = 1
            while n_xmlrpc_tries <= MAX_XMLRPC_TRIES:
                try:
                    xmlrpcserver = xmlrpclib.ServerProxy('http://atlasdqm.cern.ch:8888')
                    procnumber = xmlrpcserver.get_next_proc_pass(runnr, stream, 'tier0')
                    break
                except Exception:
                    # Any network/protocol failure; KeyboardInterrupt and
                    # SystemExit are no longer swallowed.
                    print('Web service connection failed, attempt', n_xmlrpc_tries,
                          'of', MAX_XMLRPC_TRIES)
                    n_xmlrpc_tries += 1
                    if n_xmlrpc_tries <= MAX_XMLRPC_TRIES:
                        time.sleep(20 * 2 ** n_xmlrpc_tries)

        print("Job parameters:\n")
        print("  Run number:      ", runnr)
        print("  Stream name:     ", stream)
        print("  Processing pass: ", procnumber)
        print("  Incremental mode:", incr)
        print("  Post-processing: ", postproc)
        print("  COOL uploads:    ", allowCOOLUpload)
        print("  Production mode: ", productionMode)

        print("\n##################################################################")
        print("## STEP 3: running histogram merging procedure ...")
        print("##################################################################\n")

        os.environ['DQPRODUCTION'] = '1' if productionMode == 'True' else '0'
        os.environ['DQ_STREAM'] = stream
        print("Setting env variable DQPRODUCTION to %s\n" % os.environ['DQPRODUCTION'])
        os.environ['COOLUPLOADS'] = '1' if allowCOOLUpload == 'True' and productionMode == 'True' else '0'
        print("Setting env variable COOLUPLOADS to %s\n" % os.environ['COOLUPLOADS'])

        # The two flags after the output file follow postproc and incr
        # (incremental only applies when post-processing is on).
        if postproc == 'True':
            if incr == 'True':
                cmd = "python -u `which DQHistogramMerge.py` hist_merge.list %s 1 1 %d %d " % (histfile, histMergeCompressionLevel, histMergeDebugLevel)
            else:
                cmd = "python -u `which DQHistogramMerge.py` hist_merge.list %s 1 0 %d %d" % (histfile, histMergeCompressionLevel, histMergeDebugLevel)
        else:
            cmd = "python -u `which DQHistogramMerge.py` hist_merge.list %s 0 0 %d %d" % (histfile, histMergeCompressionLevel, histMergeDebugLevel)

        print("Histogram merging command:\n")
        print(cmd)
        print("\n##################################################################\n")

        print("## ... logfile from DQHistogramMerge.py: ")
        print("--------------------------------------------------------------------------------")

        retcode1 = os.system(cmd)
        print("--------------------------------------------------------------------------------")
        t1 = time.time()
        dt1 = int(t1 - tstart)

        print("\n## DQHistogramMerge.py finished with retcode = %s" % retcode1)
        print("## ... elapsed time: ", dt1, " sec")

        if retcode1 == 0:
            if postproc == 'True' and incr == 'False':
                print("\n##################################################################")
                print("## STEP 3b: copying postprocessing output to AFS ...")
                print("##################################################################\n")

                cmd = "python -u `which DQFileMove.py` %s %s_%s_%s" % (dqproject, runnr, stream, procnumber)

                print("File move command:\n")
                print(cmd)
                print("\n##################################################################\n")

                print("## ... logfile from DQFileMove.py: ")
                print("--------------------------------------------------------------------------------")

                retcode1b = os.system(cmd)
                print("--------------------------------------------------------------------------------")
                t1b = time.time()
                dt1b = int(t1b - t1)
                # Later timings are measured from the end of the file move.
                t1 = t1b

                print("\n## DQFileMove.py finished with retcode = %s" % retcode1b)
                print("## ... elapsed time: ", dt1b, " sec")

            if doWebDisplay == 'True':
                print("\n##################################################################")
                print("## STEP 4: running web-display creation procedure ...")
                print("##################################################################\n")

                cmd = "python -u `which DQWebDisplay.py` %s %s %s %s stream=%s" % (histfile, dqproject, procnumber, incr, stream)

                print("Web display creation command:\n")
                print(cmd)
                print("\n##################################################################\n")

                print("## ... logfile from DQWebDisplay.py: ")
                print("--------------------------------------------------------------------------------")

                retcode2 = os.system(cmd)
                print('DO NOT REPORT "Error in TH1: cannot merge histograms" ERRORS! THESE ARE IRRELEVANT!')
                print("--------------------------------------------------------------------------------")
                t2 = time.time()
                dt2 = int(t2 - t1)

                print("\n## DQWebDisplay.py finished with retcode = %s" % retcode2)
                print("## ... elapsed time: ", dt2, " sec")
            else:
                print("\n##################################################################")
                print("## WEB DISPLAY CREATION SKIPPED BY USER REQUEST")
                print("##################################################################\n")
                retcode2 = 0
                dt2 = 0

        print("\n##################################################################")
        print("## STEP 5: finishing the job ...")
        print("##################################################################\n")

        outfiles = []
        infiles = []

        retcode = 0
        acronym = 'OK'
        txt = 'trf finished OK'

        if retcode1 == 0:
            dt = dt1
            # On POSIX os.system returns the wait status, so ``>> 8`` yields
            # the exit code; exit codes 0 and 5 are both accepted here.
            if (retcode2 >> 8) in (0, 5):
                histmap = getFileMap(histfile, histdsname, nevts=nevts)
                outfiles = [histmap]
                dt += dt2
                if doWebDisplay == 'True':
                    print('Publishing to message service')
                    publish_success_to_mq(runnr, dqproject, stream, incr=(incr == 'True'), ami=amitag,
                                          procpass=procnumber, hcfg=filepaths,
                                          isprod=(productionMode == 'True'))
                else:
                    print('Web display off, not publishing to message service')
            else:
                txt = 'DQWebDisplay.py execution problem'
                print("ERROR: DQWebDisplay.py execution problem!")
                retcode = retcode2
                acronym = 'TRF_DQMDISPLAY_EXE'
                _md5sum_merge_inputs_and_output(histfile)
        else:
            print("ERROR: DQHistogramMerge.py execution problem!")
            retcode = retcode1
            acronym = 'TRF_DQMHISTMERGE_EXE'
            dt = 0
            txt = 'DQHistogramMerge.py execution problem'
            _md5sum_merge_inputs_and_output(histfile)
            DQResFile = "DQResourceUtilization.txt"
            if os.path.exists(DQResFile):
                print("dumping resource utilization log")
                with open(DQResFile) as resfile:
                    for resline in resfile:
                        print(resline, end=' ')

        reportmap = {'prodsys': {'trfCode': retcode,
                                 'trfAcronym': acronym,
                                 'jobOutputs': outfiles,
                                 'jobInputs': infiles,
                                 'nevents': int(nevts),
                                 'more': {'num1': int(nevts), 'num2': int(dt), 'txt1': txt}
                                 }
                     }

    # The report must be written in binary mode; text mode makes pickle.dump
    # raise TypeError under Python 3.
    with open('jobReport.gpickle', 'wb') as f:
        pickle.dump(reportmap, f)

    print("\n## ... job finished with retcode : %s" % reportmap['prodsys']['trfCode'])
    print("## ... error acronym: ", reportmap['prodsys']['trfAcronym'])
    print("## ... elapsed time: ", reportmap['prodsys']['more']['num2'], "sec")
    print("##################################################################")
    print("## End of job.")
    print("##################################################################\n")
void print(char *figname, TCanvas *c1)
std::vector< std::string > split(const std::string &s, const std::string &t=":")