ATLAS Offline Software
DQM_Tier0Wrapper_trf.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 # Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration
4 
5 
54 
55 import sys, string, os.path, os, pickle, time, pprint
56 from xmlrpc import client as xmlrpclib
57 #sami
58 import hashlib
59 import six
60 from subprocess import getstatusoutput
61 
62 
63 
64 # Utility function
65 
def getFileMap(fname, dsname, nevts=0) :
    """Return a Tier-0 file-description dict for *fname*.

    If *fname* exists on disk, the dict carries its LFN, owning dataset
    *dsname*, size in bytes, and event count *nevts*; otherwise an empty
    dict is returned so callers can recognize a missing output file.
    """
    if os.path.isfile(fname) :
        # named 'fmap' (not 'map') to avoid shadowing the builtin
        fmap = { 'lfn': fname,
                 'dataset' : dsname,
                 'size' : os.path.getsize(fname),
                 'events' : nevts
               }
    else :
        fmap = {}
    return fmap
77 
def publish_success_to_mq(run, ptag, stream, incr, ami, procpass, hcfg, isprod):
    """Announce a finished web-display update on the ATLAS message broker.

    Opens a STOMP connection to atlas-mb.cern.ch, publishes a JSON payload
    describing the processed run to the DQM progress topic, and disconnects.
    """
    import stomp, json, ssl
    from DataQualityUtils import stompconfig

    topic = '/topic/atlas.dqm.progress'
    connection = stomp.Connection([('atlas-mb.cern.ch', 61013)])
    connection.connect(wait=True, **stompconfig.config())

    payload = json.dumps({
        'run': run,
        'project_tag': ptag,
        'stream': stream,
        'ami': ami,
        'pass': procpass,
        'hcfg': hcfg,
    })

    # Non-production messages are distinguished by a 'Development' prefix;
    # the suffix says whether this was an incremental or a final update.
    msgtype = '' if isprod else 'Development'
    msgtype += 'WebDisplayIncremental' if incr else 'WebDisplayRunComplete'

    headers = {
        'MsgClass': 'DQ',
        'MsgType': msgtype,
        'type': 'textMessage',
        'persistent': 'true',
        'destination': topic,
    }
    connection.send(body=payload, destination=topic, headers=headers, ack='auto')
    connection.disconnect()
103 
104 
105 
def genmd5sum(filename):
    """Print the MD5 checksum of *filename* (digest of empty input if absent).

    Used to fingerprint the job's input files when a downstream step fails,
    so they can be compared against those of other jobs.  Returns None.
    """
    md5summer = hashlib.md5()
    if os.path.isfile(filename):
        # 'with' guarantees the handle is closed and — unlike the previous
        # try/finally wrapped around open() itself — cannot raise a
        # NameError on an unbound 'infil' if open() fails.
        with open(filename, 'rb') as infil:
            while True:
                fs = infil.read(8192)
                if not fs:
                    break
                md5summer.update(fs)
    print("md5 sum of the \"%s\" is %s"%(filename,md5summer.hexdigest()))
    return
120 
def dq_combined_trf(picklefile):
    """Run the combined Tier-0 offline DQM transform.

    Reads the pickled Tier-0 argument dictionary *picklefile*, merges the
    listed monitoring histogram files with DQHistogramMerge.py, optionally
    runs post-processing (DQFileMove.py) and web-display creation
    (DQWebDisplay.py), and writes a pickled job report to
    'jobReport.gpickle' in the current directory.

    Note: the 'True'/'False' job options arrive as *strings* from Tier-0,
    hence the string comparisons throughout.
    """

    tstart = time.time()

    print("\n##################################################################")
    print("## ATLAS Tier-0 Offline DQM Processing ##")
    print("##################################################################\n")

    print("\n##################################################################")
    print("## STEP 1: creating file with list of root files ...")
    print("##################################################################\n")

    # extract parameters from pickle file; pickles are binary, so the file
    # must be opened in 'rb' mode (text mode breaks under Python 3)
    print("Using pickled file ", picklefile, " for input parameters")
    f = open(picklefile, 'rb')
    parmap = pickle.load(f)
    f.close()

    print("\nFull Tier-0 run options:\n")
    pprint.pprint(parmap)

    inputfilelist = parmap.get('inputHistFiles', [])
    nfiles = len(inputfilelist)
    histMergeCompressionLevel = parmap.get('histMergeCompressionLevel', 1)
    histMergeDebugLevel = parmap.get('histMergeDebugLevel', 0)

    if not nfiles : # problem with job definition or reading pickle file
        dt = int(time.time() - tstart)
        retcode = 1
        acronym = 'TRF_NOINPUT'
        txt = 'empty input file list'
        reportmap = { 'prodsys': { 'trfCode': retcode,
                                   'trfAcronym': acronym,
                                   'jobOutputs': [],
                                   'jobInputs': [],
                                   'nevents': 0,
                                   'more': { 'num1': 0, 'num2': dt, 'txt1': txt }
                                 }
                    }

    else :
        histtmpflist = []
        nevts = 0

        # input files come either as 'dataset#filename' strings ...
        if isinstance(inputfilelist[0], str) :
            histtmpdsname = (inputfilelist[0]).split('#')[0]
            for val in inputfilelist :
                histtmpflist.append(val.split('#')[1])

        # ... or as Tier-0 file dictionaries
        elif isinstance(inputfilelist[0], dict) :
            histtmpdsname = inputfilelist[0]['dsn']
            for fdict in inputfilelist :
                histtmpflist.append(fdict['lfn'])
                nevt = fdict.get('events', 0)
                if nevt is None:
                    nevt = 0
                    print("WARNING Can't get number of events from input pickle file")
                nevts += nevt

        # write the merge list consumed by DQHistogramMerge.py
        f = open('hist_merge.list', 'w')
        txtstr = ""
        for hf in histtmpflist :
            txtstr += "%s\n" % hf
        f.write(txtstr)
        f.close()

        cmd = "cat hist_merge.list"
        (s,o) = getstatusoutput(cmd)
        print("\nContents of file hist_merge.list:\n")
        print(o)


        print("\n##################################################################")
        print("## STEP 2: determining job parameters...")
        print("##################################################################\n")

        # output file
        histdsname = (parmap['outputHistFile']).split('#')[0]
        histfile = (parmap['outputHistFile']).split('#')[1]
        amitag = histfile.split('.')[5]


        # incremental mode on/off
        incr = parmap.get('incrementalMode', 'False')

        # post-processing on/off
        postproc = parmap.get('postProcessing', 'True')

        # database uploading on/off
        allowCOOLUpload = parmap.get('allowCOOLUpload', 'True')

        # do web display
        doWebDisplay = parmap.get('doWebDisplay', 'True')

        # production mode
        productionMode = parmap.get('productionMode', 'True')
        if productionMode != 'True' and incr == 'True':
            print("Production mode is not True, turning off incremental mode")
            incr = 'False'

        # get file paths, put into environment vars for the DQ configuration
        filepaths = parmap.get('filepaths', None)
        if filepaths and isinstance(filepaths, dict):
            if 'basename' not in filepaths:
                print("Improperly formed 'filepaths' (no 'basename')")
            else:
                for evtclass in ('Collisions', 'Cosmics', 'HeavyIons'):
                    if evtclass not in filepaths:
                        print("Improperly formed 'filepaths' (no '%s')" % evtclass)
                    else:
                        clinfo = filepaths[evtclass]
                        for timeclass in ('run', 'minutes10', 'minutes30'):
                            if timeclass not in clinfo:
                                print("Improperly formed 'filepaths[%s]' (no '%s')" % (evtclass, timeclass))
                            else:
                                dqcenvvar = 'DQC_HCFG_%s_%s' % (evtclass.upper(), timeclass.upper())
                                fpath = os.path.join(filepaths['basename'], clinfo[timeclass])
                                print("Setting %s = %s" % (dqcenvvar, fpath))
                                os.environ[dqcenvvar] = fpath

        # extract info from dataset name
        # AMI project name
        # override if tag has been specified in parmap
        try :
            dqproject = histdsname.split('.')[0]
        except :
            dqproject = 'data_test'
        dqproject = parmap.get('projectTag', dqproject)

        # run number
        if 'runNumber' in parmap :
            runnr = parmap['runNumber']
        else :
            try :
                runnr = int(histdsname.split('.')[1])
            except :
                runnr = 1234567890

        # stream name
        if 'streamName' in parmap :
            stream = parmap['streamName']
        else :
            try :
                stream = histdsname.split('.')[2]
            except :
                stream = 'test_dummy'

        # processing pass number: ask the DQM web service, retrying with
        # exponential backoff; fall back to 99 if all attempts fail
        MAX_XMLRPC_TRIES = 5
        if 'procNumber' in parmap :
            procnumber = parmap['procNumber']
        else :
            n_xmlrpc_tries = 1
            while n_xmlrpc_tries <= MAX_XMLRPC_TRIES :
                procnumber = 99
                try :
                    xmlrpcserver = xmlrpclib.ServerProxy('http://atlasdqm.cern.ch:8888')
                    procnumber = xmlrpcserver.get_next_proc_pass(runnr, stream, 'tier0')
                    break
                except :
                    print('Web service connection failed, attempt', n_xmlrpc_tries, 'of', MAX_XMLRPC_TRIES)
                    n_xmlrpc_tries += 1
                    if n_xmlrpc_tries <= MAX_XMLRPC_TRIES:
                        time.sleep(20*2**n_xmlrpc_tries)

        print("Job parameters:\n")
        print(" Run number: ", runnr)
        print(" Stream name: ", stream)
        print(" Processing pass: ", procnumber)
        print(" Incremental mode:", incr)
        print(" Post-processing: ", postproc)
        print(" COOL uploads: ", allowCOOLUpload)
        print(" Production mode: ", productionMode)


        print("\n##################################################################")
        print("## STEP 3: running histogram merging procedure ...")
        print("##################################################################\n")

        # environment setting
        os.environ['DQPRODUCTION'] = '1' if productionMode == 'True' else '0'
        os.environ['DQ_STREAM'] = stream
        print("Setting env variable DQPRODUCTION to %s\n" % os.environ['DQPRODUCTION'])
        os.environ['COOLUPLOADS'] = '1' if allowCOOLUpload == 'True' and productionMode == 'True' else '0'
        print("Setting env variable COOLUPLOADS to %s\n" % os.environ['COOLUPLOADS'])

        # DQHistogramMerge.py args: <list> <output> <runPostProcessing>
        # <isIncremental> <compressionLevel> <debugLevel>
        if postproc == 'True' :
            if incr == 'True':
                cmd = "python -u `which DQHistogramMerge.py` hist_merge.list %s 1 1 %d %d " % (histfile,histMergeCompressionLevel,histMergeDebugLevel)
            else:
                cmd = "python -u `which DQHistogramMerge.py` hist_merge.list %s 1 0 %d %d" % (histfile,histMergeCompressionLevel,histMergeDebugLevel)
        else :
            cmd = "python -u `which DQHistogramMerge.py` hist_merge.list %s 0 0 %d %d" % (histfile,histMergeCompressionLevel,histMergeDebugLevel)

        print("Histogram merging command:\n")
        print(cmd)
        print("\n##################################################################\n")

        print("## ... logfile from DQHistogramMerge.py: ")
        print("--------------------------------------------------------------------------------")
        # execute command
        retcode1 = os.system(cmd)
        print("--------------------------------------------------------------------------------")
        t1 = time.time()
        dt1 = int(t1 - tstart)

        print("\n## DQHistogramMerge.py finished with retcode = %s" % retcode1)
        print("## ... elapsed time: ", dt1, " sec")

        if retcode1 == 0 :
            if postproc == 'True' and incr == 'False':
                print("\n##################################################################")
                print("## STEP 3b: copying postprocessing output to AFS ...")
                print("##################################################################\n")

                cmd = "python -u `which DQFileMove.py` %s %s_%s_%s" % (dqproject, runnr, stream, procnumber)

                print("File move command:\n")
                print(cmd)
                print("\n##################################################################\n")

                print("## ... logfile from DQFileMove.py: ")
                print("--------------------------------------------------------------------------------")
                # execute command
                retcode1b = os.system(cmd)
                print("--------------------------------------------------------------------------------")
                t1b = time.time()
                dt1b = int(t1b - t1)
                t1 = t1b

                print("\n## DQFileMove.py finished with retcode = %s" % retcode1b)
                print("## ... elapsed time: ", dt1b, " sec")

            if doWebDisplay == 'True':
                print("\n##################################################################")
                print("## STEP 4: running web-display creation procedure ...")
                print("##################################################################\n")

                cmd = "python -u `which DQWebDisplay.py` %s %s %s %s stream=%s" % (histfile, dqproject, procnumber, incr, stream)

                print("Web display creation command:\n")
                print(cmd)
                print("\n##################################################################\n")

                print("## ... logfile from DQWebDisplay.py: ")
                print("--------------------------------------------------------------------------------")
                # execute command
                retcode2 = os.system(cmd)
                print('DO NOT REPORT "Error in TH1: cannot merge histograms" ERRORS! THESE ARE IRRELEVANT!')
                print("--------------------------------------------------------------------------------")
                t2 = time.time()
                dt2 = int(t2 - t1)

                print("\n## DQWebDisplay.py finished with retcode = %s" % retcode2)
                print("## ... elapsed time: ", dt2, " sec")
            else:
                print("\n##################################################################")
                print("## WEB DISPLAY CREATION SKIPPED BY USER REQUEST")
                print("##################################################################\n")
                retcode2 = 0
                dt2 = 0

        print("\n##################################################################")
        print("## STEP 5: finishing the job ...")
        print("##################################################################\n")

        # assemble report gpickle file
        outfiles = []
        infiles = []

        retcode = 0
        acronym = 'OK'
        txt = 'trf finished OK'

        # get info for report gpickle file
        if retcode1 == 0 :
            dt = dt1
            # os.system returns a wait status; the child exit code sits in
            # the high byte.  Exit code 5 ("unable to acquire cache lock")
            # still counts as success.
            if (retcode2 >> 8) in (0, 5) :
                # if success, or if unable to acquire cache lock
                histmap = getFileMap(histfile, histdsname, nevts=nevts)
                outfiles = [histmap]
                dt += dt2
                if doWebDisplay == 'True':
                    print('Publishing to message service')
                    publish_success_to_mq(runnr, dqproject, stream, incr=(incr=='True'), ami=amitag, procpass=procnumber, hcfg=filepaths, isprod=(productionMode=='True'))
                else:
                    print('Web display off, not publishing to message service')
            else :
                txt = 'DQWebDisplay.py execution problem'
                print("ERROR: DQWebDisplay.py execution problem!")
                retcode = retcode2
                acronym = 'TRF_DQMDISPLAY_EXE'
                # fingerprint the inputs to help debug the failure
                try:
                    infilelist = open('hist_merge.list','r')
                    for infname in infilelist:
                        genmd5sum(infname.rstrip(os.linesep))
                finally:
                    infilelist.close()
                genmd5sum(histfile)
        else :
            print("ERROR: DQHistogramMerge.py execution problem!")
            retcode = retcode1
            acronym = 'TRF_DQMHISTMERGE_EXE'
            dt = 0
            txt = 'DQHistogramMerge.py execution problem'
            # fingerprint the inputs to help debug the failure
            try:
                infilelist = open('hist_merge.list','r')
                for infname in infilelist:
                    genmd5sum(infname.rstrip(os.linesep))
            finally:
                infilelist.close()
            genmd5sum(histfile)
            DQResFile = "DQResourceUtilization.txt"
            if os.path.exists(DQResFile):
                print("dumping resource utilization log")
                with open(DQResFile) as resfile:
                    for resline in resfile:
                        print(resline, end=' ')

        # assemble job report map
        reportmap = { 'prodsys': { 'trfCode': retcode,
                                   'trfAcronym': acronym,
                                   'jobOutputs': outfiles,
                                   'jobInputs': infiles,
                                   'nevents': int(nevts),
                                   'more': { 'num1': int(nevts), 'num2': int(dt), 'txt1': txt }
                                 }
                    }

    # pickle report map; 'wb' mode is required for pickle under Python 3
    f = open('jobReport.gpickle', 'wb')
    pickle.dump(reportmap, f)
    f.close()

    print("\n## ... job finished with retcode : %s" % reportmap['prodsys']['trfCode'])
    print("## ... error acronym: ", reportmap['prodsys']['trfAcronym'])
    print("## ... elapsed time: ", reportmap['prodsys']['more']['num2'], "sec")
    print("##")
    print("##################################################################")
    print("## End of job.")
    print("##################################################################\n")
462 
463 
464 
467 
if __name__ == "__main__":

    # Require exactly one '--argdict=...' argument.  The check must use
    # 'or', not 'and': with 'and', calling the script with no argument
    # crashed on sys.argv[1] (IndexError), and a single malformed argument
    # was silently accepted and mis-sliced below.
    if len(sys.argv) != 2 or not sys.argv[1].startswith('--argdict='):
        print("Input format wrong --- use ")
        print(" --argdict=<pickled-dictionary containing input info> ")
        print(" with key/value pairs: ")
        print(" 1) 'inputHistFiles': python list ")
        print(" ['datasetname#filename1', 'datasetname#filename2',...] (input dataset + file names) ")
        print(" or list of file dictionaries ")
        print(" [{'lfn':'fname1', 'checksum':'cks1', 'dsn':'dsn1', 'size':sz1, 'guid':'guid1', 'events':nevts1, ...}, ")
        print(" {'lfn':'fname2', 'checksum':'cks2', 'dsn':'dsn2', 'size':sz2, 'guid':'guid2', 'events':nevts2, ...}, ...] ")
        print(" 2) 'outputHistFile': string 'datasetname#filename' ")
        print(" (HIST output dataset name + file) ")
        print(" optional parameters: ")
        print(" 3) 'incrementalMode': string ('True'/'False') ")
        print(" ('True': do incremental update of DQM webpages on top of existing statistics; ")
        print(" 'False': create final DQM webpages, replace temporary ones) ")
        print(" 4) 'postProcessing': string ('True'/'False', default: 'True') ")
        print(" ('False': run histogram merging and DQ assessment only; ")
        print(" 'True': run additional post-processing step (fitting, etc.)) ")
        print(" 5) 'procNumber': int (number of processing pass, e.g. 1,2, ...) ")
        print(" 6) 'runNumber': int ")
        print(" 7) 'streamName': string (e.g., physics_IDCosmic, physics_Express, ...) ")
        print(" 8) 'projectTag': string (e.g., data10_7TeV, TrigDisplay)")
        print(" 9) 'allowCOOLUpload': string ('True'/'False', default: 'True')")
        print(" ('True': allow upload of defects to database; ")
        print(" 'False': do not upload defects to database)")
        sys.exit(-1)

    else :
        # strip the '--argdict=' prefix and hand the pickle path to the trf
        picklefile = sys.argv[1][len('--argdict='):]
        dq_combined_trf(picklefile)
DQM_Tier0Wrapper_trf.publish_success_to_mq
def publish_success_to_mq(run, ptag, stream, incr, ami, procpass, hcfg, isprod)
Definition: DQM_Tier0Wrapper_trf.py:78
DQM_Tier0Wrapper_trf.getFileMap
def getFileMap(fname, dsname, nevts=0)
Definition: DQM_Tier0Wrapper_trf.py:66
DQM_Tier0Wrapper_trf.dq_combined_trf
def dq_combined_trf(picklefile)
Definition: DQM_Tier0Wrapper_trf.py:121
print
void print(char *figname, TCanvas *c1)
Definition: TRTCalib_StrawStatusPlots.cxx:25
Trk::open
@ open
Definition: BinningType.h:40
python.CaloAddPedShiftConfig.int
int
Definition: CaloAddPedShiftConfig.py:45
Trk::split
@ split
Definition: LayerMaterialProperties.h:38
DQM_Tier0Wrapper_trf.genmd5sum
def genmd5sum(filename)
Definition: DQM_Tier0Wrapper_trf.py:106