ATLAS Offline Software
DQPostProcessMod.py
Go to the documentation of this file.
1 # Copyright (C) 2002-2021 CERN for the benefit of the ATLAS collaboration
2 
3 import shutil, re, sys, os
4 from typing import List, Tuple, Callable
5 
6 from .dqu_subprocess import apply as _local_apply
7 
8 def _dolsrwrapper(fname):
9  import ROOT
10  rf = ROOT.TFile.Open(fname, 'READ')
11  if not rf or not rf.IsOpen():
12  print(' %s is empty or not accessible' % fname)
13  return False
14 
15  cleancache = ROOT.gROOT.MustClean(); ROOT.gROOT.SetMustClean(False)
16  _dolsr(rf)
17  rf.Close()
18  ROOT.gROOT.SetMustClean(cleancache)
19 
20 def _dolsr(dir):
21  import ROOT
22 
23  resultdir = dir.Get('Results'); statusobj = None
24 
25  if resultdir:
26  statusobj = resultdir.Get('Status')
27 
28  if statusobj:
29  l = statusobj.GetListOfKeys()
30  l[0].GetName()
31 
32  keys = dir.GetListOfKeys()
33  for key in keys:
34  name = key.GetName()
35  if ROOT.TClass.GetClass(key.GetClassName()).InheritsFrom('TDirectory'):
36  dirobj = key.ReadObj()
37  resultdir = dirobj.Get('Results'); statusobj = None
38  if resultdir:
39  statusobj = resultdir.Get('Status')
40 
41  # is a check? is a summary? or recurse?
42  if name[-1] == '_' and resultdir:
43  dir.Get(name[:-1])
44  subkeys = resultdir.GetListOfKeys()
45  for subkey in subkeys:
46  ressub = subkey.GetName()
47  if ressub not in ('Status', 'Reference', 'ResultObject') and ROOT.TClass.GetClass(subkey.GetClassName()).InheritsFrom('TDirectory'):
48  l = subkey.ReadObj().GetListOfKeys()
49  l[0].GetName()
50 
51  # status flag
52  l = statusobj.GetListOfKeys()
53  l[0].GetName()
54  else:
55  _dolsr(dirobj)
56 
57 def _ProtectPostProcessing( funcinfo, outFileName, isIncremental ):
58  import os
59  isProduction = bool(os.environ.get('DQPRODUCTION', False))
60  func, mail_to = funcinfo
61  tmpfilename = outFileName + "-safe-" + str(os.getpid())
62  shutil.copy2(outFileName, tmpfilename)
63  success = False
64  #try to get log file name and curr directory in order to find job info
65  logFiles=""
66  currDir=" pwd failed!"
67  try:
68  currDir=os.environ["PWD"]
69  logFilesList=[]
70  for files in os.listdir(currDir):
71  if re.match(r'^[0-9]*_.*.log$',files):
72  logFilesList.append("%s"%files)
73  logFiles='\r\n'.join(logFilesList)
74  except Exception:
75  pass
76  try:
77  _local_apply(func, (tmpfilename, isIncremental))
78  _local_apply(_dolsrwrapper, (tmpfilename,))
79  success = True
80  except Exception as e:
81  print('WARNING!!!: POSTPROCESSING FAILED FOR', func.__name__)
82  print(e)
83  print('Relevant results will not be in output')
84  if isProduction:
85  import smtplib
86  server = smtplib.SMTP('localhost')
87  mail_cc = ['ponyisi@utexas.edu', 'rnarayan@utexas.edu']
88  msg = ['From: atlasdqm@cern.ch',
89  'To: %s' % ', '.join(mail_to),
90  'Cc: %s' % ', '.join(mail_cc),
91  'Reply-to: atlasdqm-no-reply@cern.ch',
92  'Subject: WARNING: Postprocessing failure in production job',
93  '',
94  'There has been a failure in Tier-0 postprocessing monitoring.',
95  '',
96  'Function: %s' % func.__name__,
97  'File: %s' % outFileName,
98  'isIncremental? %s' % isIncremental,
99  'Error:',
100  str(e),
101  'Current directory (contains LSF job ID): %s' % currDir,
102  'Log file name (contains ATLAS T0 job ID): %s' % logFiles
103  ]
104  print('Emailing', ', '.join(mail_to))
105  server.sendmail('atlasdqm@cern.ch', mail_to + mail_cc, '\r\n'.join(msg))
106  server.quit()
107  print()
108  finally:
109  if success:
110  shutil.move(tmpfilename, outFileName)
111  else:
112  os.unlink(tmpfilename)
113  return success
114 
115 def rundir(fname):
116  import ROOT
117  f = ROOT.TFile.Open(fname)
118  lk = f.GetListOfKeys()
119  if len(lk) != 1:
120  return None
121  kn = lk[0].GetName()
122  if not kn.startswith('run_'):
123  return None
124  return kn
125 
126 def DQPostProcess( outFileName, isIncremental=False ):
127 
128  from ROOT import gSystem
129  gSystem.Load('libDataQualityUtils')
130  from ROOT import dqutils
131 
133 
134  def rpc_create(dummy, dummy2):
135  from DataQualityUtils.createRpcFolders import (createRPCDQMFDB,
136  createRPCConditionDB)
139 
140  def mdt_create(dummy, isIncremental):
141  # if False and not isIncremental:
142  if not isIncremental:
143  from DataQualityUtils.createMdtFolders import (createMDTConditionDBDead,
144  createMDTConditionDBNoisy)
147 
148  def zlumi(fname, isIncremental):
149  if not isIncremental:
150  from DataQualityUtils.doZLumi import go
151  go(fname)
152 
153  def newstyle(yamlfile: str) -> Callable[[str, bool], None]:
154  import os.path
155  def newstyle_core(fname, isIncremental):
156  if not isIncremental:
157  import subprocess
158  print(f'New style postprocessing running for: {yamlfile}')
159  cmdline = ['histgrinder', '--prefix', f'/{rundir(fname)}/', fname, fname, '-c', yamlfile]
160  subprocess.run(cmdline, check=True)
161  newstyle_core.__name__ = os.path.splitext(os.path.basename(yamlfile))[0]
162  return newstyle_core
163 
164  def make_newstyle_funcs() -> List[Tuple[Callable[[str, bool], None], List[str]]]:
165  rv = []
166  # do we have DataQualityUtils in the DATAPATH?
167  from ._resolve_data_path import resolve_data_path
168  dpath = resolve_data_path("DataQualityUtils")
169  if dpath is None:
170  print("Unable to resolve DataQualityUtils data path, not running new-style postprocessing")
171  return rv
172  import glob, os.path
173  inputs = glob.glob(os.path.join(dpath,'postprocessing/*.yaml'))
174  for input in inputs:
175  rv.append((newstyle(input), ['ponyisi@utexas.edu']))
176  return rv
177 
178  funclist = [
179  (mf.fitMergedFile_IDPerfMonManager,
180  ['weina.ji@cern.ch', 'ana.ovcharova@cern.ch']),
181  (mf.fitMergedFile_DiMuMonManager,
182  ['ana.ovcharova@cern.ch']),
183  (mf.fitMergedFile_IDAlignMonManager,
184  ['John.Alison@cern.ch', 'anthony.morley@cern.ch']),
185  (mf.pv_PrimaryVertexMonitoring_calcResoAndEfficiency,
186  ['Andreas.Wildauer@cern.ch']),
187  (mf.VxMon_move,
188  ['federico.meloni@cern.ch']),
189  (rpc_create,
190  ['michele.bianco@le.infn.it', 'monica.verducci@cern.ch']),
191  (mf.RPCPostProcess,
192  ['michele.bianco@le.infn.it', 'monica.verducci@cern.ch']),
193  (mdt_create,
194  ['john.stakely.keller@cern.ch', 'monica.verducci@cern.ch']),
195  (mf.TGCPostProcess,
196  ['lyuan@ihep.ac.cn', 'kingmgl@stu.kobe-u.ac.jp']),
197  (mf.MDTvsTGCPostProcess,
198  ['lyuan@ihep.ac.cn', 'kingmgl@stu.kobe-u.ac.jp']),
199  (mf.HLTMuonPostProcess,
200  ['lyuan@ihep.ac.cn', 'kingmgl@stu.kobe-u.ac.jp', 'yamazaki@phys.sci.kobe-u.ac.jp']),
201  (mf.HLTEgammaPostProcess,
202  ['yan.jie.schnellbach@cern.ch', 'alessandro.tricoli@cern.ch']),
203  (mf.HLTTauPostProcess,
204  ['Cristobal.Cuenca@cern.ch', 'Geng-Yuan.Jeng@cern.ch', 'Giovanna.Cottin@cern.ch']),
205  (mf.HLTMETPostProcess,
206  ['venkat.kaushik@cern.ch']),
207  (mf.HLTCaloPostProcess,
208  ['gareth.brown@cern.ch']),
209  (mf.HLTJetPostProcess,
210  ['venkat.kaushik@cern.ch']),
211  (mf.BJetTaggingPostProcess,
212  ['m.neumann@cern.ch']),
213  (mf.L1CaloPostProcess,
214  ['ivana.hristova@cern.ch', 'pjwf@hep.ph.bham.ac.uk']),
215  (mf.PixelPostProcess,
216  ['daiki.yamaguchi@cern.ch']),
217  (mf.MuonTrackPostProcess,
218  ['baojia.tong@cern.ch', 'alexander.tuna@cern.ch']),
219  (zlumi,
220  ['ponyisi@utexas.edu']),
221  ]
222 
223  funclist += make_newstyle_funcs()
224 
225  # determine how we react to a failure
226  isTesting = bool(os.environ.get('DQ_POSTPROCESS_ERROR_ON_FAILURE', False))
227 
228  # first try all at once
229  def go_all(fname, isIncremental):
230  for funcinfo in funclist:
231  funcinfo[0](fname, isIncremental)
232 
233  success = _ProtectPostProcessing( (go_all, []), outFileName, isIncremental )
234 
235  if not success:
236  if isTesting:
237  sys.exit(1)
238  else:
239  for funcinfo in funclist:
240  _ProtectPostProcessing( funcinfo, outFileName, isIncremental )
python.DQPostProcessMod.rundir
def rundir(fname)
Definition: DQPostProcessMod.py:115
dqutils::MonitoringFile
Definition: MonitoringFile.h:58
python.DQPostProcessMod.DQPostProcess
def DQPostProcess(outFileName, isIncremental=False)
Definition: DQPostProcessMod.py:126
python.DQPostProcessMod._ProtectPostProcessing
def _ProtectPostProcessing(funcinfo, outFileName, isIncremental)
Definition: DQPostProcessMod.py:57
python.createRpcFolders.createRPCConditionDB
def createRPCConditionDB()
Definition: createRpcFolders.py:51
python.createMdtFolders.createMDTConditionDBDead
def createMDTConditionDBDead()
Definition: createMdtFolders.py:9
python.DQPostProcessMod._dolsrwrapper
def _dolsrwrapper(fname)
Definition: DQPostProcessMod.py:8
python._resolve_data_path.resolve_data_path
def resolve_data_path(fin)
Definition: DataQualityConfigurations/python/_resolve_data_path.py:6
dqt_zlumi_display_z_rate.zlumi
zlumi
Definition: dqt_zlumi_display_z_rate.py:72
print
void print(char *figname, TCanvas *c1)
Definition: TRTCalib_StrawStatusPlots.cxx:25
python.doZLumi.go
def go(fname)
Definition: doZLumi.py:77
TCS::join
std::string join(const std::vector< std::string > &v, const char c=',')
Definition: Trigger/TrigT1/L1Topo/L1TopoCommon/Root/StringUtils.cxx:10
python.createRpcFolders.createRPCDQMFDB
def createRPCDQMFDB()
Definition: createRpcFolders.py:10
DQPostProcessTest.mdt_create
def mdt_create(dummy)
Definition: DQPostProcessTest.py:26
python.DQPostProcessMod._dolsr
def _dolsr(dir)
Definition: DQPostProcessMod.py:20
str
Definition: BTagTrackIpAccessor.cxx:11
DQPostProcessTest.rpc_create
def rpc_create(dummy)
Definition: DQPostProcessTest.py:20
xAOD::bool
setBGCode setTAP setLVL2ErrorBits bool
Definition: TrigDecision_v1.cxx:60
python.createMdtFolders.createMDTConditionDBNoisy
def createMDTConditionDBNoisy()
Definition: createMdtFolders.py:54