ATLAS Offline Software
atlas_oh.py
Go to the documentation of this file.
1 # Copyright (C) 2002-2021 CERN for the benefit of the ATLAS collaboration
2 
3 from histgrinder.interfaces import InputModule, OutputModule
4 from histgrinder.HistObject import HistObject
5 from typing import (Union, Iterable, Mapping, Any, Collection,
6  Pattern, Generator)
7 import logging
8 import oh
9 
10 
11 def readobj(part, server, provider, k, dryrun):
12  import oh
13  if not dryrun:
14  obj = oh.getRootObject(part, server, provider, k)
15  else:
16  obj = None
17  return obj
18 
19 class Receiver(oh.OHRootReceiver):
20  def __init__(self, q, prefix):
21  super(Receiver, self).__init__()
22  self.queue = q
23  self.prefix = prefix
24 
25  def receive(self, obj):
26  name = obj.GetName().replace(self.prefix, '')
27  try:
28  self.queue.append(HistObject(name, obj))
29  except Exception as e:
30  print(e)
31 
32 class OHInputModule(InputModule):
33  def __init__(self):
34  self.source = None
35  self.classwarnings = set()
36  self.selectors = None
37 
38  def configure(self, options: Mapping[str, Any]) -> None:
39  """
40  Configure this module. Potential elements of "options":
41  source: a string in the format partition/server
42  prefix: string to prepend to histogram names.
43  """
44  from ispy import IPCPartition
45  if 'source' not in options:
46  raise ValueError("Must specify 'source' as an "
47  "option to OHInputModule")
48  self.source = options['source']
49  self.partition, self.server, self.provider = self.source.split(';')
50  self.prefix = options.get('prefix', '')
51  if not IPCPartition(self.partition).isValid():
52  raise ValueError(f'Input partition {self.partition} does not exist')
53  log = logging.getLogger(self.__class__.__name__)
54  log.info(f'Using source {self.source}')
55  self.interval = options.get('interval', 5)
56  log.info(f'Check interval is {self.interval} seconds')
57 
58  def setSelectors(self, selectors: Collection[Pattern]) -> None:
59  """ Do more later """
60  self.selectors = selectors
61 
62  def iterate(self, dryrun) -> Generator[HistObject, None, None]:
63  """ Connect to OH; iterate over matching histograms """
64  import ROOT
65  ROOT.TH1.AddDirectory(ROOT.kFALSE)
66  from oh import OHSubscriber
67  from ispy import IPCPartition, InfoReader
68  import time
69  from collections import deque
70  log = logging.getLogger(self.__class__.__name__)
71  opartition = IPCPartition(self.partition)
72 
73  # Set up two reading methods: a subscriber to new histograms and
74  # a first pass over all histograms.
75  queue = deque()
76  rec = Receiver(queue, f'{self.server}.{self.provider}.{self.prefix}')
77  subs = OHSubscriber(opartition, self.server, rec, True)
78  subs.subscribe_re2(self.server, self.provider, '.*', True)
79 
80  reader = InfoReader(opartition, self.server, rf'{self.provider}\..*')
81  reader.update()
82  log.debug('First pass update')
83  for k in reader.objects:
84  toread = k.replace(f'{self.server}.{self.provider}.','')
85  obj = readobj(opartition, self.server, self.provider, toread, dryrun)
86  log.debug('OH input read '
87  f'{k} as {toread.replace(self.prefix, "")}')
88  yield HistObject(toread.replace(self.prefix, '', 1), obj)
89 
90  if not dryrun:
91  log.info('First pass complete, entering update loop')
92  del reader
93  try:
94  while True:
95  try:
96  time.sleep(self.interval)
97  if len(queue) > 0:
98  log.info('Update detected')
99  while True:
100  yield queue.popleft()
101  except IndexError:
102  pass
103  log.debug('Completed update')
104  finally:
105  subs.unsubscribe_re2(self.server, self.provider, '.*')
106 
107  return
108  yield
109 
110  def __iter__(self) -> Iterable[HistObject]:
111  return self.iterate(dryrun=False)
112 
113  def warmup(self) -> Iterable[HistObject]:
114  return self.iterate(dryrun=True)
115 
116 
117 class OHOutputModule(OutputModule):
118  def __init__(self):
119  self.target = None
120 
121  def configure(self, options: Mapping[str, Any]) -> None:
122  """
123  Configure this module. Potential elements of "options":
124  target: should be a ROOT-openable filename or URL which
125  can be opened for writing.
126  prefix: directory path to place results under.
127  overwrite: boolean to indicate whether results should overwrite
128  existing histograms in the file.
129  delay: only write histograms in finalize() (not during publish()).
130  """
131  import oh
132  import ispy
133  if 'target' not in options:
134  raise ValueError("Must specify 'target' as an option "
135  "to OHInputModule")
136  self.target = options['target']
137  self.partition, self.server, self.provider = self.target.split(';')
138  self.partition = ispy.IPCPartition(self.partition)
139  if not self.partition.isValid():
140  raise ValueError(f'Output partition {self.partition.name()} is not valid')
141  self.prefix = options.get('prefix', '')
142  self.provider = oh.OHRootProvider(self.partition,
143  self.server, self.provider, None)
144  self.queue = set()
145  log = logging.getLogger(self.__class__.__name__)
146  log.info(f'Using target {self.target}')
147 
148  def publish(self, obj: Union[HistObject, Iterable[HistObject]]) -> None:
149  """ Accepts a HistObject containing a ROOT object to write to file """
150  import os
151  if isinstance(obj, HistObject):
152  obj = [obj]
153 
154  for o in obj:
155  self.provider.publish(o.hist, os.path.join(self.prefix, o.name))
156 
157  def finalize(self) -> None:
158  """ Writes outstanding HistObjects to file """
159  pass
160 
161 
162 if __name__ == '__main__':
163  import sys
164  if len(sys.argv) != 3:
165  print("Provide two arguments (input and output files)")
166  sys.exit(1)
168  im.configure({'source': sys.argv[1]})
170  om.configure({'target': sys.argv[2]})
171  for o in im:
172  print(o, o.hist.GetMean())
173  om.publish(o)
replace
std::string replace(std::string s, const std::string &s2, const std::string &s3)
Definition: hcg.cxx:307
python.atlas_oh.OHInputModule.provider
provider
Definition: atlas_oh.py:49
python.atlas_oh.OHInputModule.configure
None configure(self, Mapping[str, Any] options)
Definition: atlas_oh.py:38
python.atlas_oh.OHInputModule.__iter__
Iterable[HistObject] __iter__(self)
Definition: atlas_oh.py:110
python.atlas_oh.Receiver.prefix
prefix
Definition: atlas_oh.py:23
python.atlas_oh.OHInputModule.interval
interval
Definition: atlas_oh.py:55
python.atlas_oh.OHInputModule.selectors
selectors
Definition: atlas_oh.py:36
isValid
bool isValid(const T &p)
Definition: AtlasPID.h:225
dumpHVPathFromNtuple.append
bool append
Definition: dumpHVPathFromNtuple.py:91
python.atlas_oh.OHInputModule.classwarnings
classwarnings
Definition: atlas_oh.py:35
python.atlas_oh.OHOutputModule.finalize
None finalize(self)
Definition: atlas_oh.py:157
python.atlas_oh.OHOutputModule.publish
None publish(self, Union[HistObject, Iterable[HistObject]] obj)
Definition: atlas_oh.py:148
python.atlas_oh.OHInputModule.prefix
prefix
Definition: atlas_oh.py:50
python.atlas_oh.OHInputModule.warmup
Iterable[HistObject] warmup(self)
Definition: atlas_oh.py:113
python.atlas_oh.Receiver.__init__
def __init__(self, q, prefix)
Definition: atlas_oh.py:20
python.atlas_oh.OHOutputModule.queue
queue
Definition: atlas_oh.py:144
python.atlas_oh.OHOutputModule.target
target
Definition: atlas_oh.py:119
python.atlas_oh.OHOutputModule.prefix
prefix
Definition: atlas_oh.py:141
python.atlas_oh.readobj
def readobj(part, server, provider, k, dryrun)
Definition: atlas_oh.py:11
CxxUtils::set
constexpr std::enable_if_t< is_bitmask_v< E >, E & > set(E &lhs, E rhs)
Convenience function to set bits in a class enum bitmask.
Definition: bitmask.h:232
python.atlas_oh.OHInputModule.source
source
Definition: atlas_oh.py:34
python.atlas_oh.OHInputModule
Definition: atlas_oh.py:32
python.atlas_oh.Receiver
Definition: atlas_oh.py:19
python.atlas_oh.OHOutputModule.provider
provider
Definition: atlas_oh.py:137
python.atlas_oh.OHOutputModule
Definition: atlas_oh.py:117
python.atlas_oh.OHInputModule.setSelectors
None setSelectors(self, Collection[Pattern] selectors)
Definition: atlas_oh.py:58
python.atlas_oh.OHOutputModule.partition
partition
Definition: atlas_oh.py:138
python.atlas_oh.Receiver.receive
def receive(self, obj)
Definition: atlas_oh.py:25
python.atlas_oh.OHInputModule.__init__
def __init__(self)
Definition: atlas_oh.py:33
dbg::print
void print(std::FILE *stream, std::format_string< Args... > fmt, Args &&... args)
Definition: SGImplSvc.cxx:70
python.atlas_oh.OHOutputModule.__init__
def __init__(self)
Definition: atlas_oh.py:118
python.atlas_oh.Receiver.queue
queue
Definition: atlas_oh.py:22
python.atlas_oh.OHInputModule.iterate
Generator[HistObject, None, None] iterate(self, dryrun)
Definition: atlas_oh.py:62
Trk::split
@ split
Definition: LayerMaterialProperties.h:38
python.atlas_oh.OHOutputModule.configure
None configure(self, Mapping[str, Any] options)
Definition: atlas_oh.py:121