3 from histgrinder.interfaces
import InputModule, OutputModule
4 from histgrinder.HistObject
import HistObject
5 from typing
import (Union, Iterable, Mapping, Any, Collection,
11 def readobj(part, server, provider, k, dryrun):
14 obj = oh.getRootObject(part, server, provider, k)
29 except Exception
as e:
38 def configure(self, options: Mapping[str, Any]) ->
None:
40 Configure this module. Potential elements of "options":
41 source: a string in the format partition/server
42 prefix: string to prepend to histogram names.
44 from ispy
import IPCPartition
45 if 'source' not in options:
46 raise ValueError(
"Must specify 'source' as an "
47 "option to OHInputModule")
48 self.
source = options[
'source']
50 self.
prefix = options.get(
'prefix',
'')
51 if not IPCPartition(self.partition).
isValid():
52 raise ValueError(f
'Input partition {self.partition} does not exist')
53 log = logging.getLogger(self.__class__.__name__)
54 log.info(f
'Using source {self.source}')
56 log.info(f
'Check interval is {self.interval} seconds')
62 def iterate(self, dryrun) -> Generator[HistObject, None, None]:
63 """ Connect to OH; iterate over matching histograms """
65 ROOT.TH1.AddDirectory(ROOT.kFALSE)
66 from oh
import OHSubscriber
67 from ispy
import IPCPartition, InfoReader
69 from collections
import deque
70 log = logging.getLogger(self.__class__.__name__)
71 opartition = IPCPartition(self.partition)
76 rec =
Receiver(queue, f
'{self.server}.{self.provider}.{self.prefix}')
77 subs = OHSubscriber(opartition, self.server, rec,
True)
78 subs.subscribe_re2(self.server, self.
provider,
'.*',
True)
80 reader = InfoReader(opartition, self.server, rf
'{self.provider}\..*')
82 log.debug(
'First pass update')
83 for k
in reader.objects:
84 toread = k.replace(f
'{self.server}.{self.provider}.',
'')
86 log.debug(
'OH input read '
87 f
'{k} as {toread.replace(self.prefix, "")}')
88 yield HistObject(toread.replace(self.
prefix,
'', 1), obj)
91 log.info(
'First pass complete, entering update loop')
98 log.info(
'Update detected')
100 yield queue.popleft()
103 log.debug(
'Completed update')
105 subs.unsubscribe_re2(self.server, self.
provider,
'.*')
111 return self.
iterate(dryrun=
False)
113 def warmup(self) -> Iterable[HistObject]:
114 return self.
iterate(dryrun=
True)
121 def configure(self, options: Mapping[str, Any]) ->
None:
123 Configure this module. Potential elements of "options":
124 target: should be a ROOT-openable filename or URL which
125 can be opened for writing.
126 prefix: directory path to place results under.
127 overwrite: boolean to indicate whether results should overwrite
128 existing histograms in the file.
129 delay: only write histograms in finalize() (not during publish()).
133 if 'target' not in options:
134 raise ValueError(
"Must specify 'target' as an option "
136 self.
target = options[
'target']
140 raise ValueError(f
'Output partition {self.partition.name()} is not valid')
145 log = logging.getLogger(self.__class__.__name__)
146 log.info(f
'Using target {self.target}')
148 def publish(self, obj: Union[HistObject, Iterable[HistObject]]) ->
None:
149 """ Accepts a HistObject containing a ROOT object to write to file """
151 if isinstance(obj, HistObject):
158 """ Writes outstanding HistObjects to file """
162 if __name__ ==
'__main__':
164 if len(sys.argv) != 3:
165 print(
"Provide two arguments (input and output files)")
168 im.configure({
'source': sys.argv[1]})
170 om.configure({
'target': sys.argv[2]})
172 print(o, o.hist.GetMean())