ATLAS Offline Software
DQM_Tier0Wrapper_tf Namespace Reference

Functions

def getSubFileMap (fname, nevts=0)
 
def publish_success_to_mq (run, ptag, stream, incr, ami, procpass, hcfg, isprod, parmap)
 
def genmd5sum (filename)
 
def dq_combined_trf (jsonfile, outmap)
 
def dq_trf_wrapper (jsonfile)
 

Variables

 jsonfile = sys.argv[1][len('--argJSON='):]
 main()
 

Function Documentation

◆ dq_combined_trf()

def DQM_Tier0Wrapper_tf.dq_combined_trf (   jsonfile,
  outmap 
)

Definition at line 140 of file DQM_Tier0Wrapper_tf.py.

140 def dq_combined_trf(jsonfile, outmap):
141 
142     print("\n##################################################################")
143     print("## STEP 1: creating file with list of root files ...")
144     print("##################################################################\n")
145 
146     nfiles=0
147 
148     try:
149         # extract parameters from json file
150         print("Using json file ", jsonfile, " for input parameters")
151         f = open(jsonfile, 'r')
152         parmap = json.load(f)
153         f.close()
154 
155         print("\nFull Tier-0 run options:\n")
156         pprint.pprint(parmap)
157 
158         inputfilelist = parmap.get('inputHistFiles', [])
159         nfiles = len(inputfilelist)
160         histMergeCompressionLevel=parmap.get('histMergeCompressionLevel', 1)
161         histMergeDebugLevel=parmap.get('histMergeDebugLevel', 0)
162     except:
163         outmap['exitCode'] = 101
164         outmap['exitAcronym'] = 'TRF_NOINPUT'
165         outmap['exitMsg'] = 'Trouble reading json input dict.'
166         traceback.print_exc()
167         return
168 
169     if not nfiles : # problem with job definition or reading json file
170         outmap['exitCode'] = 102
171         outmap['exitAcronym'] = 'TRF_NOINPUT'
172         outmap['exitMsg'] = 'Empty input file list.'
173         return
174 
175     histtmpflist = []
176     histtmpdsname = ''
177     nevts = 0
178 
179     try:
180         if isinstance(inputfilelist[0], six.text_type) :
181             histtmpdsname = (inputfilelist[0]).split('#')[0]
182             for val in inputfilelist :
183                 histtmpflist.append(val.split('#')[1])
184 
185         elif isinstance(inputfilelist[0], dict) :
186             histtmpdsname = inputfilelist[0]['dsn']
187             for fdict in inputfilelist :
188                 histtmpflist.append(fdict['lfn'])
189                 nevt = fdict.get('events', 0)
190                 if nevt is None:
191                     nevt=0
192                     print("WARNING Can't get number of events from input json file")
193                 nevts+=nevt
194 
195         f = open('hist_merge.list', 'w')
196         txtstr = ""
197         for hf in histtmpflist :
198             txtstr += "%s\n" % hf
199         f.write(txtstr)
200         f.close()
201 
202         cmd = "cat hist_merge.list"
203         (s,o) = getstatusoutput(cmd)
204         print("\nContents of file hist_merge.list:\n")
205         print(o)
206     except:
207         outmap['exitCode'] = 103
208         outmap['exitAcronym'] = 'TRF_INPUTINFO'
209         outmap['exitMsg'] = 'ERROR: crash in assembling input file list (STEP 1)'
210         traceback.print_exc()
211         return
212 
213     try:
214         print("\n##################################################################")
215         print("## STEP 2: determining job parameters...")
216         print("##################################################################\n")
217 
218         skipMerge = parmap.get('skipMerge', 'False')
219         if skipMerge == 'True':
220             if nfiles != 1:
221                 print("ERROR: skipMerge specified but something other than one input file specified")
222                 outmap['exitCode'] = 108
223                 outmap['exitAcronym'] = 'TRF_INPUTINFO'
224                 outmap['exitMsg'] = 'ERROR: skipMerge specified but something other than one input file specified (STEP 1)'
225                 return
226             histdsname = histtmpdsname
227             histfile = histtmpflist[0]
228         else:
229             # normal output file determination
230             histdsname = (parmap['outputHistFile']).split('#')[0]
231             histfile = (parmap['outputHistFile']).split('#')[1]
232         amitag = histdsname.split('.')[5]
233 
234 
235         # incremental mode on/off
236         incr = parmap.get('incrementalMode', 'False')
237 
238         # post-processing on/off
239         postproc = parmap.get('postProcessing', 'True')
240 
241         # database uploading on/off
242         allowCOOLUpload = parmap.get('allowCOOLUpload', 'True')
243 
244         # do web display
245         doWebDisplay = parmap.get('doWebDisplay', 'True')
246 
247         # extra parameters for merge
248         mergeParams = parmap.get('mergeParams', '')
249 
250         # production mode
251         productionMode = parmap.get('productionMode', 'True')
252         if productionMode != 'True' and incr == 'True':
253             print("Production mode is not True, turning off incremental mode")
254             incr = 'False'
255 
256         # server list
257         servers = parmap.get('servers', 'False')
258         os.environ['DQC_SERVERS'] = servers
259 
260         # get file paths, put into environment vars
261         filepaths = parmap.get('filepaths', None)
262         if filepaths and isinstance(filepaths, dict):
263             if 'basename' not in filepaths:
264                 print("Improperly formed 'filepaths' (no 'basename')")
265             else:
266                 for evtclass in ('Collisions', 'Cosmics', 'HeavyIons'):
267                     if evtclass not in filepaths:
268                         print("Improperly formed 'filepaths' (no '%s')" % evtclass)
269                     else:
270                         clinfo = filepaths[evtclass]
271                         for timeclass in ('run', 'minutes10', 'minutes30'):
272                             if timeclass not in clinfo:
273                                 print("Improperly formed 'filepaths[%s]' (no '%s')" % (evtclass, timeclass))
274                             else:
275                                 dqcenvvar = 'DQC_HCFG_%s_%s' % (evtclass.upper(), timeclass.upper())
276                                 fpath = os.path.join(filepaths['basename'], clinfo[timeclass])
277                                 print("Setting %s = %s" % (dqcenvvar, fpath))
278                                 os.environ[dqcenvvar] = fpath
279 
280         # extract info from dataset name
281         # AMI project name
282         # override if tag has been specified in parmap
283         try :
284             dqproject = histdsname.split('.')[0]
285         except :
286             dqproject = 'data_test'
287         dqproject = parmap.get('projectTag', dqproject)
288 
289         # run number
290         if 'runNumber' in parmap :
291             runnr = parmap['runNumber']
292         else :
293             try :
294                 runnr = int(histdsname.split('.')[1])
295             except :
296                 runnr = 1234567890
297 
298         # stream name
299         if 'streamName' in parmap :
300             stream = parmap['streamName']
301         else :
302             try :
303                 stream = histdsname.split('.')[2]
304             except :
305                 stream = 'test_dummy'
306 
307         # processing pass number
308         procnumber = 99
309         MAX_XMLRPC_TRIES = 5
310         if 'procNumber' in parmap :
311             procnumber = parmap['procNumber']
312         else :
313             n_xmlrpc_tries = 1
314             while n_xmlrpc_tries <= MAX_XMLRPC_TRIES :
315                 procnumber = 99
316                 try :
317                     xmlrpcserver = xmlrpclib.ServerProxy('http://proc-pass-atlasdqm.app.cern.ch')
318                     #xmlrpcserver = xmlrpclib.ServerProxy('http://atlasdqm.cern.ch:8888') ##old server
319                     procnumber = xmlrpcserver.get_next_proc_pass(runnr, stream, 'tier0')
320                     break
321                 except :
322                     print('Web service connection failed, attempt', n_xmlrpc_tries, 'of', MAX_XMLRPC_TRIES)
323                     n_xmlrpc_tries += 1
324                     if n_xmlrpc_tries <= MAX_XMLRPC_TRIES:
325                         time.sleep(20*2**n_xmlrpc_tries)
326 
327 
328         print("Job parameters:\n")
329         print(" Run number: ", runnr)
330         print(" Stream name: ", stream)
331         print(" Processing pass: ", procnumber)
332         print(" Incremental mode:", incr)
333         print(" Post-processing: ", postproc)
334         print(" COOL uploads: ", allowCOOLUpload)
335         print(" Production mode: ", productionMode)
336         print(" Skip merge: ", skipMerge)
337         print(" Merge parameters:", mergeParams)
338         if servers == []:
339             print(" EOS only: True")
340 
341     except:
342         outmap['exitCode'] = 104
343         outmap['exitAcronym'] = 'TRF_JOBPARS'
344         outmap['exitMsg'] = 'Error in determining job parameters (STEP 2).'
345         traceback.print_exc()
346         return
347 
348     # environment setting
349     os.environ['DQPRODUCTION'] = '1' if productionMode == 'True' else '0'
350     print("Setting env variable DQPRODUCTION to %s\n" % os.environ['DQPRODUCTION'])
351     os.environ['DQ_STREAM'] = stream
352     print("Setting env variable DQ_STREAM to %s\n" % os.environ['DQ_STREAM'])
353     os.environ['COOLUPLOADS'] = '1' if allowCOOLUpload == 'True' and productionMode == 'True' else '0'
354     print("Setting env variable COOLUPLOADS to %s\n" % os.environ['COOLUPLOADS'])
355 
356     if skipMerge != 'True':
357         try:
358             print("\n##################################################################")
359             print("## STEP 3: running histogram merging procedure ...")
360             print("##################################################################\n")
361 
362 
363             if postproc == 'True' :
364                 if incr == 'True':
365                     cmd = "python -u `which DQHistogramMerge.py` hist_merge.list %s 1 1 %d %d " % (histfile,histMergeCompressionLevel,histMergeDebugLevel)
366                 else:
367                     cmd = "python -u `which DQHistogramMerge.py` hist_merge.list %s 1 0 %d %d" % (histfile,histMergeCompressionLevel,histMergeDebugLevel)
368             else :
369                 cmd = "python -u `which DQHistogramMerge.py` hist_merge.list %s 0 0 %d %d" % (histfile,histMergeCompressionLevel,histMergeDebugLevel)
370 
371             cmd += (f' {mergeParams}' if mergeParams else '')
372 
373             print("Histogram merging command:\n")
374             print(cmd)
375             print("\n##################################################################\n")
376 
377             print("## ... logfile from DQHistogramMerge.py: ")
378             print("--------------------------------------------------------------------------------")
379             tstart = time.time()
380             # execute command
381             retcode1 = os.system(cmd)
382             print("--------------------------------------------------------------------------------")
383             t1 = time.time()
384             dt1 = int(t1 - tstart)
385 
386             print("\n## DQHistogramMerge.py finished with retcode = %s" % retcode1)
387             print("## ... elapsed time: ", dt1, " sec")
388 
389             if retcode1 != 0 :
390                 outmap['exitCode'] = retcode1
391                 outmap['exitAcronym'] = 'TRF_DQMHISTMERGE_EXE'
392                 outmap['exitMsg'] = 'ERROR: DQHistogramMerge.py execution problem! (STEP 3).'
393                 print("ERROR: DQHistogramMerge.py execution problem!")
394                 retcode = retcode1
395                 txt = 'DQHistogramMerge.py execution problem'
396                 try:
397                     try:
398                         with open('hist_merge.list','r') as infilelist:
399                             for infname in infilelist:
400                                 genmd5sum(infname.rstrip(os.linesep))
401                     except:
402                         pass
403                     genmd5sum(histfile)
404                     DQResFile="DQResourceUtilization.txt"
405                     if os.path.exists(DQResFile):
406                         print("dumping resource utilization log")
407                         with open(DQResFile) as resfile:
408                             for resline in resfile:
409                                 print(resline, end=' ')
410                 except:
411                     outmap['exitMsg'] = 'ERROR: DQHistogramMerge.py execution problem + problem dumping DQResourceUtilization! (STEP 3).'
412                     traceback.print_exc()
413                     print("ERROR: DQHistogramMerge.py execution problem + problem dumping DQResourceUtilization!")
414                 return
415 
416             if postproc == 'True' and incr == 'False':
417                 print("\n##################################################################")
418                 print("## STEP 3b: copying postprocessing output to AFS ...")
419                 print("##################################################################\n")
420 
421                 cmd = "python -u `which DQFileMove.py` %s %s_%s_%s" % (dqproject, runnr, stream, procnumber)
422 
423                 print("File move command:\n")
424                 print(cmd)
425                 print("\n##################################################################\n")
426 
427                 print("## ... logfile from DQFileMove.py: ")
428                 print("--------------------------------------------------------------------------------")
429                 # execute command
430                 retcode1b = os.system(cmd)
431                 print("--------------------------------------------------------------------------------")
432                 t1b = time.time()
433                 dt1b = int(t1b - t1)
434                 t1 = t1b
435 
436                 print("\n## DQFileMove.py finished with retcode = %s" % retcode1b)
437                 print("## ... elapsed time: ", dt1b, " sec")
438         except:
439             outmap['exitCode'] = 105
440             outmap['exitAcronym'] = 'TRF_DQMHISTMERGE_EXE'
441             outmap['exitMsg'] = 'ERROR: Failure in histogram merging or copying postprocessing output to AFS (STEP 3/3b).'
442             traceback.print_exc()
443             return
444     else:
445         print("\n##################################################################")
446         print("## HISTOGRAM MERGE/POSTPROCESSING SKIPPED BY USER REQUEST")
447         print("##################################################################\n")
448         t1 = time.time()
449 
450     try:
451         retcode2 = 0
452         dt2 = 0
453         if doWebDisplay == 'True':
454             print("\n##################################################################")
455             print("## STEP 4: running web-display creation procedure ...")
456             print("##################################################################\n")
457 
458             cmd = "python -u `which DQWebDisplay.py` %s %s %s %s stream=%s" % (histfile, dqproject, procnumber, incr, stream)
459 
460             print("Web display creation command:\n")
461             print(cmd)
462             print("\n##################################################################\n")
463 
464             print("## ... logfile from DQWebDisplay.py: ")
465             print("--------------------------------------------------------------------------------")
466             # execute command
467             retcode2 = os.system(cmd)
468             print('DO NOT REPORT "Error in TH1: cannot merge histograms" ERRORS! THESE ARE IRRELEVANT!')
469             print("--------------------------------------------------------------------------------")
470             t2 = time.time()
471             dt2 = int(t2 - t1)
472 
473             print("\n## DQWebDisplay.py finished with retcode = %s" % retcode2)
474             print("## ... elapsed time: ", dt2, " sec")
475             if not (retcode2 >> 8) in (0, 5) :
476                 print("ERROR: DQWebDisplay.py execution problem!")
477                 outmap['exitCode'] = retcode2
478                 outmap['exitAcronym'] = 'TRF_DQMDISPLAY_EXE'
479                 outmap['exitMsg'] = 'ERROR: DQWebDisplay.py execution problem! (STEP 4).'
480                 try:
481                     with open('hist_merge.list','r') as infilelist:
482                         for infname in infilelist:
483                             genmd5sum(infname.rstrip(os.linesep))
484                 except:
485                     pass
486                 genmd5sum(histfile)
487                 return
488             if productionMode == 'True':
489                 try:
490                     print('Publishing to message service')
491                     publish_success_to_mq(runnr, dqproject, stream, incr=(incr=='True'), ami=amitag, procpass=procnumber, hcfg=filepaths, isprod=(productionMode=='True'), parmap=parmap)
492                 except:
493                     outmap['exitCode'] = 106
494                     outmap['exitAcronym'] = 'TRF_DQMDISPLAY_EXE'
495                     outmap['exitMsg'] = 'ERROR: Failure in publishing info to messaging service (STEP 4).'
496                     traceback.print_exc()
497                     return
498         else:
499             print("\n##################################################################")
500             print("## WEB DISPLAY CREATION SKIPPED BY USER REQUEST")
501             print("##################################################################\n")
502             print('Web display off, not publishing to message service')
503     except:
504         outmap['exitCode'] = 106
505         outmap['exitAcronym'] = 'TRF_DQMDISPLAY_EXE'
506         outmap['exitMsg'] = 'ERROR: Failure in web-display creation procedure (STEP 4).'
507         print('ERROR: Failure in web-display creation procedure (STEP 4).')
508         traceback.print_exc()
509         return
510 
511     print("\n##################################################################")
512     print("## STEP 5: finishing the job ...")
513     print("##################################################################\n")
514 
515     # get info for report json file
516     try:
517         outfiles = [getSubFileMap(histfile, nevts=nevts)]
518         # assemble job report map
519         outmap['files']['output'][0]['dataset'] = histdsname
520         outmap['files']['output'][0]['subFiles'] = outfiles
521         outmap['resource']['transform']['processedEvents'] = int(nevts)
522         return
523     except:
524         outmap['exitCode'] = 107
525         outmap['exitAcronym'] = 'TRF_JOBREPORT'
526         outmap['exitMsg'] = 'ERROR: in job report creation (STEP 5)'
527         print("ERROR: in job report creation (STEP 5) !")
528         traceback.print_exc()
529         return
530 
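
The listing above reads its entire configuration from the JSON dictionary passed in via jsonfile and reports failures only through outmap (exit codes 101-108 with TRF_* acronyms). The sketch below shows a minimal input dictionary built from the keys the function actually reads; the dataset, file, and run names are purely illustrative placeholders, and the file name dqm_job.json is hypothetical.

# Illustrative Tier-0 parameter dictionary for dq_combined_trf / dq_trf_wrapper.
# Only keys read by the listing above are shown; all dataset, file and run names
# are placeholders, and unspecified options fall back to the defaults visible above.
example_parmap = {
    'inputHistFiles': [
        { 'dsn': 'data23_13p6TeV.00456789.physics_Main.recon.HIST_TMP.f1234',
          'lfn': 'HIST_TMP.00456789._000001.pool.root.1',
          'events': 1000 },
    ],
    'outputHistFile': 'data23_13p6TeV.00456789.physics_Main.merge.HIST.f1234_h567'
                      '#HIST.00456789._000001.pool.root.1',
    'incrementalMode': 'False',        # 'True' for incremental web-display updates
    'postProcessing': 'True',
    'allowCOOLUpload': 'True',
    'doWebDisplay': 'True',
    'productionMode': 'True',
    'skipMerge': 'False',
    'histMergeCompressionLevel': 1,
    'histMergeDebugLevel': 0,
}

import json
with open('dqm_job.json', 'w') as f:        # hypothetical file name
    json.dump(example_parmap, f)

# dq_trf_wrapper('dqm_job.json') builds the skeleton outmap and passes it to
# dq_combined_trf; on failure only exitCode/exitAcronym/exitMsg are filled in.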

◆ dq_trf_wrapper()

def DQM_Tier0Wrapper_tf.dq_trf_wrapper (   jsonfile)

Definition at line 531 of file DQM_Tier0Wrapper_tf.py.

531 def dq_trf_wrapper(jsonfile):
532     print("\n##################################################################")
533     print("## ATLAS Tier-0 Offline DQM Processing ##")
534     print("##################################################################\n")
535 
536     outmap = { 'exitAcronym' : 'OK',
537                'exitCode' : 0,
538                'exitMsg' : 'trf finished OK',
539                'files' : { 'output' : [{ 'dataset' : '',
540                                          'subFiles' : [ {},
541                                                       ]}
542                          ] },
543                'resource' : { 'transform' : { 'processedEvents' : 0 } }
544              }
545 
546     # dq_combined_trf will update outmap
547     tstart = time.time()
548     dq_combined_trf(jsonfile, outmap)
549     outmap['resource']['transform']['wallTime'] = int(time.time() - tstart)
550 
551     # dump json report map
552     f = open('jobReport.json', 'w')
553     json.dump(outmap, f)
554     f.close()
555 
556     # summarize status
557     print("\n## ... job finished with retcode : %s" % outmap['exitCode'])
558     print("## ... error acronym: ", outmap['exitAcronym'])
559     print("## ... job status message: ", outmap['exitMsg'])
560     print("## ... elapsed time: ", outmap['resource']['transform']['wallTime'], "sec")
561     print("##")
562     print("##################################################################")
563     print("## End of job.")
564     print("##################################################################\n")
565 
566 
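
For reference, the jobReport.json written here ends up with the structure sketched below once dq_combined_trf has filled in the output dataset, sub-file map, and event count. The keys match the outmap skeleton above; all concrete values are placeholders.

# Illustrative contents of jobReport.json after a successful job
# (dataset/file names, sizes and times are placeholders):
example_report = {
    "exitAcronym": "OK",
    "exitCode": 0,
    "exitMsg": "trf finished OK",
    "files": {
        "output": [
            { "dataset": "data23_13p6TeV.00456789.physics_Main.merge.HIST.f1234_h567",
              "subFiles": [
                  { "name": "HIST.00456789._000001.pool.root.1",
                    "file_size": 123456789,
                    "nentries": 1000 }
              ] }
        ]
    },
    "resource": { "transform": { "processedEvents": 1000, "wallTime": 1800 } }
}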

◆ genmd5sum()

def DQM_Tier0Wrapper_tf.genmd5sum (   filename)

Definition at line 125 of file DQM_Tier0Wrapper_tf.py.

125 def genmd5sum(filename):
126     md5summer=hashlib.md5()
127     if os.path.isfile(filename):
128         try:
129             with open(filename,'rb') as infil:
130                 while True:
131                     fs=infil.read(8192)
132                     if not fs:
133                         break
134                     md5summer.update(fs)
135         except:
136             pass
137     print("md5 sum of the \"%s\" is %s"%(filename,md5summer.hexdigest()))
138     return
139 
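
genmd5sum is a pure diagnostic helper: on merge or web-display failures, dq_combined_trf calls it for every file listed in hist_merge.list and for the merged output so the checksums appear in the job log. A standalone use would look roughly like this (the output file name is a placeholder):

# Checksum every file referenced in hist_merge.list, then the merged output,
# mirroring the error-handling branches of dq_combined_trf.
import os

with open('hist_merge.list', 'r') as infilelist:
    for infname in infilelist:
        genmd5sum(infname.rstrip(os.linesep))

genmd5sum('HIST.00456789._000001.pool.root.1')   # placeholder output file name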

◆ getSubFileMap()

def DQM_Tier0Wrapper_tf.getSubFileMap (   fname,
  nevts = 0 
)

Definition at line 78 of file DQM_Tier0Wrapper_tf.py.

78 def getSubFileMap(fname, nevts=0) :
79     if os.path.isfile(fname) :
80         sz = os.path.getsize(fname)
81         map = { 'name': fname,
82                 'file_size' : sz,
83                 'nentries' : nevts,
84               }
85     else :
86         map = {}
87     return map
88 
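
The returned map is exactly what dq_combined_trf puts under files/output/subFiles in the job report. Illustrative results (size and event count are placeholders):

# Illustrative return values of getSubFileMap:
info = getSubFileMap('HIST.00456789._000001.pool.root.1', nevts=1000)
# -> {'name': 'HIST.00456789._000001.pool.root.1',
#     'file_size': 123456789,   # whatever os.path.getsize() reports
#     'nentries': 1000}

missing = getSubFileMap('no_such_file.root')
# -> {}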

◆ publish_success_to_mq()

def DQM_Tier0Wrapper_tf.publish_success_to_mq (   run,
  ptag,
  stream,
  incr,
  ami,
  procpass,
  hcfg,
  isprod,
  parmap 
)

Definition at line 89 of file DQM_Tier0Wrapper_tf.py.

89 def publish_success_to_mq(run, ptag, stream, incr, ami, procpass, hcfg, isprod, parmap):
90     import stomp, json
91     from DataQualityUtils import stompconfig
92     dest='/topic/atlas.dqm.progress'
93     conn=stomp.Connection([('atlas-mb.cern.ch', 61013)])
94     conn.connect(wait=True, **stompconfig.config())
95 
96     eos_only = False
97 
98     servers = parmap.get('servers', 'False')
99     if servers == [] or servers == '':
100         eos_only = True
101 
102 
103     body = {
104         'run': run,
105         'project_tag': ptag,
106         'stream': stream,
107         'ami': ami,
108         'pass': procpass,
109         'hcfg': hcfg,
110         'eos_only': eos_only,
111         }
112     headers = {
113         'MsgClass':'DQ',
114         'MsgType': (('' if isprod else 'Development') +
115                     ('WebDisplayRunComplete' if not incr else 'WebDisplayIncremental')),
116         'type':'textMessage',
117         'persistent': 'true',
118         'destination': dest,
119         }
120     conn.send(body=json.dumps(body), destination=dest,headers=headers,ack='auto')
121     conn.disconnect()
122 
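
The function publishes one JSON text message per job to /topic/atlas.dqm.progress on atlas-mb.cern.ch. The sketch below shows the shape of the body and headers as built in the listing; the concrete values are placeholders, and MsgType switches between 'WebDisplayRunComplete' and 'WebDisplayIncremental', with a 'Development' prefix outside production.

# Shape of the message sent by publish_success_to_mq (all values are placeholders):
example_body = {
    'run': 456789,
    'project_tag': 'data23_13p6TeV',
    'stream': 'physics_Main',
    'ami': 'f1234_h567',
    'pass': 1,
    'hcfg': None,              # the 'filepaths' dict from the job configuration, if any
    'eos_only': False,
}
example_headers = {
    'MsgClass': 'DQ',
    'MsgType': 'WebDisplayRunComplete',   # 'WebDisplayIncremental' when incr is True;
                                          # prefixed with 'Development' when isprod is False
    'type': 'textMessage',
    'persistent': 'true',
    'destination': '/topic/atlas.dqm.progress',
}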

Variable Documentation

◆ jsonfile

DQM_Tier0Wrapper_tf.jsonfile = sys.argv[1][len('--argJSON='):]

main()

Definition at line 601 of file DQM_Tier0Wrapper_tf.py.
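
The module-level code strips the '--argJSON=' prefix from the first command-line argument before handing the file name to main() (whose body is not shown on this page). A minimal sketch of that argument handling, with a hypothetical JSON file name:

# Hypothetical invocation:  DQM_Tier0Wrapper_tf.py --argJSON=dqm_job.json
import sys

sys.argv = ['DQM_Tier0Wrapper_tf.py', '--argJSON=dqm_job.json']   # stand-in command line
jsonfile = sys.argv[1][len('--argJSON='):]                        # -> 'dqm_job.json'
# main() then runs; it is expected to hand jsonfile to dq_trf_wrapper().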
