ATLAS Offline Software
DQPostProcessingAlg.py
Go to the documentation of this file.
1 # Copyright (C) 2002-2021 CERN for the benefit of the ATLAS collaboration
2 
3 # from AthenaPython.PyAthenaComps import StatusCode
4 from AthenaPython import PyAthena
5 
6 import histgrinder
7 import histgrinder.interfaces
8 from histgrinder.HistObject import HistObject
9 from typing import Union, Any, Pattern, Dict, Optional
10 from collections.abc import Iterable, Iterator, Mapping, Collection, Generator
11 
12 StatusCode = PyAthena.StatusCode
13 
15  def __init__(self, name=None, **kw):
16  super(DQPostProcessingAlg, self).__init__(name, **kw)
17  self.Interval = 1
18  self.FileKey = '/CombinedMonitoring/run_%(run)s/'
19  self.ConfigFiles = None
21  self._ctr = 0
22  self._run = 0
23  self._transformermap = {}
24  self._timings = {}
25  self._plotnames = set()
26  self.DoTiming = True
27  self.DoEntryOptimization = True
28 
29  def initialize(self):
30  from histgrinder.config import read_configuration
31  from histgrinder.transform import Transformer
32  import os, glob
33  from DataQualityUtils._resolve_data_path import resolve_data_path
34 
35  self.hsvc = PyAthena.py_svc('THistSvc')
36 
37  dpath = resolve_data_path("DataQualityUtils")
38  if dpath is None:
39  self.msg.error("Unable to resolve DataQualityUtils data path, not running new-style postprocessing")
40  return StatusCode.Failure
41 
42  # read configuration & set up transformations
43  self._transformers = []
44  if self.ConfigFiles is None:
45  postprocfiles = glob.glob(os.path.join(dpath,'postprocessing/*.yaml'))
46  else:
47  postprocfiles = self.ConfigFiles
48  self.msg.info(f'The postprocessing config file list is {postprocfiles}')
49  for configfile in postprocfiles:
50  config = read_configuration(configfile)
51  self._transformers += [Transformer(_) for _ in config if _.description not in self.FunctionsToDisable]
52  selectors = set()
53  for transform in self._transformers:
54  selectors.update(transform.inregexes)
55  if self.DoTiming:
56  self._timings[transform] = 0.
57 
58  # Configure input
60  in_configuration: Mapping[str, Any] = {'source': self}
61  # if args.prefix:
62  in_configuration['prefix'] = f'{self.FileKey}'
63  self._im.configure(in_configuration)
64  self._im.setSelectors(selectors)
65 
66  # Configure output
68  out_configuration: Mapping[str, Any] = {'target': self}
69  # if args.prefix:
70  out_configuration['prefix'] = f'{self.FileKey}'
71  self._om.configure(out_configuration)
72  return StatusCode.Success
73 
74  def _process(self):
75  import time
76  for obj in self._im:
77  self.msg.debug(f'now processing for {obj.name}')
78  # if we are still accumulating new plots then transformer cache is possibly invalid
79  translist = self._transformermap.get(obj.name, self._transformers)
80  needtocache = (translist == self._transformers)
81  cached = []
82  for _ in translist:
83  self.msg.debug(f'consider transformer {_.tc.description}')
84  if self.DoTiming:
85  t0 = time.perf_counter()
86  if needtocache:
87  if any(_.match(obj.name) for _ in _.inregexes):
88  cached.append(_)
89  v = _.consider(obj, defer=True) # need to find out if it produces output
90  if self.DoTiming:
91  t = time.perf_counter()-t0
92  self._timings[_] += t
93  if v:
94  self.msg.debug('Match made!')
95  self._om.publish(v)
96  if needtocache:
97  self._transformermap[obj.name] = cached
98  # Process deferred transformations
99  for _ in self._transformers:
100  try:
101  lv = _.transform()
102  for v in lv:
103  self._om.publish(v)
104  except Exception as e:
105  self.msg.info(f'Exception running transformer {_.tc.description}: {e}')
106  self._om.finalize()
107 
108  def execute(self):
109  self._ctr += 1
110  if ((self._ctr - 1) % self.Interval) != 0: return StatusCode.Success
111  self._run = self.evtStore['EventInfo'].runNumber()
112  try:
113  self._process()
114  except Exception as e:
115  import traceback
116  self.msg.info(f"Caught exception: {e}")
117  self.msg.info(traceback.format_exc())
118  self.msg.debug("I've got it made")
119  return StatusCode.Success
120 
121  def finalize(self):
122  self.msg.debug("Finalizing")
123  if self.DoTiming:
124  self.msg.info('Timings')
125  for k, v in self._timings.items():
126  self.msg.info(f'{k.tc.description}, {v}')
127  try:
128  self._process()
129  except Exception as e:
130  import traceback
131  self.msg.info(f"Caught transformation exception: {e}")
132  self.msg.warning(traceback.format_exc())
133  return StatusCode.Success
134 
135 class AthInputModule(histgrinder.interfaces.InputModule):
136  def __init__(self):
137  self.source = None
139  self.selectors = None
140  self.entries = {}
141 
142  def configure(self, options: Mapping[str, Any]) -> None:
143  """
144  Configure this module. Potential elements of "options":
145  source: should be a ROOT-openable filename or URL.
146  prefix: directory path to search under. Returned histogram names
147  will not include this.
148  """
149  if 'source' not in options:
150  raise ValueError("Must specify 'source' as an "
151  "option to AthInputModule")
152  self.source = options['source']
153  self.prefix = options.get('prefix', '/')
154  self.cachednames = set()
155  self.matchednames = {}
156 
157  def setSelectors(self, selectors: Collection[Pattern]) -> None:
158  """ Do more later """
159  self.selectors = selectors
160 
161  def _getklass(self, k):
162  import ROOT
163  hsvc = self.source.hsvc
164  if hsvc.existsHist(k):
165  hptr = ROOT.MakeNullPointer(ROOT.TH1)
166  if hsvc.getHist(k, hptr).isSuccess():
167  klass = getattr(ROOT, hptr.ClassName())
168  return klass
169  return None
170 
171  def iterate(self, dryrun) -> Generator[HistObject, None, None]:
172  """ Iterate over all histograms in THistSvc """
173  import ROOT
174  log = self.source.msg
175  specprefix = self.prefix % { 'run': self.source._run }
176  log.debug(f'Would like to match {specprefix}')
177  hsvc = self.source.hsvc
178 
179  # check if we have new histograms; if so, check against selectors to see if we're interested
180  # weirdly unpythonic code to avoid memory leak in iterators pre-ROOT 6.26
181  histnames = hsvc.getHists()
182  currenthists = set(str(histnames[_]) for _ in range(len(histnames)))
183  for k in currenthists - self.cachednames:
184  # log.info(f'We have ... ? {k}')
185  if not k.startswith(specprefix):
186  continue
187  shortk = k.replace(specprefix, '', 1)
188  if self.selectors is not None:
189  if not any(_.match(shortk) for _ in self.selectors):
190  continue
191  self.matchednames[k] = None
192  self.cachednames.update(currenthists)
193  log.debug(f'We now have {len(self.cachednames)} entries in our cache, of {len(currenthists)} total plots')
194  log.debug(f'There are {len(self.matchednames)} matches to be considered')
195 
196  # postprocess only matched histograms
197  for k, klass in self.matchednames.items():
198  if dryrun:
199  yield HistObject(k.replace(specprefix, '', 1), None)
200 
201  log.debug(f'THistSvc input trying to read {k}')
202  if klass is None:
203  klass = self._getklass(k)
204  self.matchednames[k] = klass
205  hptr = ROOT.MakeNullPointer(klass)
206  if hsvc.getHist(k, hptr).isSuccess():
207  log.debug(f'THistSvc input read {k} as {type(hptr)}')
208  obj = hptr
209  ROOT.SetOwnership(obj, False) # no NOT attempt to GC histograms read from THistSvc
210  if k in self.entries:
211  if obj.GetEntries() == self.entries[k]:
212  continue
213  self.entries[k] = obj.GetEntries()
214  yield HistObject(k.replace(specprefix, '', 1), obj)
215  else:
216  log.error(f'Cannot read {k}')
217 
218  log.debug('Done on input side')
219 
220  def __iter__(self) -> Iterator[HistObject]:
221  return self.iterate(dryrun=False)
222 
223  def warmup(self) -> Iterable[HistObject]:
224  return self.iterate(dryrun=True)
225 
226 class AthOutputModule(histgrinder.interfaces.OutputModule):
227  def __init__(self):
228  self.target = None
229 
230  def configure(self, options: Mapping[str, Any]) -> None:
231  """
232  Configure this module. Potential elements of "options":
233  target: should be a ROOT-openable filename or URL which
234  can be opened for writing.
235  prefix: directory path to place results under.
236  overwrite: boolean to indicate whether results should overwrite
237  existing histograms in the file.
238  delay: only write histograms in finalize() (not during publish()).
239  """
240  if 'target' not in options:
241  raise ValueError("Must specify 'target' as an option "
242  "to AthInputModule")
243  self.target = options['target']
244  self.overwrite = bool(options.get('overwrite', True))
245  self.prefix = options.get('prefix', '/')
246  self.delay = bool(options.get('delay', True))
247  self.queue: Optional[Dict[str, HistObject]] = {}
248  self._hsvc_funcs = {'hist': { 'exists': self.target.hsvc.existsHist,
249  'get': self.target.hsvc.getHist,
250  'reg': self.target.hsvc._cpp_regHist },
251  'graph': { 'exists': self.target.hsvc.existsGraph,
252  'get': self.target.hsvc.getGraph,
253  'reg': self.target.hsvc._cpp_regGraph },
254  'eff': { 'exists': self.target.hsvc.existsEfficiency,
255  'get': self.target.hsvc.getEfficiency,
256  'reg': self.target.hsvc._cpp_regEfficiency },
257  }
258 
259  def publish(self, obj: Union[HistObject, Iterable[HistObject]]) -> None:
260  """ Accepts a HistObject containing a ROOT object to write to file """
261  if isinstance(obj, HistObject):
262  obj = [obj]
263  d_obj = { _.name: _ for _ in obj }
264  if self.delay:
265  if not self.queue:
266  self.queue = d_obj
267  else:
268  self.queue.update(d_obj)
269  else:
270  self.queue = d_obj
271  self._write()
272  self.queue = None
273 
274  def _write(self) -> None:
275  """ write obj to THistSvc """
276  import ROOT
277  import os.path
278  if not self.queue:
279  return # Nothing to do
280  log = self.target.msg
281  hsvc = self.target.hsvc
282  for _, o in self.queue.items():
283  fulltargetname = os.path.join(self.prefix, o.name) % { 'run': self.target._run }
284  log.debug(f"Attempt to publish {fulltargetname} of type {type(o.hist)}")
285  # it would be perverse if the type of the postprocessing output changed
286  # between invocations and we will not consider it
287  if isinstance(o.hist, ROOT.TH1):
288  funcs = self._hsvc_funcs['hist']
289  parenttype = ROOT.TH1
290  elif isinstance(o.hist, ROOT.TGraph):
291  funcs = self._hsvc_funcs['graph']
292  parenttype = ROOT.TGraph
293  elif isinstance(o.hist, ROOT.TEfficiency):
294  funcs = self._hsvc_funcs['eff']
295  parenttype = ROOT.TEfficiency
296  else:
297  log.warning(f'Do not know how to handle object {fulltargetname} of type {type(o.hist)}; skipping')
298  continue
299  o.hist.SetName(os.path.basename(fulltargetname))
300  if funcs['exists'](fulltargetname):
301  # following kind of silly procedure is necessary to avoid memory leaks
302  hptr = ROOT.MakeNullPointer(parenttype)
303  if funcs['get'](fulltargetname, hptr).isSuccess():
304  hsvc.deReg(hptr)
305  ROOT.SetOwnership(hptr, True) # clean up the histogram from our side
306  if not funcs['reg'](fulltargetname, o.hist).isSuccess():
307  log.error(f"Unable to register {fulltargetname}")
308  else:
309  ROOT.SetOwnership(o.hist, False)
310  log.debug("Published")
311  self.queue.clear()
312 
313  def finalize(self) -> None:
314  """ Outputs outstanding HistObjects """
315  self._write()
grepfile.info
info
Definition: grepfile.py:38
python.DQPostProcessingAlg.AthInputModule.classwarnings
classwarnings
Definition: DQPostProcessingAlg.py:138
python.DQPostProcessingAlg.AthInputModule.__init__
def __init__(self)
Definition: DQPostProcessingAlg.py:136
python.DQPostProcessingAlg.AthInputModule.setSelectors
None setSelectors(self, Collection[Pattern] selectors)
Definition: DQPostProcessingAlg.py:157
python.DQPostProcessingAlg.DQPostProcessingAlg._plotnames
_plotnames
Definition: DQPostProcessingAlg.py:25
python.DQPostProcessingAlg.AthOutputModule.publish
None publish(self, Union[HistObject, Iterable[HistObject]] obj)
Definition: DQPostProcessingAlg.py:259
python.DQPostProcessingAlg.AthInputModule.cachednames
cachednames
Definition: DQPostProcessingAlg.py:154
configure
bool configure(asg::AnaToolHandle< ITrigGlobalEfficiencyCorrectionTool > &tool, ToolHandleArray< IAsgElectronEfficiencyCorrectionTool > &electronEffToolsHandles, ToolHandleArray< IAsgElectronEfficiencyCorrectionTool > &electronSFToolsHandles, ToolHandleArray< CP::IMuonTriggerScaleFactors > &muonToolsHandles, ToolHandleArray< IAsgPhotonEfficiencyCorrectionTool > &photonEffToolsHandles, ToolHandleArray< IAsgPhotonEfficiencyCorrectionTool > &photonSFToolsHandles, const std::string &triggers, const std::map< std::string, std::string > &legsPerTool, unsigned long nToys, bool debug)
Definition: TrigGlobEffCorrValidation.cxx:514
python.DQPostProcessingAlg.AthInputModule.entries
entries
Definition: DQPostProcessingAlg.py:140
python.DQPostProcessingAlg.AthOutputModule.configure
None configure(self, Mapping[str, Any] options)
Definition: DQPostProcessingAlg.py:230
python.DQPostProcessingAlg.AthOutputModule.queue
queue
Definition: DQPostProcessingAlg.py:266
PyAthena::Alg::initialize
virtual StatusCode initialize() override
Definition: PyAthenaAlg.cxx:60
python.DQPostProcessingAlg.DQPostProcessingAlg.DoTiming
DoTiming
Definition: DQPostProcessingAlg.py:26
PyAthena::Alg::execute
virtual StatusCode execute() override
Definition: PyAthenaAlg.cxx:93
python.DQPostProcessingAlg.DQPostProcessingAlg._timings
_timings
Definition: DQPostProcessingAlg.py:24
python.DQPostProcessingAlg.DQPostProcessingAlg.FunctionsToDisable
FunctionsToDisable
Definition: DQPostProcessingAlg.py:20
python.DQPostProcessingAlg.AthOutputModule
Definition: DQPostProcessingAlg.py:226
python.DQPostProcessingAlg.AthInputModule.matchednames
matchednames
Definition: DQPostProcessingAlg.py:155
python.DQPostProcessingAlg.DQPostProcessingAlg._transformers
_transformers
Definition: DQPostProcessingAlg.py:43
python.DQPostProcessingAlg.AthInputModule.prefix
prefix
Definition: DQPostProcessingAlg.py:153
python.DQPostProcessingAlg.AthInputModule.iterate
Generator[HistObject, None, None] iterate(self, dryrun)
Definition: DQPostProcessingAlg.py:171
python.DQPostProcessingAlg.AthOutputModule.target
target
Definition: DQPostProcessingAlg.py:228
python.DQPostProcessingAlg.DQPostProcessingAlg.DoEntryOptimization
DoEntryOptimization
Definition: DQPostProcessingAlg.py:27
PyAthena::Alg::finalize
virtual StatusCode finalize() override
Definition: PyAthenaAlg.cxx:86
python.DQPostProcessingAlg.AthInputModule.__iter__
Iterator[HistObject] __iter__(self)
Definition: DQPostProcessingAlg.py:220
python.DQPostProcessingAlg.DQPostProcessingAlg._im
_im
Definition: DQPostProcessingAlg.py:59
AthCommonDataStore< AthCommonMsg< Algorithm > >::evtStore
ServiceHandle< StoreGateSvc > & evtStore()
The standard StoreGateSvc (event store) Returns (kind of) a pointer to the StoreGateSvc.
Definition: AthCommonDataStore.h:85
python.DQPostProcessingAlg.DQPostProcessingAlg._om
_om
Definition: DQPostProcessingAlg.py:67
python.DQPostProcessingAlg.DQPostProcessingAlg.FileKey
FileKey
Definition: DQPostProcessingAlg.py:18
python.DQPostProcessingAlg.DQPostProcessingAlg.__init__
def __init__(self, name=None, **kw)
Definition: DQPostProcessingAlg.py:15
python.DQPostProcessingAlg.DQPostProcessingAlg._transformermap
_transformermap
Definition: DQPostProcessingAlg.py:23
python.DQPostProcessingAlg.AthOutputModule.overwrite
overwrite
Definition: DQPostProcessingAlg.py:244
python._resolve_data_path.resolve_data_path
def resolve_data_path(fin)
Definition: DataQualityConfigurations/python/_resolve_data_path.py:6
python.DQPostProcessingAlg.DQPostProcessingAlg
Definition: DQPostProcessingAlg.py:14
python.DQPostProcessingAlg.AthInputModule.selectors
selectors
Definition: DQPostProcessingAlg.py:139
plotBeamSpotVxVal.range
range
Definition: plotBeamSpotVxVal.py:195
python.DQPostProcessingAlg.AthInputModule.source
source
Definition: DQPostProcessingAlg.py:137
python.DQPostProcessingAlg.AthInputModule.warmup
Iterable[HistObject] warmup(self)
Definition: DQPostProcessingAlg.py:223
python.DQPostProcessingAlg.AthOutputModule._write
None _write(self)
Definition: DQPostProcessingAlg.py:274
CxxUtils::set
constexpr std::enable_if_t< is_bitmask_v< E >, E & > set(E &lhs, E rhs)
Convenience function to set bits in a class enum bitmask.
Definition: bitmask.h:232
python.DQPostProcessingAlg.AthInputModule.configure
None configure(self, Mapping[str, Any] options)
Definition: DQPostProcessingAlg.py:142
debug
const bool debug
Definition: MakeUncertaintyPlots.cxx:53
python.DQPostProcessingAlg.DQPostProcessingAlg.Interval
Interval
Definition: DQPostProcessingAlg.py:17
TrigJetMonitorAlgorithm.items
items
Definition: TrigJetMonitorAlgorithm.py:79
python.DQPostProcessingAlg.AthInputModule._getklass
def _getklass(self, k)
Definition: DQPostProcessingAlg.py:161
python.DQPostProcessingAlg.AthInputModule
Definition: DQPostProcessingAlg.py:135
python.DQPostProcessingAlg.DQPostProcessingAlg._run
_run
Definition: DQPostProcessingAlg.py:22
python.DQPostProcessingAlg.DQPostProcessingAlg.hsvc
hsvc
Definition: DQPostProcessingAlg.py:35
DeMoAtlasDataLoss.runNumber
string runNumber
Definition: DeMoAtlasDataLoss.py:64
VKalVrtAthena::varHolder_detail::clear
void clear(T &var)
Definition: NtupleVars.h:48
AthCommonMsg< Algorithm >::msg
MsgStream & msg() const
Definition: AthCommonMsg.h:24
python.DQPostProcessingAlg.AthOutputModule.__init__
def __init__(self)
Definition: DQPostProcessingAlg.py:227
get
T * get(TKey *tobj)
get a TObject* from a TKey* (why can't a TObject be a TKey?)
Definition: hcg.cxx:127
python.DQPostProcessingAlg.AthOutputModule.finalize
None finalize(self)
Definition: DQPostProcessingAlg.py:313
python.DQPostProcessingAlg.DQPostProcessingAlg._ctr
_ctr
Definition: DQPostProcessingAlg.py:21
str
Definition: BTagTrackIpAccessor.cxx:11
PyAthena::Alg
Definition: PyAthenaAlg.h:33
python.DQPostProcessingAlg.AthOutputModule.prefix
prefix
Definition: DQPostProcessingAlg.py:245
python.DQPostProcessingAlg.DQPostProcessingAlg._process
def _process(self)
Definition: DQPostProcessingAlg.py:74
python.DQPostProcessingAlg.AthOutputModule._hsvc_funcs
_hsvc_funcs
Definition: DQPostProcessingAlg.py:248
xAOD::bool
setBGCode setTAP setLVL2ErrorBits bool
Definition: TrigDecision_v1.cxx:60
error
Definition: IImpactPoint3dEstimator.h:70
python.DQPostProcessingAlg.DQPostProcessingAlg.ConfigFiles
ConfigFiles
Definition: DQPostProcessingAlg.py:19
WriteBchToCool.update
update
Definition: WriteBchToCool.py:67
python.DQPostProcessingAlg.AthOutputModule.delay
delay
Definition: DQPostProcessingAlg.py:246