ATLAS Offline Software
DQM_Tier0Wrapper_trf.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 # Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
4 
5 
54 
55 import sys, string, os.path, os, pickle, time, pprint
56 from xmlrpc import client as xmlrpclib
57 #sami
58 import hashlib
59 from subprocess import getstatusoutput
60 
61 
62 
63 # Utility function
64 
def getFileMap(fname, dsname, nevts=0) :
  """Return a Tier-0 file-description dictionary for one output file.

  Parameters:
    fname  -- local file name (becomes the 'lfn' entry)
    dsname -- dataset name the file belongs to
    nevts  -- number of events in the file (default 0)

  Returns {} if fname does not exist on disk; otherwise a dict with
  keys 'lfn', 'dataset', 'size' (bytes on disk) and 'events'.
  """
  # Early return avoids shadowing the builtin 'map' as the old code did.
  if not os.path.isfile(fname) :
    return {}
  return { 'lfn': fname,
           'dataset' : dsname,
           'size' : os.path.getsize(fname),
           'events' : nevts
         }
76 
def publish_success_to_mq(run, ptag, stream, incr, ami, procpass, hcfg, isprod):
  """Announce a finished DQM web-display job on the ATLAS message broker.

  Connects to atlas-mb.cern.ch via STOMP and sends one JSON text message
  to /topic/atlas.dqm.progress describing run, project tag, stream, AMI
  tag, processing pass and histogram-config paths.  The MsgType header
  distinguishes incremental vs. run-complete updates and is prefixed
  with 'Development' outside production mode.
  """
  import stomp, json, ssl
  from DataQualityUtils import stompconfig
  dest='/topic/atlas.dqm.progress'
  connection=stomp.Connection([('atlas-mb.cern.ch', 61013)])
  connection.connect(wait=True, **stompconfig.config())

  payload = {
      'run': run,
      'project_tag': ptag,
      'stream': stream,
      'ami': ami,
      'pass': procpass,
      'hcfg': hcfg,
      }
  # Development prefix outside production; message class picked by mode.
  if incr:
    msgtype = 'WebDisplayIncremental'
  else:
    msgtype = 'WebDisplayRunComplete'
  prefix = '' if isprod else 'Development'
  headers = {
      'MsgClass':'DQ',
      'MsgType': prefix + msgtype,
      'type':'textMessage',
      'persistent': 'true',
      'destination': dest,
      }
  connection.send(body=json.dumps(payload), destination=dest,headers=headers,ack='auto')
  connection.disconnect()
102 
103 
104 
def genmd5sum(filename):
  """Print the md5 checksum of *filename* for post-mortem debugging.

  Reads the file in 8 KiB chunks so arbitrarily large histogram files
  can be summed without loading them into memory.  If the file does not
  exist, the md5 of the empty string is printed.  Returns None.
  """
  md5summer=hashlib.md5()
  if os.path.isfile(filename):
    # 'with' guarantees the handle is closed and -- unlike the previous
    # try/finally form -- cannot raise NameError when open() itself fails
    # (the old code referenced 'infil' in finally before it was bound).
    with open(filename,'rb') as infil:
      while True:
        fs=infil.read(8192)
        if not fs:
          break
        md5summer.update(fs)
  print("md5 sum of the \"%s\" is %s"%(filename,md5summer.hexdigest()))
  return
119 
def dq_combined_trf(picklefile):
  """Tier-0 offline DQM wrapper transform.

  Reads the job configuration from *picklefile* (a pickled parameter
  dictionary; mandatory keys 'inputHistFiles' and 'outputHistFile'),
  then:
    1. writes hist_merge.list with the input ROOT file names,
    2. runs DQHistogramMerge.py (and, in full post-processing mode,
       DQFileMove.py),
    3. optionally runs DQWebDisplay.py and publishes success to the
       DQM message queue,
    4. writes the Tier-0 job report to jobReport.gpickle in the
       current working directory.

  Mode flags ('incrementalMode', 'postProcessing', 'allowCOOLUpload',
  'doWebDisplay', 'productionMode') are the *strings* 'True'/'False',
  as delivered by the Tier-0 system.
  """

  tstart = time.time()

  print("\n##################################################################")
  print("## ATLAS Tier-0 Offline DQM Processing ##")
  print("##################################################################\n")

  print("\n##################################################################")
  print("## STEP 1: creating file with list of root files ...")
  print("##################################################################\n")

  # extract parameters from pickle file
  print("Using pickled file ", picklefile, " for input parameters")
  # BUGFIX: pickle data is binary -- text mode 'r' breaks under Python 3.
  with open(picklefile, 'rb') as f:
    parmap = pickle.load(f)

  print("\nFull Tier-0 run options:\n")
  pprint.pprint(parmap)

  inputfilelist = parmap.get('inputHistFiles', [])
  nfiles = len(inputfilelist)
  histMergeCompressionLevel=parmap.get('histMergeCompressionLevel', 1)
  histMergeDebugLevel=parmap.get('histMergeDebugLevel', 0)

  if not nfiles : # problem with job definition or reading pickle file
    dt = int(time.time() - tstart)
    retcode = 1
    acronym = 'TRF_NOINPUT'
    txt = 'empty input file list'
    reportmap = { 'prodsys': { 'trfCode': retcode,
                               'trfAcronym': acronym,
                               'jobOutputs': [],
                               'jobInputs': [],
                               'nevents': 0,
                               'more': { 'num1': 0, 'num2': dt, 'txt1': txt }
                             }
                }

  else :
    histtmpflist = []
    nevts = 0

    # Input list may be plain 'dataset#file' strings or file dictionaries.
    if isinstance(inputfilelist[0], str) :
      histtmpdsname = (inputfilelist[0]).split('#')[0]
      for val in inputfilelist :
        histtmpflist.append(val.split('#')[1])

    elif isinstance(inputfilelist[0], dict) :
      histtmpdsname = inputfilelist[0]['dsn']
      for fdict in inputfilelist :
        histtmpflist.append(fdict['lfn'])
        nevt = fdict.get('events', 0)
        if nevt is None:
          nevt=0
          print("WARNING Can't get number of events from input pickle file")
        nevts+=nevt

    with open('hist_merge.list', 'w') as f:
      txtstr = ""
      for hf in histtmpflist :
        txtstr += "%s\n" % hf
      f.write(txtstr)

    cmd = "cat hist_merge.list"
    (s,o) = getstatusoutput(cmd)
    print("\nContents of file hist_merge.list:\n")
    print(o)


    print("\n##################################################################")
    print("## STEP 2: determining job parameters...")
    print("##################################################################\n")

    # output file: 'dataset#filename'; AMI tag is the 6th dot-field of the name
    histdsname = (parmap['outputHistFile']).split('#')[0]
    histfile = (parmap['outputHistFile']).split('#')[1]
    amitag = histfile.split('.')[5]


    # incremental mode on/off
    incr = parmap.get('incrementalMode', 'False')

    # post-processing on/off
    postproc = parmap.get('postProcessing', 'True')

    # database uploading on/off
    allowCOOLUpload = parmap.get('allowCOOLUpload', 'True')

    # do web display
    doWebDisplay = parmap.get('doWebDisplay', 'True')

    # production mode; incremental updates are only allowed in production
    productionMode = parmap.get('productionMode', 'True')
    if productionMode != 'True' and incr == 'True':
      print("Production mode is not True, turning off incremental mode")
      incr = 'False'

    # get file paths, put into environment vars for the downstream tools
    filepaths = parmap.get('filepaths', None)
    if filepaths and isinstance(filepaths, dict):
      if 'basename' not in filepaths:
        print("Improperly formed 'filepaths' (no 'basename')")
      else:
        for evtclass in ('Collisions', 'Cosmics', 'HeavyIons'):
          if evtclass not in filepaths:
            print("Improperly formed 'filepaths' (no '%s')" % evtclass)
          else:
            clinfo = filepaths[evtclass]
            for timeclass in ('run', 'minutes10', 'minutes30'):
              if timeclass not in clinfo:
                print("Improperly formed 'filepaths[%s]' (no '%s')" % (evtclass, timeclass))
              else:
                dqcenvvar = 'DQC_HCFG_%s_%s' % (evtclass.upper(), timeclass.upper())
                fpath = os.path.join(filepaths['basename'], clinfo[timeclass])
                print("Setting %s = %s" % (dqcenvvar, fpath))
                os.environ[dqcenvvar] = fpath

    # extract info from dataset name
    # AMI project name
    # override if tag has been specified in parmap
    try :
      dqproject = histdsname.split('.')[0]
    except Exception :  # narrowed from bare except: never trap SystemExit
      dqproject = 'data_test'
    dqproject = parmap.get('projectTag', dqproject)

    # run number
    if 'runNumber' in parmap :
      runnr = parmap['runNumber']
    else :
      try :
        runnr = int(histdsname.split('.')[1])
      except Exception :
        runnr = 1234567890

    # stream name
    if 'streamName' in parmap :
      stream = parmap['streamName']
    else :
      try :
        stream = histdsname.split('.')[2]
      except Exception :
        stream = 'test_dummy'

    # processing pass number: ask the DQM web service, with retries and
    # exponential back-off; fall back to 99 if all attempts fail
    MAX_XMLRPC_TRIES = 5
    if 'procNumber' in parmap :
      procnumber = parmap['procNumber']
    else :
      n_xmlrpc_tries = 1
      while n_xmlrpc_tries <= MAX_XMLRPC_TRIES :
        procnumber = 99
        try :
          xmlrpcserver = xmlrpclib.ServerProxy('http://atlasdqm.cern.ch:8888')
          procnumber = xmlrpcserver.get_next_proc_pass(runnr, stream, 'tier0')
          break
        except Exception :
          print('Web service connection failed, attempt', n_xmlrpc_tries, 'of', MAX_XMLRPC_TRIES)
          n_xmlrpc_tries += 1
          if n_xmlrpc_tries <= MAX_XMLRPC_TRIES:
            time.sleep(20*2**n_xmlrpc_tries)

    print("Job parameters:\n")
    print(" Run number: ", runnr)
    print(" Stream name: ", stream)
    print(" Processing pass: ", procnumber)
    print(" Incremental mode:", incr)
    print(" Post-processing: ", postproc)
    print(" COOL uploads: ", allowCOOLUpload)
    print(" Production mode: ", productionMode)


    print("\n##################################################################")
    print("## STEP 3: running histogram merging procedure ...")
    print("##################################################################\n")

    # environment setting consumed by the DQ tools invoked below
    os.environ['DQPRODUCTION'] = '1' if productionMode == 'True' else '0'
    os.environ['DQ_STREAM'] = stream
    print("Setting env variable DQPRODUCTION to %s\n" % os.environ['DQPRODUCTION'])
    os.environ['COOLUPLOADS'] = '1' if allowCOOLUpload == 'True' and productionMode == 'True' else '0'
    print("Setting env variable COOLUPLOADS to %s\n" % os.environ['COOLUPLOADS'])

    # DQHistogramMerge.py argument pattern: <list> <out> <postproc> <incr> ...
    if postproc == 'True' :
      if incr == 'True':
        cmd = "python -u `which DQHistogramMerge.py` hist_merge.list %s 1 1 %d %d " % (histfile,histMergeCompressionLevel,histMergeDebugLevel)
      else:
        cmd = "python -u `which DQHistogramMerge.py` hist_merge.list %s 1 0 %d %d" % (histfile,histMergeCompressionLevel,histMergeDebugLevel)
    else :
      cmd = "python -u `which DQHistogramMerge.py` hist_merge.list %s 0 0 %d %d" % (histfile,histMergeCompressionLevel,histMergeDebugLevel)

    print("Histogram merging command:\n")
    print(cmd)
    print("\n##################################################################\n")

    print("## ... logfile from DQHistogramMerge.py: ")
    print("--------------------------------------------------------------------------------")
    # execute command
    retcode1 = os.system(cmd)
    print("--------------------------------------------------------------------------------")
    t1 = time.time()
    dt1 = int(t1 - tstart)

    print("\n## DQHistogramMerge.py finished with retcode = %s" % retcode1)
    print("## ... elapsed time: ", dt1, " sec")

    if retcode1 == 0 :
      if postproc == 'True' and incr == 'False':
        print("\n##################################################################")
        print("## STEP 3b: copying postprocessing output to AFS ...")
        print("##################################################################\n")

        cmd = "python -u `which DQFileMove.py` %s %s_%s_%s" % (dqproject, runnr, stream, procnumber)

        print("File move command:\n")
        print(cmd)
        print("\n##################################################################\n")

        print("## ... logfile from DQFileMove.py: ")
        print("--------------------------------------------------------------------------------")
        # execute command; return code is informational only
        retcode1b = os.system(cmd)
        print("--------------------------------------------------------------------------------")
        t1b = time.time()
        dt1b = int(t1b - t1)
        t1 = t1b

        print("\n## DQFileMove.py finished with retcode = %s" % retcode1b)
        print("## ... elapsed time: ", dt1b, " sec")

      if doWebDisplay == 'True':
        print("\n##################################################################")
        print("## STEP 4: running web-display creation procedure ...")
        print("##################################################################\n")

        cmd = "python -u `which DQWebDisplay.py` %s %s %s %s stream=%s" % (histfile, dqproject, procnumber, incr, stream)

        print("Web display creation command:\n")
        print(cmd)
        print("\n##################################################################\n")

        print("## ... logfile from DQWebDisplay.py: ")
        print("--------------------------------------------------------------------------------")
        # execute command
        retcode2 = os.system(cmd)
        print('DO NOT REPORT "Error in TH1: cannot merge histograms" ERRORS! THESE ARE IRRELEVANT!')
        print("--------------------------------------------------------------------------------")
        t2 = time.time()
        dt2 = int(t2 - t1)

        print("\n## DQWebDisplay.py finished with retcode = %s" % retcode2)
        print("## ... elapsed time: ", dt2, " sec")
      else:
        print("\n##################################################################")
        print("## WEB DISPLAY CREATION SKIPPED BY USER REQUEST")
        print("##################################################################\n")
        retcode2 = 0
        dt2 = 0

    print("\n##################################################################")
    print("## STEP 5: finishing the job ...")
    print("##################################################################\n")

    # assemble report gpickle file
    outfiles = []
    infiles = []

    retcode = 0
    acronym = 'OK'
    txt = 'trf finished OK'

    # get info for report gpickle file
    if retcode1 == 0 :
      dt = dt1
      # os.system returns a wait status; exit code 5 means "unable to
      # acquire cache lock", which is tolerated like success
      if (retcode2 >> 8) in (0, 5) :
        histmap = getFileMap(histfile, histdsname, nevts=nevts)
        outfiles = [histmap]
        dt += dt2
        if doWebDisplay == 'True':
          print('Publishing to message service')
          publish_success_to_mq(runnr, dqproject, stream, incr=(incr=='True'), ami=amitag, procpass=procnumber, hcfg=filepaths, isprod=(productionMode=='True'))
        else:
          print('Web display off, not publishing to message service')
      else :
        txt = 'DQWebDisplay.py execution problem'
        print("ERROR: DQWebDisplay.py execution problem!")
        retcode = retcode2
        acronym = 'TRF_DQMDISPLAY_EXE'
        # dump md5 sums of all inputs to aid debugging of the failure
        with open('hist_merge.list','r') as infilelist:
          for infname in infilelist:
            genmd5sum(infname.rstrip(os.linesep))
        genmd5sum(histfile)
    else :
      print("ERROR: DQHistogramMerge.py execution problem!")
      retcode = retcode1
      acronym = 'TRF_DQMHISTMERGE_EXE'
      dt = 0
      txt = 'DQHistogramMerge.py execution problem'
      # dump md5 sums of all inputs to aid debugging of the failure
      with open('hist_merge.list','r') as infilelist:
        for infname in infilelist:
          genmd5sum(infname.rstrip(os.linesep))
      genmd5sum(histfile)
      DQResFile="DQResourceUtilization.txt"
      if os.path.exists(DQResFile):
        print("dumping resource utilization log")
        with open(DQResFile) as resfile:
          for resline in resfile:
            print(resline, end=' ')

    # assemble job report map
    reportmap = { 'prodsys': { 'trfCode': retcode,
                               'trfAcronym': acronym,
                               'jobOutputs': outfiles,
                               'jobInputs': infiles,
                               'nevents': int(nevts),
                               'more': { 'num1': int(nevts), 'num2': int(dt), 'txt1': txt }
                             }
                }

  # pickle report map
  # BUGFIX: pickle.dump needs a binary file object -- 'w' breaks in Python 3.
  with open('jobReport.gpickle', 'wb') as f:
    pickle.dump(reportmap, f)

  print("\n## ... job finished with retcode : %s" % reportmap['prodsys']['trfCode'])
  print("## ... error acronym: ", reportmap['prodsys']['trfAcronym'])
  print("## ... elapsed time: ", reportmap['prodsys']['more']['num2'], "sec")
  print("##")
  print("##################################################################")
  print("## End of job.")
  print("##################################################################\n")
461 
462 
463 
466 
if __name__ == "__main__":

  # BUGFIX: the original guard used 'and', which (a) raised IndexError when
  # the script was invoked with no argument at all (sys.argv[1] evaluated
  # before the length check could reject), and (b) let a wrong argument
  # count through whenever argv[1] happened to start with '--argdict='.
  # The intended contract is: exactly one argument of the form
  # '--argdict=<picklefile>'; anything else prints usage and exits.
  if (len(sys.argv) != 2) or (not sys.argv[1].startswith('--argdict=')) :
    print("Input format wrong --- use ")
    print(" --argdict=<pickled-dictionary containing input info> ")
    print(" with key/value pairs: ")
    print(" 1) 'inputHistFiles': python list ")
    print(" ['datasetname#filename1', 'datasetname#filename2',...] (input dataset + file names) ")
    print(" or list of file dictionaries ")
    print(" [{'lfn':'fname1', 'checksum':'cks1', 'dsn':'dsn1', 'size':sz1, 'guid':'guid1', 'events':nevts1, ...}, ")
    print(" {'lfn':'fname2', 'checksum':'cks2', 'dsn':'dsn2', 'size':sz2, 'guid':'guid2', 'events':nevts2, ...}, ...] ")
    print(" 2) 'outputHistFile': string 'datasetname#filename' ")
    print(" (HIST output dataset name + file) ")
    print(" optional parameters: ")
    print(" 3) 'incrementalMode': string ('True'/'False') ")
    print(" ('True': do incremental update of DQM webpages on top of existing statistics; ")
    print(" 'False': create final DQM webpages, replace temporary ones) ")
    print(" 4) 'postProcessing': string ('True'/'False', default: 'True') ")
    print(" ('False': run histogram merging and DQ assessment only; ")
    print(" 'True': run additional post-processing step (fitting, etc.)) ")
    print(" 5) 'procNumber': int (number of processing pass, e.g. 1,2, ...) ")
    print(" 6) 'runNumber': int ")
    print(" 7) 'streamName': string (e.g., physics_IDCosmic, physics_Express, ...) ")
    print(" 8) 'projectTag': string (e.g., data10_7TeV, TrigDisplay)")
    print(" 9) 'allowCOOLUpload': string ('True'/'False', default: 'True')")
    print(" ('True': allow upload of defects to database; ")
    print(" 'False': do not upload defects to database)")
    sys.exit(-1)

  else :
    picklefile = sys.argv[1][len('--argdict='):]
    dq_combined_trf(picklefile)
DQM_Tier0Wrapper_trf.publish_success_to_mq
def publish_success_to_mq(run, ptag, stream, incr, ami, procpass, hcfg, isprod)
Definition: DQM_Tier0Wrapper_trf.py:77
DQM_Tier0Wrapper_trf.getFileMap
def getFileMap(fname, dsname, nevts=0)
Definition: DQM_Tier0Wrapper_trf.py:65
DQM_Tier0Wrapper_trf.dq_combined_trf
def dq_combined_trf(picklefile)
Definition: DQM_Tier0Wrapper_trf.py:120
print
void print(char *figname, TCanvas *c1)
Definition: TRTCalib_StrawStatusPlots.cxx:26
Trk::open
@ open
Definition: BinningType.h:40
python.CaloAddPedShiftConfig.int
int
Definition: CaloAddPedShiftConfig.py:45
Trk::split
@ split
Definition: LayerMaterialProperties.h:38
DQM_Tier0Wrapper_trf.genmd5sum
def genmd5sum(filename)
Definition: DQM_Tier0Wrapper_trf.py:105