ATLAS Offline Software
Loading...
Searching...
No Matches
DQPostProcessingAlg.py
Go to the documentation of this file.
1# Copyright (C) 2002-2021 CERN for the benefit of the ATLAS collaboration
2
3# from AthenaPython.PyAthenaComps import StatusCode
4from AthenaPython import PyAthena
5
6import histgrinder
8from histgrinder.HistObject import HistObject
9from typing import Union, Any, Pattern, Dict, Optional
10from collections.abc import Iterable, Iterator, Mapping, Collection, Generator
11
12StatusCode = PyAthena.StatusCode
13
def __init__(self, name=None, **kw):
    """Set default property values and create the internal bookkeeping.

    Args:
        name: algorithm instance name, forwarded to the base class.
        **kw: additional keyword arguments, forwarded to the base class.
    """
    super(DQPostProcessingAlg, self).__init__(name, **kw)
    # execute() only runs the postprocessing on calls where
    # (call counter - 1) is a multiple of Interval
    self.Interval = 1
    # THistSvc prefix; the %(run)s placeholder is filled with the run number
    self.FileKey = '/CombinedMonitoring/run_%(run)s/'
    # None makes initialize() glob the default postprocessing/*.yaml files
    self.ConfigFiles = None
    # descriptions of transformations that initialize() must leave out;
    # initialize() reads this, so it needs a default
    self.FunctionsToDisable = []
    self._ctr = 0
    self._run = 0
    # histogram name -> transformers whose input patterns match it;
    # _process() reads this, so it has to exist before the first call
    self._transformermap = {}
    self._timings = {}
    self._plotnames = set()
    self.DoTiming = True
28
def initialize(self):
    """Load the postprocessing configuration and wire up input and output.

    Retrieves THistSvc, builds one Transformer per enabled configuration
    entry, and configures the input/output modules with this algorithm as
    their source/target and ``FileKey`` as prefix.

    Returns:
        StatusCode.Failure if the DataQualityUtils data path cannot be
        resolved, StatusCode.Success otherwise.
    """
    from histgrinder.config import read_configuration
    from histgrinder.transform import Transformer
    import os
    import glob
    from DataQualityUtils._resolve_data_path import resolve_data_path

    self.hsvc = PyAthena.py_svc('THistSvc')

    dpath = resolve_data_path("DataQualityUtils")
    if dpath is None:
        self.msg.error("Unable to resolve DataQualityUtils data path, not running new-style postprocessing")
        return StatusCode.Failure

    # read configuration & set up transformations
    self._transformers = []
    if self.ConfigFiles is None:
        postprocfiles = glob.glob(os.path.join(dpath, 'postprocessing/*.yaml'))
    else:
        postprocfiles = self.ConfigFiles
    self.msg.info(f'The postprocessing config file list is {postprocfiles}')
    for configfile in postprocfiles:
        config = read_configuration(configfile)
        self._transformers += [Transformer(tc) for tc in config
                               if tc.description not in self.FunctionsToDisable]

    # the union of all input patterns tells the input module what to serve
    selectors = set()
    for transform in self._transformers:
        selectors.update(transform.inregexes)
        if self.DoTiming:
            self._timings[transform] = 0.

    # Configure input; the module has to be created here because
    # _process() iterates over self._im
    self._im = AthInputModule()
    in_configuration: Dict[str, Any] = {
        'source': self,
        'prefix': f'{self.FileKey}',
    }
    self._im.configure(in_configuration)
    self._im.setSelectors(selectors)

    # Configure output; the module has to be created here because
    # _process() publishes through self._om
    self._om = AthOutputModule()
    out_configuration: Dict[str, Any] = {
        'target': self,
        'prefix': f'{self.FileKey}',
    }
    self._om.configure(out_configuration)
    return StatusCode.Success
73
def _process(self):
    """Feed every available input object to the transformers and publish results.

    Objects come from the input module; each is offered to the transformers
    registered for its name (or to all of them when the name has not been
    cached yet). Deferred transformations are run afterwards and the output
    module is finalized.
    """
    import time

    for hobj in self._im:
        self.msg.debug(f'now processing for {hobj.name}')
        # if we are still accumulating new plots then transformer cache is possibly invalid
        candidates = self._transformermap.get(hobj.name, self._transformers)
        build_cache = (candidates == self._transformers)
        matching = []
        for transformer in candidates:
            self.msg.debug(f'consider transformer {transformer.tc.description}')
            if self.DoTiming:
                t0 = time.perf_counter()
            if build_cache and any(rx.match(hobj.name) for rx in transformer.inregexes):
                matching.append(transformer)
            # need to find out if it produces output
            result = transformer.consider(hobj, defer=True)
            if self.DoTiming:
                self._timings[transformer] += time.perf_counter() - t0
            if result:
                self.msg.debug('Match made!')
                self._om.publish(result)
        if build_cache:
            self._transformermap[hobj.name] = matching

    # Process deferred transformations
    for transformer in self._transformers:
        try:
            for result in transformer.transform():
                self._om.publish(result)
        except Exception as e:
            self.msg.info(f'Exception running transformer {transformer.tc.description}: {e}')
    self._om.finalize()
107
def execute(self):
    """Count the call and run the postprocessing when the interval is due.

    Exceptions raised by the postprocessing are logged and not propagated.

    Returns:
        StatusCode.Success in every case.
    """
    self._ctr += 1
    due = ((self._ctr - 1) % self.Interval) == 0
    if due:
        # the run number is substituted into the histogram prefix later on
        self._run = self.evtStore['EventInfo'].runNumber()
        try:
            self._process()
        except Exception as e:
            import traceback
            self.msg.info(f"Caught exception: {e}")
            self.msg.info(traceback.format_exc())
        self.msg.debug("I've got it made")
    return StatusCode.Success
120
def finalize(self):
    """Report accumulated timings and run the postprocessing one last time.

    Exceptions raised by the postprocessing are logged and not propagated.

    Returns:
        StatusCode.Success in every case.
    """
    self.msg.debug("Finalizing")
    if self.DoTiming:
        self.msg.info('Timings')
        for transformer, elapsed in self._timings.items():
            self.msg.info(f'{transformer.tc.description}, {elapsed}')
    try:
        self._process()
    except Exception as e:
        import traceback
        self.msg.info(f"Caught transformation exception: {e}")
        self.msg.warning(traceback.format_exc())
    return StatusCode.Success
134
class AthInputModule(histgrinder.interfaces.InputModule):
    """Input module that serves histograms registered in THistSvc.

    The ``source`` object passed to configure() must provide the attributes
    ``hsvc``, ``msg`` and ``_run``; these are the only ones read here.
    """

    def __init__(self):
        self.source = None
        self.prefix = '/'
        self.selectors = None
        # full THistSvc name -> entry count when the object was last yielded
        self.entries = {}
        # every THistSvc name that has already been checked against the selectors
        self.cachednames = set()
        # full THistSvc name -> ROOT class of the object (None until first read)
        self.matchednames = {}

    def configure(self, options: Mapping[str, Any]) -> None:
        """
        Configure this module. Potential elements of "options":
        source: object giving access to THistSvc; its "hsvc", "msg" and
           "_run" attributes are used.
        prefix: directory path to search under. Returned histogram names
           will not include this. May contain a %(run)s placeholder.

        Raises ValueError if "source" is missing.
        """
        if 'source' not in options:
            raise ValueError("Must specify 'source' as an "
                             "option to AthInputModule")
        self.source = options['source']
        self.prefix = options.get('prefix', '/')
        # start from empty caches; iterate() reads and updates both of them
        self.cachednames = set()
        self.matchednames = {}

    def setSelectors(self, selectors: Collection[Pattern]) -> None:
        """ Set the patterns a (prefix-stripped) name must match to be served """
        self.selectors = selectors

    def _getklass(self, k):
        """ Return the ROOT class of THistSvc histogram k, or None if it cannot be read """
        import ROOT
        hsvc = self.source.hsvc
        if hsvc.existsHist(k):
            hptr = ROOT.MakeNullPointer(ROOT.TH1)
            if hsvc.getHist(k, hptr).isSuccess():
                klass = getattr(ROOT, hptr.ClassName())
                return klass
        return None

    def iterate(self, dryrun: bool) -> Generator[HistObject, None, None]:
        """
        Iterate over all histograms in THistSvc.

        With dryrun set, only the matching names are yielded (HistObjects
        whose histogram is None) and nothing is read. Otherwise each matched
        histogram is yielded unless its entry count is unchanged since it
        was last yielded.
        """
        import ROOT
        log = self.source.msg
        specprefix = self.prefix % {'run': self.source._run}
        log.debug(f'Would like to match {specprefix}')
        hsvc = self.source.hsvc

        # check if we have new histograms; if so, check against selectors to see if we're interested
        # weirdly unpythonic code to avoid memory leak in iterators pre-ROOT 6.26
        histnames = hsvc.getHists()
        currenthists = set(str(histnames[_]) for _ in range(len(histnames)))
        for k in currenthists - self.cachednames:
            if not k.startswith(specprefix):
                continue
            shortk = k.replace(specprefix, '', 1)
            if self.selectors is not None:
                if not any(_.match(shortk) for _ in self.selectors):
                    continue
            self.matchednames[k] = None
        self.cachednames.update(currenthists)
        log.debug(f'We now have {len(self.cachednames)} entries in our cache, of {len(currenthists)} total plots')
        log.debug(f'There are {len(self.matchednames)} matches to be considered')

        # postprocess only matched histograms
        for k, klass in self.matchednames.items():
            shortk = k.replace(specprefix, '', 1)
            if dryrun:
                yield HistObject(shortk, None)
                # A dry run must not read the object: reading would record its
                # entry count and make the next real pass skip it as unchanged.
                continue

            log.debug(f'THistSvc input trying to read {k}')
            if klass is None:
                klass = self._getklass(k)
                if klass is None:
                    # cannot build a typed null pointer without a class
                    log.error(f'Cannot read {k}')
                    continue
                # overwriting the value of an existing key, so the dict does
                # not change size while it is being iterated
                self.matchednames[k] = klass
            hptr = ROOT.MakeNullPointer(klass)
            if hsvc.getHist(k, hptr).isSuccess():
                log.debug(f'THistSvc input read {k} as {type(hptr)}')
                obj = hptr
                ROOT.SetOwnership(obj, False)  # do NOT attempt to GC histograms read from THistSvc
                if k in self.entries:
                    if obj.GetEntries() == self.entries[k]:
                        continue
                self.entries[k] = obj.GetEntries()
                yield HistObject(shortk, obj)
            else:
                log.error(f'Cannot read {k}')

        log.debug('Done on input side')

    def __iter__(self) -> Iterator[HistObject]:
        return self.iterate(dryrun=False)

    def warmup(self) -> Iterable[HistObject]:
        return self.iterate(dryrun=True)
class AthOutputModule(histgrinder.interfaces.OutputModule):
    """Output module that registers postprocessing results with THistSvc.

    The ``target`` object passed to configure() must provide the attributes
    ``hsvc``, ``msg`` and ``_run``; these are the only ones read here.
    """

    def __init__(self):
        self.target = None
        # Defaults mirror those applied by configure(), so that finalize()
        # on an unconfigured module is a no-op instead of an AttributeError.
        self.overwrite = True
        self.prefix = '/'
        self.delay = True
        self.queue: Optional[Dict[str, HistObject]] = {}
        self._hsvc_funcs = {}

    def configure(self, options: Mapping[str, Any]) -> None:
        """
        Configure this module. Potential elements of "options":
        target: object giving access to THistSvc; its "hsvc", "msg" and
           "_run" attributes are used.
        prefix: directory path to place results under. May contain a
           %(run)s placeholder.
        overwrite: boolean to indicate whether results should overwrite
           existing histograms.
        delay: only write histograms in finalize() (not during publish()).

        Raises ValueError if "target" is missing.
        """
        if 'target' not in options:
            raise ValueError("Must specify 'target' as an option "
                             "to AthOutputModule")
        self.target = options['target']
        # NOTE(review): this flag is stored but _write() in this class never
        # consults it; an existing object is always replaced.
        self.overwrite = bool(options.get('overwrite', True))
        self.prefix = options.get('prefix', '/')
        self.delay = bool(options.get('delay', True))
        self.queue = {}
        hsvc = self.target.hsvc
        # per object family: how to test for, fetch and register an object
        self._hsvc_funcs = {
            'hist': {'exists': hsvc.existsHist,
                     'get': hsvc.getHist,
                     'reg': hsvc._cpp_regHist},
            'graph': {'exists': hsvc.existsGraph,
                      'get': hsvc.getGraph,
                      'reg': hsvc._cpp_regGraph},
            'eff': {'exists': hsvc.existsEfficiency,
                    'get': hsvc.getEfficiency,
                    'reg': hsvc._cpp_regEfficiency},
        }

    def publish(self, obj: Union[HistObject, Iterable[HistObject]]) -> None:
        """ Accepts one HistObject, or an iterable of them, to register with THistSvc """
        if isinstance(obj, HistObject):
            obj = [obj]
        d_obj = {_.name: _ for _ in obj}
        if self.delay:
            # keep only the latest version of each name until finalize()
            if not self.queue:
                self.queue = d_obj
            else:
                self.queue.update(d_obj)
        else:
            self.queue = d_obj
            self._write()
            self.queue = None

    def _write(self) -> None:
        """ Register every queued object with THistSvc, then empty the queue """
        import ROOT
        import os.path
        if not self.queue:
            return  # Nothing to do
        log = self.target.msg
        hsvc = self.target.hsvc
        for o in self.queue.values():
            fulltargetname = os.path.join(self.prefix, o.name) % {'run': self.target._run}
            log.debug(f"Attempt to publish {fulltargetname} of type {type(o.hist)}")
            # it would be perverse if the type of the postprocessing output changed
            # between invocations and we will not consider it
            if isinstance(o.hist, ROOT.TH1):
                funcs = self._hsvc_funcs['hist']
                parenttype = ROOT.TH1
            elif isinstance(o.hist, ROOT.TGraph):
                funcs = self._hsvc_funcs['graph']
                parenttype = ROOT.TGraph
            elif isinstance(o.hist, ROOT.TEfficiency):
                funcs = self._hsvc_funcs['eff']
                parenttype = ROOT.TEfficiency
            else:
                log.warning(f'Do not know how to handle object {fulltargetname} of type {type(o.hist)}; skipping')
                continue
            o.hist.SetName(os.path.basename(fulltargetname))
            if funcs['exists'](fulltargetname):
                # following kind of silly procedure is necessary to avoid memory leaks
                hptr = ROOT.MakeNullPointer(parenttype)
                if funcs['get'](fulltargetname, hptr).isSuccess():
                    hsvc.deReg(hptr)
                    ROOT.SetOwnership(hptr, True)  # clean up the histogram from our side
            if not funcs['reg'](fulltargetname, o.hist).isSuccess():
                log.error(f"Unable to register {fulltargetname}")
            else:
                # hand the registered object over so Python does not delete it
                ROOT.SetOwnership(o.hist, False)
        log.debug("Published")
        self.queue.clear()

    def finalize(self) -> None:
        """ Outputs outstanding HistObjects """
        self._write()
const bool debug
bool configure(asg::AnaToolHandle< ITrigGlobalEfficiencyCorrectionTool > &tool, ToolHandleArray< IAsgElectronEfficiencyCorrectionTool > &electronEffToolsHandles, ToolHandleArray< IAsgElectronEfficiencyCorrectionTool > &electronSFToolsHandles, ToolHandleArray< CP::IMuonTriggerScaleFactors > &muonToolsHandles, ToolHandleArray< IAsgPhotonEfficiencyCorrectionTool > &photonEffToolsHandles, ToolHandleArray< IAsgPhotonEfficiencyCorrectionTool > &photonSFToolsHandles, const std::string &triggers, const std::map< std::string, std::string > &legsPerTool, unsigned long nToys, bool debug)
MsgStream & msg() const
virtual StatusCode execute() override
virtual StatusCode finalize() override
virtual StatusCode initialize() override
None setSelectors(self, Collection[Pattern] selectors)
Generator[HistObject, None, None] iterate(self, dryrun)
None configure(self, Mapping[str, Any] options)
None configure(self, Mapping[str, Any] options)
None publish(self, Union[HistObject, Iterable[HistObject]] obj)
STL class.
T * get(TKey *tobj)
get a TObject* from a TKey* (why can't a TObject be a TKey?)
Definition hcg.cxx:130