ATLAS Offline Software
Loading...
Searching...
No Matches
atlas_oh.py
Go to the documentation of this file.
1# Copyright (C) 2002-2021 CERN for the benefit of the ATLAS collaboration
2
3from histgrinder.interfaces import InputModule, OutputModule
4from histgrinder.HistObject import HistObject
5from typing import (Union, Iterable, Mapping, Any, Collection,
6 Pattern, Generator)
7import logging
8import oh
9
10
def readobj(part, server, provider, k, dryrun):
    """Fetch one ROOT object from OH, or return None on a dry run.

    part, server, provider, k: forwarded unchanged to ``oh.getRootObject``
        (callers in this file pass an IPCPartition as ``part`` and the
        object name within the provider as ``k``).
    dryrun: when true, nothing is read and None is returned.
    """
    if dryrun:
        return None
    # Only the real-read path needs the OH bindings.
    import oh
    return oh.getRootObject(part, server, provider, k)
18
class Receiver(oh.OHRootReceiver):
    """OH receiver that wraps incoming objects as HistObjects and queues them."""

    def __init__(self, q, prefix):
        """
        q: container with an ``append`` method; received objects are added
           to it.
        prefix: text removed from each received object's name.
        """
        super().__init__()
        self.queue = q
        self.prefix = prefix

    def receive(self, obj):
        """Queue ``obj`` under its name with the prefix removed."""
        # Remove only the first occurrence so that the resulting name agrees
        # with the one OHInputModule.iterate produces on its first pass.
        name = obj.GetName().replace(self.prefix, '', 1)
        try:
            self.queue.append(HistObject(name, obj))
        except Exception:
            # Best effort, as before: report the failure and drop this
            # object instead of propagating the error to the caller.
            logging.getLogger(self.__class__.__name__).exception(
                'Could not queue received object %s', name)
31
class OHInputModule(InputModule):
    """Input module that reads histograms from an OH server."""

    def __init__(self):
        self.source = None
        # NOTE(review): the listing this block was reviewed from skips the
        # original line between these two assignments - confirm that no
        # attribute initialisation was lost here.
        self.selectors = None

    def configure(self, options: Mapping[str, Any]) -> None:
        """
        Configure this module. Potential elements of "options":
        source: a string in the format partition;server;provider (required).
        prefix: string removed from histogram names (default: empty).
        interval: seconds to sleep between checks for updated histograms
                  (default: 5).

        Raises ValueError if 'source' is missing or malformed, or if the
        partition is not valid.
        """
        from ispy import IPCPartition
        if 'source' not in options:
            raise ValueError("Must specify 'source' as an "
                             "option to OHInputModule")
        self.source = options['source']
        fields = self.source.split(';')
        if len(fields) != 3:
            # Same exception type as the bare tuple unpacking would raise,
            # but with a message that says what is expected.
            raise ValueError("'source' must have the form "
                             f"partition;server;provider, got {self.source!r}")
        self.partition, self.server, self.provider = fields
        self.prefix = options.get('prefix', '')
        if not IPCPartition(self.partition).isValid():
            raise ValueError(f'Input partition {self.partition} does not exist')
        log = logging.getLogger(self.__class__.__name__)
        log.info(f'Using source {self.source}')
        self.interval = options.get('interval', 5)
        log.info(f'Check interval is {self.interval} seconds')

    def setSelectors(self, selectors: Collection[Pattern]) -> None:
        """ Store the selectors; iterate() does not apply them yet. """
        self.selectors = selectors

    def iterate(self, dryrun) -> Generator[HistObject, None, None]:
        """
        Connect to OH; iterate over matching histograms.

        A first pass yields every object currently listed for the provider.
        If dryrun is true the objects are not actually read (each HistObject
        carries None) and iteration stops after that pass. Otherwise the
        generator keeps running, waking up every self.interval seconds to
        yield whatever the subscription has delivered in the meantime.
        """
        import ROOT
        ROOT.TH1.AddDirectory(ROOT.kFALSE)
        from oh import OHSubscriber
        from ispy import IPCPartition, InfoReader
        import time
        from collections import deque
        log = logging.getLogger(self.__class__.__name__)
        opartition = IPCPartition(self.partition)

        # Set up two reading methods: a subscriber to new histograms and
        # a first pass over all histograms.
        queue = deque()
        rec = Receiver(queue, f'{self.server}.{self.provider}.{self.prefix}')
        subs = OHSubscriber(opartition, self.server, rec, True)
        subs.subscribe_re2(self.server, self.provider, '.*', True)

        # From here on the subscription is live; the finally clause releases
        # it however the generator ends (dry run finished, consumer stopped
        # early, or an error was raised).
        try:
            reader = InfoReader(opartition, self.server,
                                rf'{self.provider}\..*')
            reader.update()
            log.debug('First pass update')
            for k in reader.objects:
                toread = k.replace(f'{self.server}.{self.provider}.', '')
                obj = readobj(opartition, self.server, self.provider,
                              toread, dryrun)
                # Log exactly the name that is handed to the consumer.
                name = toread.replace(self.prefix, '', 1)
                log.debug('OH input read '
                          f'{k} as {name}')
                yield HistObject(name, obj)

            if dryrun:
                return

            log.info('First pass complete, entering update loop')
            del reader
            while True:
                time.sleep(self.interval)
                if queue:
                    log.info('Update detected')
                    while queue:
                        yield queue.popleft()
                log.debug('Completed update')
        finally:
            subs.unsubscribe_re2(self.server, self.provider, '.*')

    def __iter__(self) -> Iterable[HistObject]:
        """ Iterate over real histograms (first pass, then updates). """
        return self.iterate(dryrun=False)

    def warmup(self) -> Iterable[HistObject]:
        """ Iterate over the histogram names only, without reading them. """
        return self.iterate(dryrun=True)
115
116
class OHOutputModule(OutputModule):
    """Output module that publishes histograms to an OH server."""

    def __init__(self):
        self.target = None

    def configure(self, options: Mapping[str, Any]) -> None:
        """
        Configure this module. Potential elements of "options":
        target: a string in the format partition;server;provider (required).
        prefix: path placed in front of each published histogram name
                (default: empty).

        Raises ValueError if 'target' is missing or malformed, or if the
        partition is not valid.
        """
        import oh
        import ispy
        if 'target' not in options:
            raise ValueError("Must specify 'target' as an option "
                             "to OHOutputModule")
        self.target = options['target']
        fields = self.target.split(';')
        if len(fields) != 3:
            # Same exception type as the bare tuple unpacking would raise,
            # but with a message that says what is expected.
            raise ValueError("'target' must have the form "
                             f"partition;server;provider, got {self.target!r}")
        self.partition, self.server, self.provider = fields
        # self.partition starts as the partition name and is replaced by the
        # corresponding IPCPartition object.
        self.partition = ispy.IPCPartition(self.partition)
        if not self.partition.isValid():
            raise ValueError(f'Output partition {self.partition.name()} is not valid')
        self.prefix = options.get('prefix', '')
        # Likewise self.provider goes from provider name to provider object.
        self.provider = oh.OHRootProvider(self.partition,
                                          self.server, self.provider, None)
        self.queue = set()
        log = logging.getLogger(self.__class__.__name__)
        log.info(f'Using target {self.target}')

    def publish(self, obj: Union[HistObject, Iterable[HistObject]]) -> None:
        """
        Publish one HistObject, or each HistObject of an iterable, to OH.

        Each object's ROOT histogram is published under its name joined to
        the configured prefix.
        """
        import os
        if isinstance(obj, HistObject):
            obj = [obj]

        for o in obj:
            self.provider.publish(o.hist, os.path.join(self.prefix, o.name))

    def finalize(self) -> None:
        """ Nothing to do: publish() sends every histogram immediately. """
        pass
160
161
if __name__ == '__main__':
    # Simple command-line check: copy everything from one OH source to one
    # OH target, printing each histogram's mean on the way.
    import sys
    if len(sys.argv) != 3:
        print("Provide two arguments (input and output files)")
        sys.exit(1)
    # The modules must exist before they can be configured.
    im = OHInputModule()
    im.configure({'source': sys.argv[1]})
    om = OHOutputModule()
    om.configure({'target': sys.argv[2]})
    for o in im:
        print(o, o.hist.GetMean())
        om.publish(o)
bool isValid(const T &p)
Av: we implement here an ATLAS-specific convention: all particles which are 99xxxxx are fine.
Definition AtlasPID.h:878
void print(char *figname, TCanvas *c1)
Generator[HistObject, None, None] iterate(self, dryrun)
Definition atlas_oh.py:62
None setSelectors(self, Collection[Pattern] selectors)
Definition atlas_oh.py:58
Iterable[HistObject] __iter__(self)
Definition atlas_oh.py:110
None configure(self, Mapping[str, Any] options)
Definition atlas_oh.py:38
Iterable[HistObject] warmup(self)
Definition atlas_oh.py:113
None configure(self, Mapping[str, Any] options)
Definition atlas_oh.py:121
None publish(self, Union[HistObject, Iterable[HistObject]] obj)
Definition atlas_oh.py:148
__init__(self, q, prefix)
Definition atlas_oh.py:20
STL class.
std::string replace(std::string s, const std::string &s2, const std::string &s3)
Definition hcg.cxx:310
std::vector< std::string > split(const std::string &s, const std::string &t=":")
Definition hcg.cxx:177
readobj(part, server, provider, k, dryrun)
Definition atlas_oh.py:11