ATLAS Offline Software
DQM_Tier0Wrapper_tf.py
#!/usr/bin/env python

# Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration

# ... (original lines 5-61 are collapsed in this listing)

import sys, os.path, os, json, time, pprint, traceback
from xmlrpc import client as xmlrpclib
from subprocess import getstatusoutput
#sami
import hashlib


# Utility function

def getSubFileMap(fname, nevts=0):
    if os.path.isfile(fname):
        sz = os.path.getsize(fname)
        map = { 'name': fname,
                'file_size': sz,
                'nentries': nevts,
              }
    else:
        map = {}
    return map

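# The next helper, publish_success_to_mq(), publishes a run-processed notice to the
# /topic/atlas.dqm.progress topic on the ATLAS message broker (atlas-mb.cern.ch).
# For illustration only (the values below are made up, not taken from this file),
# the JSON payload it sends has the shape
#   {"run": 358031, "project_tag": "data18_13TeV", "stream": "physics_Main",
#    "ami": "f961_h322", "pass": 1, "hcfg": {...}, "eos_only": false}
# and the MsgType header is 'WebDisplayRunComplete' for a final pass or
# 'WebDisplayIncremental' for an incremental one, prefixed with 'Development'
# outside production mode.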
def publish_success_to_mq(run, ptag, stream, incr, ami, procpass, hcfg, isprod, parmap):
    import stomp, json
    from DataQualityUtils import stompconfig
    dest = '/topic/atlas.dqm.progress'
    conn = stomp.Connection([('atlas-mb.cern.ch', 61013)])
    conn.connect(wait=True, **stompconfig.config())

    eos_only = False

    servers = parmap.get('servers', 'False')
    if servers == [] or servers == '':
        eos_only = True

    body = {
        'run': run,
        'project_tag': ptag,
        'stream': stream,
        'ami': ami,
        'pass': procpass,
        'hcfg': hcfg,
        'eos_only': eos_only,
    }
    headers = {
        'MsgClass': 'DQ',
        'MsgType': (('' if isprod else 'Development') +
                    ('WebDisplayRunComplete' if not incr else 'WebDisplayIncremental')),
        'type': 'textMessage',
        'persistent': 'true',
        'destination': dest,
    }
    conn.send(body=json.dumps(body), destination=dest, headers=headers, ack='auto')
    conn.disconnect()


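# genmd5sum() below is a debugging aid used when merging or web-display creation
# fails: it checksums a file in 8 kB chunks (so large HIST files never have to fit
# in memory) and just prints the md5 digest, swallowing any read error.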
def genmd5sum(filename):
    md5summer = hashlib.md5()
    if os.path.isfile(filename):
        try:
            with open(filename, 'rb') as infil:
                while True:
                    fs = infil.read(8192)
                    if not fs:
                        break
                    md5summer.update(fs)
        except:
            pass
    print("md5 sum of the \"%s\" is %s" % (filename, md5summer.hexdigest()))
    return

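# dq_combined_trf() below is the transform payload. It follows the STEP banners it
# prints: (1) build hist_merge.list from the input HIST files, (2) work out the job
# parameters (run number, stream, processing pass, ...), (3/3b) merge histograms with
# DQHistogramMerge.py and optionally move post-processed output, (4) build the web
# display with DQWebDisplay.py and publish to the message broker, and (5) fill the
# job-report dictionary passed in as 'outmap'.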
def dq_combined_trf(jsonfile, outmap):

    print("\n##################################################################")
    print("## STEP 1: creating file with list of root files ...")
    print("##################################################################\n")

    nfiles = 0

    try:
        # extract parameters from json file
        print("Using json file ", jsonfile, " for input parameters")
        f = open(jsonfile, 'r')
        parmap = json.load(f)
        f.close()

        print("\nFull Tier-0 run options:\n")
        pprint.pprint(parmap)

        inputfilelist = parmap.get('inputHistFiles', [])
        nfiles = len(inputfilelist)
        histMergeCompressionLevel = parmap.get('histMergeCompressionLevel', 1)
        histMergeDebugLevel = parmap.get('histMergeDebugLevel', 0)
    except:
        outmap['exitCode'] = 101
        outmap['exitAcronym'] = 'TRF_NOINPUT'
        outmap['exitMsg'] = 'Trouble reading json input dict.'
        traceback.print_exc()
        return

    if not nfiles:  # problem with job definition or reading json file
        outmap['exitCode'] = 102
        outmap['exitAcronym'] = 'TRF_NOINPUT'
        outmap['exitMsg'] = 'Empty input file list.'
        return

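    # For orientation only (names and values below are hypothetical, not taken from
    # this file): an entry of 'inputHistFiles' is either a 'datasetname#filename'
    # string or a file dictionary, e.g.
    #   "data18_13TeV.00358031.physics_Main.merge.HIST_TMP.f961_h322#HIST_TMP.root"
    #   {"lfn": "HIST_TMP.root", "dsn": "data18_13TeV.00358031.physics_Main.merge.HIST_TMP.f961_h322", "events": 1000}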
    histtmpflist = []
    histtmpdsname = ''
    nevts = 0

    try:
        if isinstance(inputfilelist[0], str):
            histtmpdsname = (inputfilelist[0]).split('#')[0]
            for val in inputfilelist:
                histtmpflist.append(val.split('#')[1])

        elif isinstance(inputfilelist[0], dict):
            histtmpdsname = inputfilelist[0]['dsn']
            for fdict in inputfilelist:
                histtmpflist.append(fdict['lfn'])
                nevt = fdict.get('events', 0)
                if nevt is None:
                    nevt = 0
                    print("WARNING Can't get number of events from input json file")
                nevts += nevt

        f = open('hist_merge.list', 'w')
        txtstr = ""
        for hf in histtmpflist:
            txtstr += "%s\n" % hf
        f.write(txtstr)
        f.close()

        cmd = "cat hist_merge.list"
        (s, o) = getstatusoutput(cmd)
        print("\nContents of file hist_merge.list:\n")
        print(o)
    except:
        outmap['exitCode'] = 103
        outmap['exitAcronym'] = 'TRF_INPUTINFO'
        outmap['exitMsg'] = 'ERROR: crash in assembling input file list (STEP 1)'
        traceback.print_exc()
        return

    try:
        print("\n##################################################################")
        print("## STEP 2: determining job parameters...")
        print("##################################################################\n")

        skipMerge = parmap.get('skipMerge', 'False')
        if skipMerge == 'True':
            if nfiles != 1:
                print("ERROR: skipMerge specified but something other than one input file specified")
                outmap['exitCode'] = 108
                outmap['exitAcronym'] = 'TRF_INPUTINFO'
                outmap['exitMsg'] = 'ERROR: skipMerge specified but something other than one input file specified (STEP 1)'
                return
            histdsname = histtmpdsname
            histfile = histtmpflist[0]
        else:
            # normal output file determination
            histdsname = (parmap['outputHistFile']).split('#')[0]
            histfile = (parmap['outputHistFile']).split('#')[1]
        amitag = histdsname.split('.')[5]

        # incremental mode on/off
        incr = parmap.get('incrementalMode', 'False')

        # post-processing on/off
        postproc = parmap.get('postProcessing', 'True')

        # database uploading on/off
        allowCOOLUpload = parmap.get('allowCOOLUpload', 'True')

        # do web display
        doWebDisplay = parmap.get('doWebDisplay', 'True')

        # extra parameters for merge
        mergeParams = parmap.get('mergeParams', '')

        # production mode
        productionMode = parmap.get('productionMode', 'True')
        if productionMode != 'True' and incr == 'True':
            print("Production mode is not True, turning off incremental mode")
            incr = 'False'

        # server list
        servers = parmap.get('servers', 'False')
        os.environ['DQC_SERVERS'] = servers

        # get file paths, put into environment vars
        filepaths = parmap.get('filepaths', None)
        if filepaths and isinstance(filepaths, dict):
            if 'basename' not in filepaths:
                print("Improperly formed 'filepaths' (no 'basename')")
            else:
                for evtclass in ('Collisions', 'Cosmics', 'HeavyIons'):
                    if evtclass not in filepaths:
                        print("Improperly formed 'filepaths' (no '%s')" % evtclass)
                    else:
                        clinfo = filepaths[evtclass]
                        for timeclass in ('run', 'minutes10', 'minutes30'):
                            if timeclass not in clinfo:
                                print("Improperly formed 'filepaths[%s]' (no '%s')" % (evtclass, timeclass))
                            else:
                                dqcenvvar = 'DQC_HCFG_%s_%s' % (evtclass.upper(), timeclass.upper())
                                fpath = os.path.join(filepaths['basename'], clinfo[timeclass])
                                print("Setting %s = %s" % (dqcenvvar, fpath))
                                os.environ[dqcenvvar] = fpath

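        # For illustration (keys as used above, paths and file names hypothetical):
        # a 'filepaths' block such as
        #   {"basename": "/path/to/han_configs",
        #    "Collisions": {"run": "collisions_run.hcfg",
        #                   "minutes10": "collisions_minutes10.hcfg",
        #                   "minutes30": "collisions_minutes30.hcfg"},
        #    "Cosmics": {...}, "HeavyIons": {...}}
        # ends up as environment variables like
        #   DQC_HCFG_COLLISIONS_RUN=/path/to/han_configs/collisions_run.hcfg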
        # extract info from dataset name
        # AMI project name
        # override if tag has been specified in parmap
        try:
            dqproject = histdsname.split('.')[0]
        except:
            dqproject = 'data_test'
        dqproject = parmap.get('projectTag', dqproject)

        # run number
        if 'runNumber' in parmap:
            runnr = parmap['runNumber']
        else:
            try:
                runnr = int(histdsname.split('.')[1])
            except:
                runnr = 1234567890

        # stream name
        if 'streamName' in parmap:
            stream = parmap['streamName']
        else:
            try:
                stream = histdsname.split('.')[2]
            except:
                stream = 'test_dummy'

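        # If 'procNumber' is not supplied, the processing pass is obtained from the
        # atlasdqm web service via XML-RPC below, retrying up to MAX_XMLRPC_TRIES
        # times with a growing back-off of 20*2**n seconds and falling back to 99 if
        # every attempt fails.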
        # processing pass number
        procnumber = 99
        MAX_XMLRPC_TRIES = 5
        if 'procNumber' in parmap:
            procnumber = parmap['procNumber']
        else:
            n_xmlrpc_tries = 1
            while n_xmlrpc_tries <= MAX_XMLRPC_TRIES:
                procnumber = 99
                try:
                    xmlrpcserver = xmlrpclib.ServerProxy('http://proc-pass-atlasdqm.app.cern.ch')
                    #xmlrpcserver = xmlrpclib.ServerProxy('http://atlasdqm.cern.ch:8888') ##old server
                    procnumber = xmlrpcserver.get_next_proc_pass(runnr, stream, 'tier0')
                    break
                except:
                    print('Web service connection failed, attempt', n_xmlrpc_tries, 'of', MAX_XMLRPC_TRIES)
                    n_xmlrpc_tries += 1
                    if n_xmlrpc_tries <= MAX_XMLRPC_TRIES:
                        time.sleep(20*2**n_xmlrpc_tries)

        print("Job parameters:\n")
        print("  Run number:      ", runnr)
        print("  Stream name:     ", stream)
        print("  Processing pass: ", procnumber)
        print("  Incremental mode:", incr)
        print("  Post-processing: ", postproc)
        print("  COOL uploads:    ", allowCOOLUpload)
        print("  Production mode: ", productionMode)
        print("  Skip merge:      ", skipMerge)
        print("  Merge parameters:", mergeParams)
        if servers == []:
            print("  EOS only: True")

    except:
        outmap['exitCode'] = 104
        outmap['exitAcronym'] = 'TRF_JOBPARS'
        outmap['exitMsg'] = 'Error in determining job parameters (STEP 2).'
        traceback.print_exc()
        return

    # environment setting
    os.environ['DQPRODUCTION'] = '1' if productionMode == 'True' else '0'
    print("Setting env variable DQPRODUCTION to %s\n" % os.environ['DQPRODUCTION'])
    os.environ['DQ_STREAM'] = stream
    print("Setting env variable DQ_STREAM to %s\n" % os.environ['DQ_STREAM'])
    os.environ['COOLUPLOADS'] = '1' if allowCOOLUpload == 'True' and productionMode == 'True' else '0'
    print("Setting env variable COOLUPLOADS to %s\n" % os.environ['COOLUPLOADS'])

    if skipMerge != 'True':
        try:
            print("\n##################################################################")
            print("## STEP 3: running histogram merging procedure ...")
            print("##################################################################\n")

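            # Note on the DQHistogramMerge.py command assembled below: as constructed
            # here, the first numeric flag tracks the postProcessing switch and the
            # second is 1 only when both post-processing and incremental mode are
            # requested, followed by the merge compression and debug levels; any
            # extra 'mergeParams' string is appended verbatim.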
            if postproc == 'True':
                if incr == 'True':
                    cmd = "python -u `which DQHistogramMerge.py` hist_merge.list %s 1 1 %d %d " % (histfile, histMergeCompressionLevel, histMergeDebugLevel)
                else:
                    cmd = "python -u `which DQHistogramMerge.py` hist_merge.list %s 1 0 %d %d" % (histfile, histMergeCompressionLevel, histMergeDebugLevel)
            else:
                cmd = "python -u `which DQHistogramMerge.py` hist_merge.list %s 0 0 %d %d" % (histfile, histMergeCompressionLevel, histMergeDebugLevel)

            cmd += (f' {mergeParams}' if mergeParams else '')

            print("Histogram merging command:\n")
            print(cmd)
            print("\n##################################################################\n")

            print("## ... logfile from DQHistogramMerge.py: ")
            print("--------------------------------------------------------------------------------")
            tstart = time.time()
            # execute command
            retcode1 = os.system(cmd)
            print("--------------------------------------------------------------------------------")
            t1 = time.time()
            dt1 = int(t1 - tstart)

            print("\n## DQHistogramMerge.py finished with retcode = %s" % retcode1)
            print("## ... elapsed time: ", dt1, " sec")

            if retcode1 != 0:
                outmap['exitCode'] = retcode1
                outmap['exitAcronym'] = 'TRF_DQMHISTMERGE_EXE'
                outmap['exitMsg'] = 'ERROR: DQHistogramMerge.py execution problem! (STEP 3).'
                print("ERROR: DQHistogramMerge.py execution problem!")
                retcode = retcode1
                txt = 'DQHistogramMerge.py execution problem'
                try:
                    try:
                        with open('hist_merge.list', 'r') as infilelist:
                            for infname in infilelist:
                                genmd5sum(infname.rstrip(os.linesep))
                    except:
                        pass
                    genmd5sum(histfile)
                    DQResFile = "DQResourceUtilization.txt"
                    if os.path.exists(DQResFile):
                        print("dumping resource utilization log")
                        with open(DQResFile) as resfile:
                            for resline in resfile:
                                print(resline, end=' ')
                except:
                    outmap['exitMsg'] = 'ERROR: DQHistogramMerge.py execution problem + problem dumping DQResourceUtilization! (STEP 3).'
                    traceback.print_exc()
                    print("ERROR: DQHistogramMerge.py execution problem + problem dumping DQResourceUtilization!")
                return

            if postproc == 'True' and incr == 'False':
                print("\n##################################################################")
                print("## STEP 3b: copying postprocessing output to AFS ...")
                print("##################################################################\n")

                cmd = "python -u `which DQFileMove.py` %s %s_%s_%s" % (dqproject, runnr, stream, procnumber)

                print("File move command:\n")
                print(cmd)
                print("\n##################################################################\n")

                print("## ... logfile from DQFileMove.py: ")
                print("--------------------------------------------------------------------------------")
                # execute command
                retcode1b = os.system(cmd)
                print("--------------------------------------------------------------------------------")
                t1b = time.time()
                dt1b = int(t1b - t1)
                t1 = t1b

                print("\n## DQFileMove.py finished with retcode = %s" % retcode1b)
                print("## ... elapsed time: ", dt1b, " sec")
        except:
            outmap['exitCode'] = 105
            outmap['exitAcronym'] = 'TRF_DQMHISTMERGE_EXE'
            outmap['exitMsg'] = 'ERROR: Failure in histogram merging or copying postprocessing output to AFS (STEP 3/3b).'
            traceback.print_exc()
            return
    else:
        print("\n##################################################################")
        print("## HISTOGRAM MERGE/POSTPROCESSING SKIPPED BY USER REQUEST")
        print("##################################################################\n")
        t1 = time.time()

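    # STEP 4 note: os.system() returns a raw wait status, so (retcode2 >> 8) below is
    # the actual exit code of DQWebDisplay.py; exit codes 0 and 5 are both accepted.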
    try:
        retcode2 = 0
        dt2 = 0
        if doWebDisplay == 'True':
            print("\n##################################################################")
            print("## STEP 4: running web-display creation procedure ...")
            print("##################################################################\n")

            cmd = "python -u `which DQWebDisplay.py` %s %s %s %s stream=%s" % (histfile, dqproject, procnumber, incr, stream)

            print("Web display creation command:\n")
            print(cmd)
            print("\n##################################################################\n")

            print("## ... logfile from DQWebDisplay.py: ")
            print("--------------------------------------------------------------------------------")
            # execute command
            retcode2 = os.system(cmd)
            print('DO NOT REPORT "Error in TH1: cannot merge histograms" ERRORS! THESE ARE IRRELEVANT!')
            print("--------------------------------------------------------------------------------")
            t2 = time.time()
            dt2 = int(t2 - t1)

            print("\n## DQWebDisplay.py finished with retcode = %s" % retcode2)
            print("## ... elapsed time: ", dt2, " sec")
            if not (retcode2 >> 8) in (0, 5):
                print("ERROR: DQWebDisplay.py execution problem!")
                outmap['exitCode'] = retcode2
                outmap['exitAcronym'] = 'TRF_DQMDISPLAY_EXE'
                outmap['exitMsg'] = 'ERROR: DQWebDisplay.py execution problem! (STEP 4).'
                try:
                    with open('hist_merge.list', 'r') as infilelist:
                        for infname in infilelist:
                            genmd5sum(infname.rstrip(os.linesep))
                except:
                    pass
                genmd5sum(histfile)
                return
            if productionMode == 'True':
                try:
                    print('Publishing to message service')
                    publish_success_to_mq(runnr, dqproject, stream, incr=(incr=='True'), ami=amitag, procpass=procnumber, hcfg=filepaths, isprod=(productionMode=='True'), parmap=parmap)
                except:
                    outmap['exitCode'] = 106
                    outmap['exitAcronym'] = 'TRF_DQMDISPLAY_EXE'
                    outmap['exitMsg'] = 'ERROR: Failure in publishing info to messaging service (STEP 4).'
                    traceback.print_exc()
                    return
        else:
            print("\n##################################################################")
            print("## WEB DISPLAY CREATION SKIPPED BY USER REQUEST")
            print("##################################################################\n")
            print('Web display off, not publishing to message service')
    except:
        outmap['exitCode'] = 106
        outmap['exitAcronym'] = 'TRF_DQMDISPLAY_EXE'
        outmap['exitMsg'] = 'ERROR: Failure in web-display creation procedure (STEP 4).'
        print('ERROR: Failure in web-display creation procedure (STEP 4).')
        traceback.print_exc()
        return

    print("\n##################################################################")
    print("## STEP 5: finishing the job ...")
    print("##################################################################\n")

    # get info for report json file
    try:
        outfiles = [getSubFileMap(histfile, nevts=nevts)]
        # assemble job report map
        outmap['files']['output'][0]['dataset'] = histdsname
        outmap['files']['output'][0]['subFiles'] = outfiles
        outmap['resource']['transform']['processedEvents'] = int(nevts)
        return
    except:
        outmap['exitCode'] = 107
        outmap['exitAcronym'] = 'TRF_JOBREPORT'
        outmap['exitMsg'] = 'ERROR: in job report creation (STEP 5)'
        print("ERROR: in job report creation (STEP 5) !")
        traceback.print_exc()
        return

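# dq_trf_wrapper() below drives the transform: it pre-fills 'outmap' with a success
# record, lets dq_combined_trf() overwrite the exit fields if anything goes wrong,
# adds the wall time, and writes the result to jobReport.json (the report file the
# Tier-0 infrastructure is expected to pick up).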
def dq_trf_wrapper(jsonfile):
    print("\n##################################################################")
    print("## ATLAS Tier-0 Offline DQM Processing ##")
    print("##################################################################\n")

    outmap = { 'exitAcronym' : 'OK',
               'exitCode' : 0,
               'exitMsg' : 'trf finished OK',
               'files' : { 'output' : [{ 'dataset' : '',
                                         'subFiles' : [ {},
                                                      ]}
                                      ] },
               'resource' : { 'transform' : { 'processedEvents' : 0 } }
             }

    # dq_combined_trf will update outmap
    tstart = time.time()
    dq_combined_trf(jsonfile, outmap)
    outmap['resource']['transform']['wallTime'] = int(time.time() - tstart)

    # dump json report map
    f = open('jobReport.json', 'w')
    json.dump(outmap, f)
    f.close()

    # summarize status
    print("\n## ... job finished with retcode : %s" % outmap['exitCode'])
    print("## ... error acronym: ", outmap['exitAcronym'])
    print("## ... job status message: ", outmap['exitMsg'])
    print("## ... elapsed time: ", outmap['resource']['transform']['wallTime'], "sec")
    print("##")
    print("##################################################################")
    print("## End of job.")
    print("##################################################################\n")


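# Entry point. A hypothetical invocation (the JSON file name is illustrative, not
# taken from this script) would be:
#   DQM_Tier0Wrapper_tf.py --argJSON=dqm_tier0_args.json
# where the JSON file holds the key/value pairs documented in the usage text below.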
if __name__ == "__main__":

    # usage check: exactly one argument of the form --argJSON=<file> is expected
    if (len(sys.argv) != 2) or (not sys.argv[1].startswith('--argJSON=')):
        print("Input format wrong --- use ")
        print("   --argJSON=<json-dictionary containing input info> ")
        print("   with key/value pairs: ")
        print("     1) 'inputHistFiles': python list ")
        print("          ['datasetname#filename1', 'datasetname#filename2',...] (input dataset + file names) ")
        print("        or list of file dictionaries ")
        print("          [{'lfn':'fname1', 'checksum':'cks1', 'dsn':'dsn1', 'size':sz1, 'guid':'guid1', 'events':nevts1, ...}, ")
        print("           {'lfn':'fname2', 'checksum':'cks2', 'dsn':'dsn2', 'size':sz2, 'guid':'guid2', 'events':nevts2, ...}, ...] ")
        print("     2) 'outputHistFile': string 'datasetname#filename' ")
        print("        (HIST output dataset name + file) ")
        print("   optional parameters: ")
        print("     3) 'incrementalMode': string ('True'/'False') ")
        print("        ('True': do incremental update of DQM webpages on top of existing statistics; ")
        print("         'False': create final DQM webpages, replace temporary ones) ")
        print("     4) 'postProcessing': string ('True'/'False', default: 'True') ")
        print("        ('False': run histogram merging and DQ assessment only; ")
        print("         'True': run additional post-processing step (fitting, etc.)) ")
        print("     5) 'procNumber': int (number of processing pass, e.g. 1,2, ...) ")
        print("     6) 'runNumber': int ")
        print("     7) 'streamName': string (e.g., physics_IDCosmic, physics_Express, ...) ")
        print("     8) 'projectTag': string (e.g., data10_7TeV, TrigDisplay)")
        print("     9) 'allowCOOLUpload': string ('True'/'False', default: 'True')")
        print("        ('True': allow upload of defects to database; ")
        print("         'False': do not upload defects to database)")
        sys.exit(-1)

    else:
        jsonfile = sys.argv[1][len('--argJSON='):]
        dq_trf_wrapper(jsonfile)