ATLAS Offline Software
DQPostProcessMod.py
Go to the documentation of this file.
1 # Copyright (C) 2002-2021 CERN for the benefit of the ATLAS collaboration
2 from __future__ import print_function
3 
4 import shutil, re, sys, os
5 from typing import List, Tuple, Callable
6 
7 from .dqu_subprocess import apply as _local_apply
8 
def _dolsrwrapper(fname):
    """Open ``fname`` read-only with ROOT and walk it with ``_dolsr``.

    Prints a message and returns ``False`` when the file cannot be opened.
    Otherwise returns ``None`` once the walk has finished; any exception
    raised by ``_dolsr`` propagates to the caller.
    """
    import ROOT
    rf = ROOT.TFile.Open(fname, 'READ')
    if not rf or not rf.IsOpen():
        print(' %s is empty or not accessible' % fname)
        return False

    # The gROOT "must clean" flag is switched off for the duration of the
    # walk and put back to its previous value afterwards.
    cleancache = ROOT.gROOT.MustClean()
    ROOT.gROOT.SetMustClean(False)
    try:
        try:
            _dolsr(rf)
        finally:
            # Close the file even if the walk raised.
            rf.Close()
    finally:
        # Restore the global flag even if the walk or the close raised.
        ROOT.gROOT.SetMustClean(cleancache)
20 
def _dolsr(dir):
    """Recursively walk a ROOT directory and touch its 'Results' entries.

    For ``dir`` itself, and for every sub-directory whose key name ends in
    ``'_'`` and which holds a ``'Results'`` object, the name of the first key
    of ``Results/Status`` is read.  For those sub-directories the object
    named like the key without the trailing ``'_'`` is also fetched from
    ``dir``, and the first key of every directory-typed entry of ``Results``
    other than ``Status``, ``Reference`` and ``ResultObject`` is read.
    Every other sub-directory is descended into.  All values read are
    discarded.

    NOTE(review): presumably the point is that an unreadable or incomplete
    file makes one of these accesses raise -- confirm with the authors.
    """
    import ROOT

    def _is_directory(root_key):
        # True when the class stored under this key derives from TDirectory.
        return ROOT.TClass.GetClass(root_key.GetClassName()).InheritsFrom('TDirectory')

    # Results/Status that belongs to this directory itself.
    own_results = dir.Get('Results')
    own_status = own_results.Get('Status') if own_results else None
    if own_status:
        own_status.GetListOfKeys()[0].GetName()

    for entry in dir.GetListOfKeys():
        entry_name = entry.GetName()
        if not _is_directory(entry):
            continue

        subdir = entry.ReadObj()
        results = subdir.Get('Results')
        status = results.Get('Status') if results else None

        # is a check? is a summary? or recurse?
        if not (entry_name[-1] == '_' and results):
            _dolsr(subdir)
            continue

        # Fetch the object this "<name>_" directory refers to.
        dir.Get(entry_name[:-1])
        for result_key in results.GetListOfKeys():
            result_name = result_key.GetName()
            if result_name not in ('Status', 'Reference', 'ResultObject') and _is_directory(result_key):
                result_key.ReadObj().GetListOfKeys()[0].GetName()

        # status flag (not guarded: this access fails if Results has no Status)
        status.GetListOfKeys()[0].GetName()
57 
def _find_job_log_files():
    """Return ``(currDir, logFiles)`` for the failure report (best effort).

    ``currDir`` is the value of ``$PWD``; ``logFiles`` is the CRLF-joined
    list of entries in that directory whose names look like
    ``<digits>_<anything>.log``.  If ``$PWD`` is unset or the directory
    cannot be listed, the placeholder values are returned instead.
    """
    currDir = " pwd failed!"
    logFiles = ""
    try:
        currDir = os.environ["PWD"]
        logFiles = '\r\n'.join(
            entry for entry in os.listdir(currDir)
            if re.match(r'^[0-9]*_.*\.log$', entry)
        )
    except (KeyError, OSError):
        # Only used to enrich the notification mail; never fatal.
        pass
    return currDir, logFiles


def _mail_postprocessing_failure(func, mail_to, outFileName, isIncremental,
                                 error, currDir, logFiles):
    """Send the failure notification through the SMTP server on localhost.

    A failure to deliver the mail is printed and otherwise ignored, so that
    it cannot abort the post-processing that is being protected.
    """
    import smtplib
    mail_cc = ['ponyisi@utexas.edu', 'rnarayan@utexas.edu']
    msg = ['From: atlasdqm@cern.ch',
           'To: %s' % ', '.join(mail_to),
           'Cc: %s' % ', '.join(mail_cc),
           'Reply-to: atlasdqm-no-reply@cern.ch',
           'Subject: WARNING: Postprocessing failure in production job',
           '',
           'There has been a failure in Tier-0 postprocessing monitoring.',
           '',
           'Function: %s' % func.__name__,
           'File: %s' % outFileName,
           'isIncremental? %s' % isIncremental,
           'Error:',
           str(error),
           'Current directory (contains LSF job ID): %s' % currDir,
           'Log file name (contains ATLAS T0 job ID): %s' % logFiles
           ]
    print('Emailing', ', '.join(mail_to))
    try:
        # The context manager closes the connection on every exit path.
        with smtplib.SMTP('localhost') as server:
            server.sendmail('atlasdqm@cern.ch', list(mail_to) + mail_cc,
                            '\r\n'.join(msg))
    except Exception as mail_error:
        print('WARNING: could not send failure notification:', mail_error)


def _ProtectPostProcessing( funcinfo, outFileName, isIncremental ):
    """Run one post-processing step on a scratch copy of ``outFileName``.

    ``funcinfo`` is a ``(func, mail_to)`` pair.  ``func`` is run through
    ``_local_apply`` on a copy of the file named
    ``<outFileName>-safe-<pid>``, followed by ``_dolsrwrapper`` on the same
    copy.  If neither raises, the copy replaces ``outFileName``; otherwise
    the copy is deleted and ``outFileName`` is left as it was.

    On failure a warning is printed and, when the environment variable
    ``DQPRODUCTION`` is set to a non-empty value, a notification mail is
    sent to ``mail_to`` (plus a fixed Cc list).

    Returns ``True`` if the step succeeded, ``False`` otherwise.
    """
    isProduction = bool(os.environ.get('DQPRODUCTION', False))
    func, mail_to = funcinfo
    tmpfilename = outFileName + "-safe-" + str(os.getpid())
    shutil.copy2(outFileName, tmpfilename)
    # try to get log file name and curr directory in order to find job info
    currDir, logFiles = _find_job_log_files()
    success = False
    try:
        _local_apply(func, (tmpfilename, isIncremental))
        _local_apply(_dolsrwrapper, (tmpfilename,))
        success = True
    except Exception as e:
        print('WARNING!!!: POSTPROCESSING FAILED FOR', func.__name__)
        print(e)
        print('Relevant results will not be in output')
        if isProduction:
            _mail_postprocessing_failure(func, mail_to, outFileName,
                                         isIncremental, e, currDir, logFiles)
        print()
    finally:
        # Publish the scratch copy only if every step succeeded.
        if success:
            shutil.move(tmpfilename, outFileName)
        else:
            os.unlink(tmpfilename)
    return success
115 
def rundir(fname):
    """Return the name of the single top-level ``run_*`` key of a ROOT file.

    Returns ``None`` when the file does not have exactly one top-level key,
    or when that key's name does not start with ``'run_'``.  The file is
    closed again before returning.
    """
    import ROOT
    f = ROOT.TFile.Open(fname)
    try:
        lk = f.GetListOfKeys()
        if len(lk) != 1:
            return None
        kn = lk[0].GetName()
        if not kn.startswith('run_'):
            return None
        return kn
    finally:
        # Do not keep the file open: the caller goes on to let an external
        # process rewrite the same file.
        if f:
            f.Close()
126 
def DQPostProcess( outFileName, isIncremental=False ):
    """Run every registered post-processing step on ``outFileName``.

    The steps are the entries of ``funclist`` below plus one "new style"
    step per ``postprocessing/*.yaml`` file found under the resolved
    ``DataQualityUtils`` data path.  Each entry is a
    ``(function, list of e-mail addresses)`` pair; the function is called as
    ``function(fname, isIncremental)``.

    All steps are first attempted together in one protected pass (see
    ``_ProtectPostProcessing``).  If that pass fails and the environment
    variable ``DQ_POSTPROCESS_ERROR_ON_FAILURE`` is set to a non-empty
    value, the process exits with status 1; otherwise every step is retried
    on its own, each in its own protected pass.

    NOTE(review): the listing this function was recovered from had dropped
    five statements (the binding of ``mf`` and the bodies of ``rpc_create``
    and ``mdt_create``).  They are reconstructed below and individually
    marked -- confirm against the repository.
    """

    from ROOT import gSystem
    gSystem.Load('libDataQualityUtils')
    from ROOT import dqutils

    # NOTE(review): reconstructed -- `mf` is used throughout `funclist` but
    # its binding was missing; presumably the dqutils MonitoringFile class.
    mf = dqutils.MonitoringFile

    def rpc_create(dummy, dummy2):
        from DataQualityUtils.createRpcFolders import (createRPCDQMFDB,
                                                       createRPCConditionDB)
        # NOTE(review): reconstructed calls (the two names were imported
        # but never used in the recovered listing).
        createRPCDQMFDB()
        createRPCConditionDB()

    def mdt_create(dummy, isIncremental):
        # if False and not isIncremental:
        if not isIncremental:
            from DataQualityUtils.createMdtFolders import (createMDTConditionDBDead,
                                                           createMDTConditionDBNoisy)
            # NOTE(review): reconstructed calls, same situation as above.
            createMDTConditionDBDead()
            createMDTConditionDBNoisy()

    def zlumi(fname, isIncremental):
        if not isIncremental:
            from DataQualityUtils.doZLumi import go
            go(fname)

    def newstyle(yamlfile: str) -> Callable[[str, bool], None]:
        """Build a step that runs ``histgrinder`` with ``yamlfile`` on a file."""
        import os.path
        def newstyle_core(fname, isIncremental):
            if not isIncremental:
                import subprocess
                print(f'New style postprocessing running for: {yamlfile}')
                # NOTE(review): rundir() may return None, which yields the
                # prefix '/None/' -- confirm that this is acceptable.
                cmdline = ['histgrinder', '--prefix', f'/{rundir(fname)}/', fname, fname, '-c', yamlfile]
                subprocess.run(cmdline, check=True)
        # Failure reports identify a step by __name__; use the YAML stem.
        newstyle_core.__name__ = os.path.splitext(os.path.basename(yamlfile))[0]
        return newstyle_core

    def make_newstyle_funcs() -> List[Tuple[Callable[[str, bool], None], List[str]]]:
        """Return one ``(step, mail_to)`` entry per postprocessing YAML file."""
        rv = []
        # do we have DataQualityUtils in the DATAPATH?
        from ._resolve_data_path import resolve_data_path
        dpath = resolve_data_path("DataQualityUtils")
        if dpath is None:
            print("Unable to resolve DataQualityUtils data path, not running new-style postprocessing")
            return rv
        import glob, os.path
        # glob gives no ordering guarantee; sort so that the steps run in
        # the same order on every invocation.
        yamlfiles = sorted(glob.glob(os.path.join(dpath,'postprocessing/*.yaml')))
        for yamlfile in yamlfiles:
            rv.append((newstyle(yamlfile), ['ponyisi@utexas.edu']))
        return rv

    funclist = [
        (mf.fitMergedFile_IDPerfMonManager,
         ['weina.ji@cern.ch', 'ana.ovcharova@cern.ch']),
        (mf.fitMergedFile_DiMuMonManager,
         ['ana.ovcharova@cern.ch']),
        (mf.fitMergedFile_IDAlignMonManager,
         ['John.Alison@cern.ch', 'anthony.morley@cern.ch']),
        (mf.pv_PrimaryVertexMonitoring_calcResoAndEfficiency,
         ['Andreas.Wildauer@cern.ch']),
        (mf.VxMon_move,
         ['federico.meloni@cern.ch']),
        (rpc_create,
         ['michele.bianco@le.infn.it', 'monica.verducci@cern.ch']),
        (mf.RPCPostProcess,
         ['michele.bianco@le.infn.it', 'monica.verducci@cern.ch']),
        (mdt_create,
         ['john.stakely.keller@cern.ch', 'monica.verducci@cern.ch']),
        (mf.TGCPostProcess,
         ['lyuan@ihep.ac.cn', 'kingmgl@stu.kobe-u.ac.jp']),
        (mf.MDTvsTGCPostProcess,
         ['lyuan@ihep.ac.cn', 'kingmgl@stu.kobe-u.ac.jp']),
        (mf.HLTMuonPostProcess,
         ['lyuan@ihep.ac.cn', 'kingmgl@stu.kobe-u.ac.jp', 'yamazaki@phys.sci.kobe-u.ac.jp']),
        (mf.HLTEgammaPostProcess,
         ['yan.jie.schnellbach@cern.ch', 'alessandro.tricoli@cern.ch']),
        (mf.HLTTauPostProcess,
         ['Cristobal.Cuenca@cern.ch', 'Geng-Yuan.Jeng@cern.ch', 'Giovanna.Cottin@cern.ch']),
        (mf.HLTMETPostProcess,
         ['venkat.kaushik@cern.ch']),
        (mf.HLTCaloPostProcess,
         ['gareth.brown@cern.ch']),
        (mf.HLTJetPostProcess,
         ['venkat.kaushik@cern.ch']),
        (mf.BJetTaggingPostProcess,
         ['m.neumann@cern.ch']),
        (mf.L1CaloPostProcess,
         ['ivana.hristova@cern.ch', 'pjwf@hep.ph.bham.ac.uk']),
        (mf.PixelPostProcess,
         ['daiki.yamaguchi@cern.ch']),
        (mf.MuonTrackPostProcess,
         ['baojia.tong@cern.ch', 'alexander.tuna@cern.ch']),
        (zlumi,
         ['ponyisi@utexas.edu']),
    ]

    funclist += make_newstyle_funcs()

    # determine how we react to a failure
    isTesting = bool(os.environ.get('DQ_POSTPROCESS_ERROR_ON_FAILURE', False))

    # first try all at once
    def go_all(fname, isIncremental):
        for step, _mail_to in funclist:
            step(fname, isIncremental)

    success = _ProtectPostProcessing( (go_all, []), outFileName, isIncremental )

    if not success:
        if isTesting:
            sys.exit(1)
        else:
            # Retry step by step so that one failing step does not discard
            # the output of the others.
            for funcinfo in funclist:
                _ProtectPostProcessing( funcinfo, outFileName, isIncremental )
python.DQPostProcessMod.rundir
def rundir(fname)
Definition: DQPostProcessMod.py:116
dqutils::MonitoringFile
Definition: MonitoringFile.h:54
python.DQPostProcessMod.DQPostProcess
def DQPostProcess(outFileName, isIncremental=False)
Definition: DQPostProcessMod.py:127
python.DQPostProcessMod._ProtectPostProcessing
def _ProtectPostProcessing(funcinfo, outFileName, isIncremental)
Definition: DQPostProcessMod.py:58
python.createRpcFolders.createRPCConditionDB
def createRPCConditionDB()
Definition: createRpcFolders.py:52
python.createMdtFolders.createMDTConditionDBDead
def createMDTConditionDBDead()
Definition: createMdtFolders.py:10
python.DQPostProcessMod._dolsrwrapper
def _dolsrwrapper(fname)
Definition: DQPostProcessMod.py:9
python._resolve_data_path.resolve_data_path
def resolve_data_path(fin)
Definition: DataQualityConfigurations/python/_resolve_data_path.py:6
dqt_zlumi_display_z_rate.zlumi
zlumi
Definition: dqt_zlumi_display_z_rate.py:72
python.doZLumi.go
def go(fname)
Definition: doZLumi.py:78
TCS::join
std::string join(const std::vector< std::string > &v, const char c=',')
Definition: Trigger/TrigT1/L1Topo/L1TopoCommon/Root/StringUtils.cxx:10
python.createRpcFolders.createRPCDQMFDB
def createRPCDQMFDB()
Definition: createRpcFolders.py:11
DQPostProcessTest.mdt_create
def mdt_create(dummy)
Definition: DQPostProcessTest.py:27
Muon::print
std::string print(const MuPatSegment &)
Definition: MuonTrackSteering.cxx:28
python.DQPostProcessMod._dolsr
def _dolsr(dir)
Definition: DQPostProcessMod.py:21
str
Definition: BTagTrackIpAccessor.cxx:11
DQPostProcessTest.rpc_create
def rpc_create(dummy)
Definition: DQPostProcessTest.py:21
xAOD::bool
setBGCode setTAP setLVL2ErrorBits bool
Definition: TrigDecision_v1.cxx:60
python.createMdtFolders.createMDTConditionDBNoisy
def createMDTConditionDBNoisy()
Definition: createMdtFolders.py:55