ATLAS Offline Software
DQM_Tier0Wrapper_tf.py
#!/usr/bin/env python

# Copyright (C) 2002-2023 CERN for the benefit of the ATLAS collaboration



import sys, os.path, os, json, time, pprint, traceback
from xmlrpc import client as xmlrpclib
import six
from subprocess import getstatusoutput
#sami
import hashlib


# Utility function

def getSubFileMap(fname, nevts=0):
    if os.path.isfile(fname):
        sz = os.path.getsize(fname)
        map = { 'name': fname,
                'file_size': sz,
                'nentries': nevts,
              }
    else:
        map = {}
    return map

def publish_success_to_mq(run, ptag, stream, incr, ami, procpass, hcfg, isprod, parmap):
    import stomp, json
    from DataQualityUtils import stompconfig
    dest = '/topic/atlas.dqm.progress'
    conn = stomp.Connection([('atlas-mb.cern.ch', 61013)])
    conn.connect(wait=True, **stompconfig.config())

    eos_only = False

    servers = parmap.get('servers', 'False')
    if servers == [] or servers == '':
        eos_only = True

    body = {
        'run': run,
        'project_tag': ptag,
        'stream': stream,
        'ami': ami,
        'pass': procpass,
        'hcfg': hcfg,
        'eos_only': eos_only,
    }
    headers = {
        'MsgClass': 'DQ',
        'MsgType': (('' if isprod else 'Development') +
                    ('WebDisplayRunComplete' if not incr else 'WebDisplayIncremental')),
        'type': 'textMessage',
        'persistent': 'true',
        'destination': dest,
    }
    conn.send(body=json.dumps(body), destination=dest, headers=headers, ack='auto')
    conn.disconnect()
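# For orientation: a message published by publish_success_to_mq() lands on
# /topic/atlas.dqm.progress with a JSON body shaped like the 'body' dict above.
# The values below are purely illustrative placeholders, not real data:
#
#   headers: {'MsgClass': 'DQ', 'MsgType': 'WebDisplayRunComplete',
#             'type': 'textMessage', 'persistent': 'true',
#             'destination': '/topic/atlas.dqm.progress'}
#   body:    {"run": 123456, "project_tag": "data23_13p6TeV",
#             "stream": "physics_Main", "ami": "x123",
#             "pass": 1, "hcfg": {...}, "eos_only": false}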


def genmd5sum(filename):
    md5summer = hashlib.md5()
    if os.path.isfile(filename):
        try:
            with open(filename, 'rb') as infil:
                while True:
                    fs = infil.read(8192)
                    if not fs:
                        break
                    md5summer.update(fs)
        except:
            pass
    print("md5 sum of the \"%s\" is %s" % (filename, md5summer.hexdigest()))
    return

def dq_combined_trf(jsonfile, outmap):

    print("\n##################################################################")
    print("## STEP 1: creating file with list of root files ...")
    print("##################################################################\n")

    nfiles = 0

    try:
        # extract parameters from json file
        print("Using json file ", jsonfile, " for input parameters")
        f = open(jsonfile, 'r')
        parmap = json.load(f)
        f.close()

        print("\nFull Tier-0 run options:\n")
        pprint.pprint(parmap)

        inputfilelist = parmap.get('inputHistFiles', [])
        nfiles = len(inputfilelist)
        histMergeCompressionLevel = parmap.get('histMergeCompressionLevel', 1)
        histMergeDebugLevel = parmap.get('histMergeDebugLevel', 0)
    except:
        outmap['exitCode'] = 101
        outmap['exitAcronym'] = 'TRF_NOINPUT'
        outmap['exitMsg'] = 'Trouble reading json input dict.'
        traceback.print_exc()
        return

    if not nfiles:  # problem with job definition or reading json file
        outmap['exitCode'] = 102
        outmap['exitAcronym'] = 'TRF_NOINPUT'
        outmap['exitMsg'] = 'Empty input file list.'
        return

    histtmpflist = []
    histtmpdsname = ''
    nevts = 0

    try:
        if isinstance(inputfilelist[0], str):
            histtmpdsname = (inputfilelist[0]).split('#')[0]
            for val in inputfilelist:
                histtmpflist.append(val.split('#')[1])

        elif isinstance(inputfilelist[0], dict):
            histtmpdsname = inputfilelist[0]['dsn']
            for fdict in inputfilelist:
                histtmpflist.append(fdict['lfn'])
                nevt = fdict.get('events', 0)
                if nevt is None:
                    nevt = 0
                    print("WARNING Can't get number of events from input json file")
                nevts += nevt

        f = open('hist_merge.list', 'w')
        txtstr = ""
        for hf in histtmpflist:
            txtstr += "%s\n" % hf
        f.write(txtstr)
        f.close()

        cmd = "cat hist_merge.list"
        (s, o) = getstatusoutput(cmd)
        print("\nContents of file hist_merge.list:\n")
        print(o)
    except:
        outmap['exitCode'] = 103
        outmap['exitAcronym'] = 'TRF_INPUTINFO'
        outmap['exitMsg'] = 'ERROR: crash in assembling input file list (STEP 1)'
        traceback.print_exc()
        return

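    # Illustrative note (names below are placeholders): 'inputHistFiles' entries are
    # accepted either as 'datasetname#filename' strings, e.g.
    #   'data23_13p6TeV.00123456.physics_Main.merge.HIST_TMP.x123#HIST_TMP.pool.root.1'
    # or as dicts such as
    #   {'dsn': 'data23_13p6TeV.00123456...', 'lfn': 'HIST_TMP.pool.root.1', 'events': 1000}
    # as parsed in STEP 1 above.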
    try:
        print("\n##################################################################")
        print("## STEP 2: determining job parameters...")
        print("##################################################################\n")

        skipMerge = parmap.get('skipMerge', 'False')
        if skipMerge == 'True':
            if nfiles != 1:
                print("ERROR: skipMerge specified but something other than one input file specified")
                outmap['exitCode'] = 108
                outmap['exitAcronym'] = 'TRF_INPUTINFO'
                outmap['exitMsg'] = 'ERROR: skipMerge specified but something other than one input file specified (STEP 1)'
                return
            histdsname = histtmpdsname
            histfile = histtmpflist[0]
        else:
            # normal output file determination
            histdsname = (parmap['outputHistFile']).split('#')[0]
            histfile = (parmap['outputHistFile']).split('#')[1]
        amitag = histdsname.split('.')[5]

        # incremental mode on/off
        incr = parmap.get('incrementalMode', 'False')

        # post-processing on/off
        postproc = parmap.get('postProcessing', 'True')

        # database uploading on/off
        allowCOOLUpload = parmap.get('allowCOOLUpload', 'True')

        # do web display
        doWebDisplay = parmap.get('doWebDisplay', 'True')

        # extra parameters for merge
        mergeParams = parmap.get('mergeParams', '')

        # production mode
        productionMode = parmap.get('productionMode', 'True')
        if productionMode != 'True' and incr == 'True':
            print("Production mode is not True, turning off incremental mode")
            incr = 'False'

        # server list
        servers = parmap.get('servers', 'False')
        os.environ['DQC_SERVERS'] = servers

        # get file paths, put into environment vars
        filepaths = parmap.get('filepaths', None)
        if filepaths and isinstance(filepaths, dict):
            if 'basename' not in filepaths:
                print("Improperly formed 'filepaths' (no 'basename')")
            else:
                for evtclass in ('Collisions', 'Cosmics', 'HeavyIons'):
                    if evtclass not in filepaths:
                        print("Improperly formed 'filepaths' (no '%s')" % evtclass)
                    else:
                        clinfo = filepaths[evtclass]
                        for timeclass in ('run', 'minutes10', 'minutes30'):
                            if timeclass not in clinfo:
                                print("Improperly formed 'filepaths[%s]' (no '%s')" % (evtclass, timeclass))
                            else:
                                dqcenvvar = 'DQC_HCFG_%s_%s' % (evtclass.upper(), timeclass.upper())
                                fpath = os.path.join(filepaths['basename'], clinfo[timeclass])
                                print("Setting %s = %s" % (dqcenvvar, fpath))
                                os.environ[dqcenvvar] = fpath

        # extract info from dataset name
        # AMI project name
        # override if tag has been specified in parmap
        try:
            dqproject = histdsname.split('.')[0]
        except:
            dqproject = 'data_test'
        dqproject = parmap.get('projectTag', dqproject)

        # run number
        if 'runNumber' in parmap:
            runnr = parmap['runNumber']
        else:
            try:
                runnr = int(histdsname.split('.')[1])
            except:
                runnr = 1234567890

        # stream name
        if 'streamName' in parmap:
            stream = parmap['streamName']
        else:
            try:
                stream = histdsname.split('.')[2]
            except:
                stream = 'test_dummy'

        # processing pass number
        procnumber = 99
        MAX_XMLRPC_TRIES = 5
        if 'procNumber' in parmap:
            procnumber = parmap['procNumber']
        else:
            n_xmlrpc_tries = 1
            while n_xmlrpc_tries <= MAX_XMLRPC_TRIES:
                procnumber = 99
                try:
                    xmlrpcserver = xmlrpclib.ServerProxy('http://proc-pass-atlasdqm.app.cern.ch')
                    #xmlrpcserver = xmlrpclib.ServerProxy('http://atlasdqm.cern.ch:8888') ##old server
                    procnumber = xmlrpcserver.get_next_proc_pass(runnr, stream, 'tier0')
                    break
                except:
                    print('Web service connection failed, attempt', n_xmlrpc_tries, 'of', MAX_XMLRPC_TRIES)
                    n_xmlrpc_tries += 1
                    if n_xmlrpc_tries <= MAX_XMLRPC_TRIES:
                        time.sleep(20*2**n_xmlrpc_tries)

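        # Clarifying note: the retry loop above backs off for 20*2**n seconds between
        # attempts, i.e. roughly 80 s, 160 s, 320 s and 640 s after the first to fourth
        # failures; if all MAX_XMLRPC_TRIES attempts fail, procnumber stays at 99.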

        print("Job parameters:\n")
        print("  Run number:      ", runnr)
        print("  Stream name:     ", stream)
        print("  Processing pass: ", procnumber)
        print("  Incremental mode:", incr)
        print("  Post-processing: ", postproc)
        print("  COOL uploads:    ", allowCOOLUpload)
        print("  Production mode: ", productionMode)
        print("  Skip merge:      ", skipMerge)
        print("  Merge parameters:", mergeParams)
        if servers == []:
            print("  EOS only:         True")

    except:
        outmap['exitCode'] = 104
        outmap['exitAcronym'] = 'TRF_JOBPARS'
        outmap['exitMsg'] = 'Error in determining job parameters (STEP 2).'
        traceback.print_exc()
        return

    # environment setting
    os.environ['DQPRODUCTION'] = '1' if productionMode == 'True' else '0'
    print("Setting env variable DQPRODUCTION to %s\n" % os.environ['DQPRODUCTION'])
    os.environ['DQ_STREAM'] = stream
    print("Setting env variable DQ_STREAM to %s\n" % os.environ['DQ_STREAM'])
    os.environ['COOLUPLOADS'] = '1' if allowCOOLUpload == 'True' and productionMode == 'True' else '0'
    print("Setting env variable COOLUPLOADS to %s\n" % os.environ['COOLUPLOADS'])

    if skipMerge != 'True':
        try:
            print("\n##################################################################")
            print("## STEP 3: running histogram merging procedure ...")
            print("##################################################################\n")

            if postproc == 'True':
                if incr == 'True':
                    cmd = "python -u `which DQHistogramMerge.py` hist_merge.list %s 1 1 %d %d " % (histfile, histMergeCompressionLevel, histMergeDebugLevel)
                else:
                    cmd = "python -u `which DQHistogramMerge.py` hist_merge.list %s 1 0 %d %d" % (histfile, histMergeCompressionLevel, histMergeDebugLevel)
            else:
                cmd = "python -u `which DQHistogramMerge.py` hist_merge.list %s 0 0 %d %d" % (histfile, histMergeCompressionLevel, histMergeDebugLevel)

            cmd += (f' {mergeParams}' if mergeParams else '')

            print("Histogram merging command:\n")
            print(cmd)
            print("\n##################################################################\n")
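            # Illustrative example (placeholder output file name, not from a real job):
            # with the default compression/debug levels and postproc == incr == 'True',
            # cmd expands to something like
            #   python -u `which DQHistogramMerge.py` hist_merge.list HIST.pool.root.1 1 1 1 0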

            print("## ... logfile from DQHistogramMerge.py: ")
            print("--------------------------------------------------------------------------------")
            tstart = time.time()
            # execute command
            retcode1 = os.system(cmd)
            print("--------------------------------------------------------------------------------")
            t1 = time.time()
            dt1 = int(t1 - tstart)

            print("\n## DQHistogramMerge.py finished with retcode = %s" % retcode1)
            print("## ... elapsed time: ", dt1, " sec")

            if retcode1 != 0:
                outmap['exitCode'] = retcode1
                outmap['exitAcronym'] = 'TRF_DQMHISTMERGE_EXE'
                outmap['exitMsg'] = 'ERROR: DQHistogramMerge.py execution problem! (STEP 3).'
                print("ERROR: DQHistogramMerge.py execution problem!")
                retcode = retcode1
                txt = 'DQHistogramMerge.py execution problem'
                try:
                    try:
                        with open('hist_merge.list', 'r') as infilelist:
                            for infname in infilelist:
                                genmd5sum(infname.rstrip(os.linesep))
                    except:
                        pass
                    genmd5sum(histfile)
                    DQResFile = "DQResourceUtilization.txt"
                    if os.path.exists(DQResFile):
                        print("dumping resource utilization log")
                        with open(DQResFile) as resfile:
                            for resline in resfile:
                                print(resline, end=' ')
                except:
                    outmap['exitMsg'] = 'ERROR: DQHistogramMerge.py execution problem + problem dumping DQResourceUtilization! (STEP 3).'
                    traceback.print_exc()
                    print("ERROR: DQHistogramMerge.py execution problem + problem dumping DQResourceUtilization!")
                return

            if postproc == 'True' and incr == 'False':
                print("\n##################################################################")
                print("## STEP 3b: copying postprocessing output to AFS ...")
                print("##################################################################\n")

                cmd = "python -u `which DQFileMove.py` %s %s_%s_%s" % (dqproject, runnr, stream, procnumber)

                print("File move command:\n")
                print(cmd)
                print("\n##################################################################\n")

                print("## ... logfile from DQFileMove.py: ")
                print("--------------------------------------------------------------------------------")
                # execute command
                retcode1b = os.system(cmd)
                print("--------------------------------------------------------------------------------")
                t1b = time.time()
                dt1b = int(t1b - t1)
                t1 = t1b

                print("\n## DQFileMove.py finished with retcode = %s" % retcode1b)
                print("## ... elapsed time: ", dt1b, " sec")
        except:
            outmap['exitCode'] = 105
            outmap['exitAcronym'] = 'TRF_DQMHISTMERGE_EXE'
            outmap['exitMsg'] = 'ERROR: Failure in histogram merging or copying postprocessing output to AFS (STEP 3/3b).'
            traceback.print_exc()
            return
    else:
        print("\n##################################################################")
        print("## HISTOGRAM MERGE/POSTPROCESSING SKIPPED BY USER REQUEST")
        print("##################################################################\n")
        t1 = time.time()

    try:
        retcode2 = 0
        dt2 = 0
        if doWebDisplay == 'True':
            print("\n##################################################################")
            print("## STEP 4: running web-display creation procedure ...")
            print("##################################################################\n")

            cmd = "python -u `which DQWebDisplay.py` %s %s %s %s stream=%s" % (histfile, dqproject, procnumber, incr, stream)

            print("Web display creation command:\n")
            print(cmd)
            print("\n##################################################################\n")

            print("## ... logfile from DQWebDisplay.py: ")
            print("--------------------------------------------------------------------------------")
            # execute command
            retcode2 = os.system(cmd)
            print('DO NOT REPORT "Error in TH1: cannot merge histograms" ERRORS! THESE ARE IRRELEVANT!')
            print("--------------------------------------------------------------------------------")
            t2 = time.time()
            dt2 = int(t2 - t1)

            print("\n## DQWebDisplay.py finished with retcode = %s" % retcode2)
            print("## ... elapsed time: ", dt2, " sec")
            # os.system() returns the raw wait status on POSIX, so the child's exit code
            # sits in the high byte; shift by 8 before comparing (0 and 5 count as success)
            if not (retcode2 >> 8) in (0, 5):
                print("ERROR: DQWebDisplay.py execution problem!")
                outmap['exitCode'] = retcode2
                outmap['exitAcronym'] = 'TRF_DQMDISPLAY_EXE'
                outmap['exitMsg'] = 'ERROR: DQWebDisplay.py execution problem! (STEP 4).'
                try:
                    with open('hist_merge.list', 'r') as infilelist:
                        for infname in infilelist:
                            genmd5sum(infname.rstrip(os.linesep))
                except:
                    pass
                genmd5sum(histfile)
                return
            if productionMode == 'True':
                try:
                    print('Publishing to message service')
                    publish_success_to_mq(runnr, dqproject, stream, incr=(incr=='True'), ami=amitag, procpass=procnumber, hcfg=filepaths, isprod=(productionMode=='True'), parmap=parmap)
                except:
                    outmap['exitCode'] = 106
                    outmap['exitAcronym'] = 'TRF_DQMDISPLAY_EXE'
                    outmap['exitMsg'] = 'ERROR: Failure in publishing info to messaging service (STEP 4).'
                    traceback.print_exc()
                    return
        else:
            print("\n##################################################################")
            print("## WEB DISPLAY CREATION SKIPPED BY USER REQUEST")
            print("##################################################################\n")
            print('Web display off, not publishing to message service')
    except:
        outmap['exitCode'] = 106
        outmap['exitAcronym'] = 'TRF_DQMDISPLAY_EXE'
        outmap['exitMsg'] = 'ERROR: Failure in web-display creation procedure (STEP 4).'
        print('ERROR: Failure in web-display creation procedure (STEP 4).')
        traceback.print_exc()
        return

    print("\n##################################################################")
    print("## STEP 5: finishing the job ...")
    print("##################################################################\n")

    # get info for report json file
    try:
        outfiles = [getSubFileMap(histfile, nevts=nevts)]
        # assemble job report map
        outmap['files']['output'][0]['dataset'] = histdsname
        outmap['files']['output'][0]['subFiles'] = outfiles
        outmap['resource']['transform']['processedEvents'] = int(nevts)
        return
    except:
        outmap['exitCode'] = 107
        outmap['exitAcronym'] = 'TRF_JOBREPORT'
        outmap['exitMsg'] = 'ERROR: in job report creation (STEP 5)'
        print("ERROR: in job report creation (STEP 5) !")
        traceback.print_exc()
        return

def dq_trf_wrapper(jsonfile):
    print("\n##################################################################")
    print("## ATLAS Tier-0 Offline DQM Processing                          ##")
    print("##################################################################\n")

    outmap = {
        'exitAcronym': 'OK',
        'exitCode': 0,
        'exitMsg': 'trf finished OK',
        'files': {'output': [{'dataset': '', 'subFiles': [{}]}]},
        'resource': {'transform': {'processedEvents': 0}},
    }
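    # For orientation (values are illustrative placeholders): after a successful job,
    # the dict dumped to jobReport.json looks roughly like
    #   {"exitAcronym": "OK", "exitCode": 0, "exitMsg": "trf finished OK",
    #    "files": {"output": [{"dataset": "data23_...HIST.x123",
    #                          "subFiles": [{"name": "...", "file_size": 12345, "nentries": 1000}]}]},
    #    "resource": {"transform": {"processedEvents": 1000, "wallTime": 300}}}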

    # dq_combined_trf will update outmap
    tstart = time.time()
    dq_combined_trf(jsonfile, outmap)
    outmap['resource']['transform']['wallTime'] = int(time.time() - tstart)

    # dump json report map
    f = open('jobReport.json', 'w')
    json.dump(outmap, f)
    f.close()

    # summarize status
    print("\n## ... job finished with retcode : %s" % outmap['exitCode'])
    print("## ... error acronym: ", outmap['exitAcronym'])
    print("## ... job status message: ", outmap['exitMsg'])
    print("## ... elapsed time: ", outmap['resource']['transform']['wallTime'], "sec")
    print("##")
    print("##################################################################")
    print("## End of job.")
    print("##################################################################\n")



if __name__ == "__main__":

    if (len(sys.argv) != 2) or (not sys.argv[1].startswith('--argJSON=')):
        print("Input format wrong --- use ")
        print("   --argJSON=<json-dictionary containing input info> ")
        print("   with key/value pairs: ")
        print("     1) 'inputHistFiles': python list ")
        print("          ['datasetname#filename1', 'datasetname#filename2', ...] (input dataset + file names) ")
        print("        or list of file dictionaries ")
        print("          [{'lfn':'fname1', 'checksum':'cks1', 'dsn':'dsn1', 'size':sz1, 'guid':'guid1', 'events':nevts1, ...}, ")
        print("           {'lfn':'fname2', 'checksum':'cks2', 'dsn':'dsn2', 'size':sz2, 'guid':'guid2', 'events':nevts2, ...}, ...] ")
        print("     2) 'outputHistFile': string 'datasetname#filename' ")
        print("        (HIST output dataset name + file) ")
        print("   optional parameters: ")
        print("     3) 'incrementalMode': string ('True'/'False') ")
        print("        ('True': do incremental update of DQM webpages on top of existing statistics; ")
        print("         'False': create final DQM webpages, replace temporary ones) ")
        print("     4) 'postProcessing': string ('True'/'False', default: 'True') ")
        print("        ('False': run histogram merging and DQ assessment only; ")
        print("         'True': run additional post-processing step (fitting, etc.)) ")
        print("     5) 'procNumber': int (number of processing pass, e.g. 1, 2, ...) ")
        print("     6) 'runNumber': int ")
        print("     7) 'streamName': string (e.g., physics_IDCosmic, physics_Express, ...) ")
        print("     8) 'projectTag': string (e.g., data10_7TeV, TrigDisplay)")
        print("     9) 'allowCOOLUpload': string ('True'/'False', default: 'True')")
        print("        ('True': allow upload of defects to database; ")
        print("         'False': do not upload defects to database)")
        sys.exit(-1)

    else:
        jsonfile = sys.argv[1][len('--argJSON='):]
        dq_trf_wrapper(jsonfile)
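For orientation, the sketch below (not part of the transform itself) shows one way to prepare the --argJSON input and invoke the script. The dataset names, file names, and run number are hypothetical placeholders; only keys documented in the usage message above are set.

# Hypothetical example: write a minimal argument dictionary for this transform.
import json

args = {
    'inputHistFiles': [
        'data23_13p6TeV.00123456.physics_Main.merge.HIST_TMP.x123#HIST_TMP.pool.root.1',
    ],
    'outputHistFile': 'data23_13p6TeV.00123456.physics_Main.merge.HIST.x123#HIST.pool.root.1',
    'runNumber': 123456,
    'streamName': 'physics_Main',
    'projectTag': 'data23_13p6TeV',
    'incrementalMode': 'False',
}
with open('dqm_args.json', 'w') as f:
    json.dump(args, f)

# then, in an environment where the transform is on the PATH:
#   DQM_Tier0Wrapper_tf.py --argJSON=dqm_args.json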