ATLAS Offline Software
DQM_Tier0Wrapper_trf.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 # Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration
4 
5 
54 
55 from __future__ import print_function
56 import sys, string, os.path, os, pickle, time, pprint
57 from six.moves import xmlrpc_client as xmlrpclib
58 #sami
59 import hashlib
60 import six
61 if six.PY2:
62  from commands import getstatusoutput
63 else:
64  from subprocess import getstatusoutput
65 
66 
67 
68 # Utility function
69 
def getFileMap(fname, dsname, nevts=0) :
    """Describe one output file for the Tier-0 job report.

    Parameters:
      fname  -- local file name (lfn) of the output file
      dsname -- dataset name the file belongs to
      nevts  -- number of events in the file (default 0)

    Returns a dict with keys 'lfn', 'dataset', 'size' and 'events' when the
    file exists on disk, otherwise an empty dict.
    """
    # Guard clause: a missing file yields an empty map, which callers
    # simply include (or not) in the report.
    if not os.path.isfile(fname):
        return {}
    # NOTE: local renamed from 'map' to avoid shadowing the builtin.
    return { 'lfn': fname,
             'dataset': dsname,
             'size': os.path.getsize(fname),
             'events': nevts
           }
81 
def publish_success_to_mq(run, ptag, stream, incr, ami, procpass, hcfg, isprod):
    """Announce web-display completion on the ATLAS message broker.

    Sends a JSON text message to the '/topic/atlas.dqm.progress' STOMP topic
    on atlas-mb.cern.ch describing the processed run.

    Parameters:
      run      -- run number
      ptag     -- project tag (e.g. data18_13TeV)
      stream   -- stream name
      incr     -- True if this was an incremental web-display update
      ami      -- AMI tag of the processing
      procpass -- processing pass number
      hcfg     -- han configuration file paths (or None)
      isprod   -- True when running in production mode; development messages
                  get a 'Development' MsgType prefix so consumers can filter
    """
    # Imports are kept function-local so the trf does not need stomp unless a
    # message is actually published.  (The previously imported 'ssl' module
    # was never used and has been dropped.)
    import stomp, json
    from DataQualityUtils import stompconfig
    dest='/topic/atlas.dqm.progress'
    conn=stomp.Connection([('atlas-mb.cern.ch', 61013)])
    # Credentials come from the DataQualityUtils stompconfig helper.
    conn.connect(wait=True, **stompconfig.config())

    body = {
        'run': run,
        'project_tag': ptag,
        'stream': stream,
        'ami': ami,
        'pass': procpass,
        'hcfg': hcfg,
        }
    headers = {
        'MsgClass':'DQ',
        'MsgType': (('' if isprod else 'Development') +
                    ('WebDisplayRunComplete' if not incr else 'WebDisplayIncremental')),
        'type':'textMessage',
        'persistent': 'true',
        'destination': dest,
        }
    conn.send(body=json.dumps(body), destination=dest,headers=headers,ack='auto')
    conn.disconnect()
107 
108 
109 
def genmd5sum(filename):
    """Print the MD5 checksum of *filename* (log-based debugging aid).

    The file is read in 8 kB chunks so arbitrarily large histogram files can
    be summed without loading them into memory.  If the file does not exist,
    the digest of the empty message is printed (original behaviour, kept so
    log parsers see a line for every file).  Returns None.
    """
    md5summer=hashlib.md5()
    if os.path.isfile(filename):
        # 'with' guarantees the handle is closed on any exit path.  The old
        # try/finally had open() inside the try, so a failed open() raised a
        # NameError from the finally clause, masking the real error.
        with open(filename, 'rb') as infil:
            while True:
                fs=infil.read(8192)
                if not fs:
                    break
                md5summer.update(fs)
    print("md5 sum of the \"%s\" is %s"%(filename,md5summer.hexdigest()))
    return
124 
def dq_combined_trf(picklefile):
    """Tier-0 histogram-merging / DQ web-display transform.

    Reads job parameters from the pickled dictionary *picklefile*, merges the
    listed monitoring ROOT files with DQHistogramMerge.py, optionally runs
    post-processing, file moving and web-display creation, and writes the
    Tier-0 job report to 'jobReport.gpickle' in the current directory.
    Returns None; the outcome is communicated through the report file.
    """

    tstart = time.time()

    print("\n##################################################################")
    print("## ATLAS Tier-0 Offline DQM Processing ##")
    print("##################################################################\n")

    print("\n##################################################################")
    print("## STEP 1: creating file with list of root files ...")
    print("##################################################################\n")

    # extract parameters from pickle file
    print("Using pickled file ", picklefile, " for input parameters")
    # 'rb': pickle payloads are binary; text mode only happened to work
    # under Python 2 and raises under Python 3.
    f = open(picklefile, 'rb')
    parmap = pickle.load(f)
    f.close()

    print("\nFull Tier-0 run options:\n")
    pprint.pprint(parmap)

    inputfilelist = parmap.get('inputHistFiles', [])
    nfiles = len(inputfilelist)
    histMergeCompressionLevel=parmap.get('histMergeCompressionLevel', 1)
    histMergeDebugLevel=parmap.get('histMergeDebugLevel', 0)

    if not nfiles : # problem with job definition or reading pickle file
        dt = int(time.time() - tstart)
        retcode = 1
        acronym = 'TRF_NOINPUT'
        txt = 'empty input file list'
        reportmap = { 'prodsys': { 'trfCode': retcode,
                                   'trfAcronym': acronym,
                                   'jobOutputs': [],
                                   'jobInputs': [],
                                   'nevents': 0,
                                   'more': { 'num1': 0, 'num2': dt, 'txt1': txt }
                                 }
                    }

    else :
        histtmpflist = []
        nevts = 0

        # Input may be 'dsn#lfn' strings or Tier-0 file dictionaries.
        if isinstance(inputfilelist[0], str) :
            histtmpdsname = (inputfilelist[0]).split('#')[0]
            for val in inputfilelist :
                histtmpflist.append(val.split('#')[1])

        elif isinstance(inputfilelist[0], dict) :
            histtmpdsname = inputfilelist[0]['dsn']
            for fdict in inputfilelist :
                histtmpflist.append(fdict['lfn'])
                nevt = fdict.get('events', 0)
                if nevt is None:
                    nevt=0
                    print("WARNING Can't get number of events from input pickle file")
                nevts+=nevt

        # DQHistogramMerge.py reads the list of inputs from this file.
        f = open('hist_merge.list', 'w')
        txtstr = ""
        for hf in histtmpflist :
            txtstr += "%s\n" % hf
        f.write(txtstr)
        f.close()

        cmd = "cat hist_merge.list"
        (s,o) = getstatusoutput(cmd)
        print("\nContents of file hist_merge.list:\n")
        print(o)

        print("\n##################################################################")
        print("## STEP 2: determining job parameters...")
        print("##################################################################\n")

        # output file: 'datasetname#filename'
        histdsname = (parmap['outputHistFile']).split('#')[0]
        histfile = (parmap['outputHistFile']).split('#')[1]
        # AMI tag is by convention the 6th dot-separated field of the file name
        amitag = histfile.split('.')[5]

        # incremental mode on/off (string flags 'True'/'False' by convention)
        incr = parmap.get('incrementalMode', 'False')

        # post-processing on/off
        postproc = parmap.get('postProcessing', 'True')

        # database uploading on/off
        allowCOOLUpload = parmap.get('allowCOOLUpload', 'True')

        # do web display
        doWebDisplay = parmap.get('doWebDisplay', 'True')

        # production mode
        productionMode = parmap.get('productionMode', 'True')
        if productionMode != 'True' and incr == 'True':
            print("Production mode is not True, turning off incremental mode")
            incr = 'False'

        # get han configuration file paths, put into environment vars for
        # the downstream DQ tools
        filepaths = parmap.get('filepaths', None)
        if filepaths and isinstance(filepaths, dict):
            if 'basename' not in filepaths:
                print("Improperly formed 'filepaths' (no 'basename')")
            else:
                for evtclass in ('Collisions', 'Cosmics', 'HeavyIons'):
                    if evtclass not in filepaths:
                        print("Improperly formed 'filepaths' (no '%s')" % evtclass)
                    else:
                        clinfo = filepaths[evtclass]
                        for timeclass in ('run', 'minutes10', 'minutes30'):
                            if timeclass not in clinfo:
                                print("Improperly formed 'filepaths[%s]' (no '%s')" % (evtclass, timeclass))
                            else:
                                dqcenvvar = 'DQC_HCFG_%s_%s' % (evtclass.upper(), timeclass.upper())
                                fpath = os.path.join(filepaths['basename'], clinfo[timeclass])
                                print("Setting %s = %s" % (dqcenvvar, fpath))
                                os.environ[dqcenvvar] = fpath

        # extract info from dataset name
        # AMI project name; override if tag has been specified in parmap
        try :
            dqproject = histdsname.split('.')[0]
        except Exception :  # narrowed from bare 'except'
            dqproject = 'data_test'
        dqproject = parmap.get('projectTag', dqproject)

        # run number
        if 'runNumber' in parmap :
            runnr = parmap['runNumber']
        else :
            try :
                runnr = int(histdsname.split('.')[1])
            except Exception :
                runnr = 1234567890

        # stream name
        if 'streamName' in parmap :
            stream = parmap['streamName']
        else :
            try :
                stream = histdsname.split('.')[2]
            except Exception :
                stream = 'test_dummy'

        # processing pass number: ask the DQM web service, retrying with
        # exponential back-off; fall back to 99 if all attempts fail
        MAX_XMLRPC_TRIES = 5
        if 'procNumber' in parmap :
            procnumber = parmap['procNumber']
        else :
            n_xmlrpc_tries = 1
            while n_xmlrpc_tries <= MAX_XMLRPC_TRIES :
                procnumber = 99
                try :
                    xmlrpcserver = xmlrpclib.ServerProxy('http://atlasdqm.cern.ch:8888')
                    procnumber = xmlrpcserver.get_next_proc_pass(runnr, stream, 'tier0')
                    break
                except Exception :
                    print('Web service connection failed, attempt', n_xmlrpc_tries, 'of', MAX_XMLRPC_TRIES)
                    n_xmlrpc_tries += 1
                    if n_xmlrpc_tries <= MAX_XMLRPC_TRIES:
                        time.sleep(20*2**n_xmlrpc_tries)

        print("Job parameters:\n")
        print(" Run number: ", runnr)
        print(" Stream name: ", stream)
        print(" Processing pass: ", procnumber)
        print(" Incremental mode:", incr)
        print(" Post-processing: ", postproc)
        print(" COOL uploads: ", allowCOOLUpload)
        print(" Production mode: ", productionMode)

        print("\n##################################################################")
        print("## STEP 3: running histogram merging procedure ...")
        print("##################################################################\n")

        # environment setting consumed by the DQ tools invoked below
        os.environ['DQPRODUCTION'] = '1' if productionMode == 'True' else '0'
        os.environ['DQ_STREAM'] = stream
        print("Setting env variable DQPRODUCTION to %s\n" % os.environ['DQPRODUCTION'])
        os.environ['COOLUPLOADS'] = '1' if allowCOOLUpload == 'True' and productionMode == 'True' else '0'
        print("Setting env variable COOLUPLOADS to %s\n" % os.environ['COOLUPLOADS'])

        # DQHistogramMerge.py positional flags: <runPostProcessing> <isIncremental>
        if postproc == 'True' :
            if incr == 'True':
                cmd = "python -u `which DQHistogramMerge.py` hist_merge.list %s 1 1 %d %d " % (histfile,histMergeCompressionLevel,histMergeDebugLevel)
            else:
                cmd = "python -u `which DQHistogramMerge.py` hist_merge.list %s 1 0 %d %d" % (histfile,histMergeCompressionLevel,histMergeDebugLevel)
        else :
            cmd = "python -u `which DQHistogramMerge.py` hist_merge.list %s 0 0 %d %d" % (histfile,histMergeCompressionLevel,histMergeDebugLevel)

        print("Histogram merging command:\n")
        print(cmd)
        print("\n##################################################################\n")

        print("## ... logfile from DQHistogramMerge.py: ")
        print("--------------------------------------------------------------------------------")
        # execute command
        retcode1 = os.system(cmd)
        print("--------------------------------------------------------------------------------")
        t1 = time.time()
        dt1 = int(t1 - tstart)

        print("\n## DQHistogramMerge.py finished with retcode = %s" % retcode1)
        print("## ... elapsed time: ", dt1, " sec")

        if retcode1 == 0 :
            if postproc == 'True' and incr == 'False':
                print("\n##################################################################")
                print("## STEP 3b: copying postprocessing output to AFS ...")
                print("##################################################################\n")

                cmd = "python -u `which DQFileMove.py` %s %s_%s_%s" % (dqproject, runnr, stream, procnumber)

                print("File move command:\n")
                print(cmd)
                print("\n##################################################################\n")

                print("## ... logfile from DQFileMove.py: ")
                print("--------------------------------------------------------------------------------")
                # execute command; retcode deliberately only logged, the file
                # move is best-effort and must not fail the job
                retcode1b = os.system(cmd)
                print("--------------------------------------------------------------------------------")
                t1b = time.time()
                dt1b = int(t1b - t1)
                t1 = t1b

                print("\n## DQFileMove.py finished with retcode = %s" % retcode1b)
                print("## ... elapsed time: ", dt1b, " sec")

            if doWebDisplay == 'True':
                print("\n##################################################################")
                print("## STEP 4: running web-display creation procedure ...")
                print("##################################################################\n")

                cmd = "python -u `which DQWebDisplay.py` %s %s %s %s stream=%s" % (histfile, dqproject, procnumber, incr, stream)

                print("Web display creation command:\n")
                print(cmd)
                print("\n##################################################################\n")

                print("## ... logfile from DQWebDisplay.py: ")
                print("--------------------------------------------------------------------------------")
                # execute command
                retcode2 = os.system(cmd)
                print('DO NOT REPORT "Error in TH1: cannot merge histograms" ERRORS! THESE ARE IRRELEVANT!')
                print("--------------------------------------------------------------------------------")
                t2 = time.time()
                dt2 = int(t2 - t1)

                print("\n## DQWebDisplay.py finished with retcode = %s" % retcode2)
                print("## ... elapsed time: ", dt2, " sec")
            else:
                print("\n##################################################################")
                print("## WEB DISPLAY CREATION SKIPPED BY USER REQUEST")
                print("##################################################################\n")
                retcode2 = 0
                dt2 = 0

        print("\n##################################################################")
        print("## STEP 5: finishing the job ...")
        print("##################################################################\n")

        # assemble report gpickle file
        outfiles = []
        infiles = []

        retcode = 0
        acronym = 'OK'
        txt = 'trf finished OK'

        # get info for report gpickle file
        if retcode1 == 0 :
            dt = dt1
            # os.system() encodes the exit status in the high byte;
            # status 5 means "unable to acquire cache lock" and is benign
            if (retcode2 >> 8) in (0, 5) :
                histmap = getFileMap(histfile, histdsname, nevts=nevts)
                outfiles = [histmap]
                dt += dt2
                if doWebDisplay == 'True':
                    print('Publishing to message service')
                    publish_success_to_mq(runnr, dqproject, stream, incr=(incr=='True'), ami=amitag, procpass=procnumber, hcfg=filepaths, isprod=(productionMode=='True'))
                else:
                    print('Web display off, not publishing to message service')
            else :
                txt = 'DQWebDisplay.py execution problem'
                print("ERROR: DQWebDisplay.py execution problem!")
                retcode = retcode2
                acronym = 'TRF_DQMDISPLAY_EXE'
                # dump input checksums to the log for post-mortem debugging
                try:
                    infilelist=open('hist_merge.list','r')
                    for infname in infilelist:
                        genmd5sum(infname.rstrip(os.linesep))
                finally:
                    infilelist.close()
                genmd5sum(histfile)
        else :
            print("ERROR: DQHistogramMerge.py execution problem!")
            retcode = retcode1
            acronym = 'TRF_DQMHISTMERGE_EXE'
            dt = 0
            txt = 'DQHistogramMerge.py execution problem'
            # dump input checksums to the log for post-mortem debugging
            try:
                infilelist=open('hist_merge.list','r')
                for infname in infilelist:
                    genmd5sum(infname.rstrip(os.linesep))
            finally:
                infilelist.close()
            genmd5sum(histfile)
            DQResFile="DQResourceUtilization.txt"
            if os.path.exists(DQResFile):
                print("dumping resource utilization log")
                with open(DQResFile) as resfile:
                    for resline in resfile:
                        print(resline, end=' ')

        # assemble job report map
        reportmap = { 'prodsys': { 'trfCode': retcode,
                                   'trfAcronym': acronym,
                                   'jobOutputs': outfiles,
                                   'jobInputs': infiles,
                                   'nevents': int(nevts),
                                   'more': { 'num1': int(nevts), 'num2': int(dt), 'txt1': txt }
                                 }
                    }

    # pickle report map ('wb': pickle output is bytes under Python 3)
    f = open('jobReport.gpickle', 'wb')
    pickle.dump(reportmap, f)
    f.close()

    print("\n## ... job finished with retcode : %s" % reportmap['prodsys']['trfCode'])
    print("## ... error acronym: ", reportmap['prodsys']['trfAcronym'])
    print("## ... elapsed time: ", reportmap['prodsys']['more']['num2'], "sec")
    print("##")
    print("##################################################################")
    print("## End of job.")
    print("##################################################################\n")
466 
467 
468 
471 
if __name__ == "__main__":

    # Require exactly one argument of the form '--argdict=<picklefile>'.
    # Fixes two defects in the original check:
    #  * it used 'and', so a single malformed argument (or extra arguments)
    #    slipped through to the else branch and was mis-sliced;
    #  * with no arguments at all, sys.argv[1] raised IndexError before the
    #    usage message could be printed.
    if (len(sys.argv) != 2) or (not sys.argv[1].startswith('--argdict=')) :
        print("Input format wrong --- use ")
        print(" --argdict=<pickled-dictionary containing input info> ")
        print(" with key/value pairs: ")
        print(" 1) 'inputHistFiles': python list ")
        print(" ['datasetname#filename1', 'datasetname#filename2',...] (input dataset + file names) ")
        print(" or list of file dictionaries ")
        print(" [{'lfn':'fname1', 'checksum':'cks1', 'dsn':'dsn1', 'size':sz1, 'guid':'guid1', 'events':nevts1, ...}, ")
        print(" {'lfn':'fname2', 'checksum':'cks2', 'dsn':'dsn2', 'size':sz2, 'guid':'guid2', 'events':nevts2, ...}, ...] ")
        print(" 2) 'outputHistFile': string 'datasetname#filename' ")
        print(" (HIST output dataset name + file) ")
        print(" optional parameters: ")
        print(" 3) 'incrementalMode': string ('True'/'False') ")
        print(" ('True': do incremental update of DQM webpages on top of existing statistics; ")
        print(" 'False': create final DQM webpages, replace temporary ones) ")
        print(" 4) 'postProcessing': string ('True'/'False', default: 'True') ")
        print(" ('False': run histogram merging and DQ assessment only; ")
        print(" 'True': run additional post-processing step (fitting, etc.)) ")
        print(" 5) 'procNumber': int (number of processing pass, e.g. 1,2, ...) ")
        print(" 6) 'runNumber': int ")
        print(" 7) 'streamName': string (e.g., physics_IDCosmic, physics_Express, ...) ")
        print(" 8) 'projectTag': string (e.g., data10_7TeV, TrigDisplay)")
        print(" 9) 'allowCOOLUpload': string ('True'/'False', default: 'True')")
        print(" ('True': allow upload of defects to database; ")
        print(" 'False': do not upload defects to database)")
        sys.exit(-1)

    else :
        picklefile = sys.argv[1][len('--argdict='):]
        dq_combined_trf(picklefile)
CaloCellPos2Ntuple.int
int
Definition: CaloCellPos2Ntuple.py:24
DQM_Tier0Wrapper_trf.publish_success_to_mq
def publish_success_to_mq(run, ptag, stream, incr, ami, procpass, hcfg, isprod)
Definition: DQM_Tier0Wrapper_trf.py:82
DQM_Tier0Wrapper_trf.getFileMap
def getFileMap(fname, dsname, nevts=0)
Definition: DQM_Tier0Wrapper_trf.py:70
DQM_Tier0Wrapper_trf.dq_combined_trf
def dq_combined_trf(picklefile)
Definition: DQM_Tier0Wrapper_trf.py:125
print
void print(char *figname, TCanvas *c1)
Definition: TRTCalib_StrawStatusPlots.cxx:25
Trk::open
@ open
Definition: BinningType.h:40
Trk::split
@ split
Definition: LayerMaterialProperties.h:38
DQM_Tier0Wrapper_trf.genmd5sum
def genmd5sum(filename)
Definition: DQM_Tier0Wrapper_trf.py:110