ATLAS Offline Software
StepOutput.py
Go to the documentation of this file.
1 #
2 # Copyright (C) 2002-2023 CERN for the benefit of the ATLAS collaboration
3 #
4 
5 """Helper class to define and merge step outputs"""
6 
7 from __future__ import annotations
8 
9 from collections.abc import Mapping
10 from typing import Any, Iterable, Iterator, Optional
11 import logging
12 
13 from AthenaConfiguration.ComponentAccumulator import ComponentAccumulator
14 from AthenaConfiguration.AccumulatorCache import AccumulatorCachable
15 
16 log = logging.getLogger(__name__)
17 
18 
19 class StepOutput(AccumulatorCachable, Mapping[str, Any]):
20  """Helper class to define and merge step outputs
21 
22  The reconstruction sequences for the trigger are broken down into steps. This class
23  holds:
24  - A list of ComponentAccumulators (one per active step)
25  - A dictionary containing nicknames for the outputs of the current steps
26 
27  Dictionary access on this object accesses output names.
28 
29  Note that the step indices here are the steps specific to the jet/MET sequences,
30  not the full number of steps. At present there are three of these:
31  0: Calo-only reconstruction
32  1: Jet-RoI tracking (MET does not participate)
33  2: Full-scan tracking
34 
35  However, this particular class does not make any requirements on what these steps
36  are or how many there are.
37  """
38 
39  def __init__(
40  self, steps: Iterable[ComponentAccumulator]=(), outputs: dict[str, Any]={}
41  ) -> None:
42  self._steps = list(steps)
43  self._outputs = dict(outputs)
44 
45  @classmethod
46  def create(
47  cls,
48  ca: ComponentAccumulator,
49  inputs: Optional[StepOutput] = None,
50  step_idx: Optional[int] = None,
51  **outputs,
52  ) -> StepOutput:
53  """Helper method to create a step output from a single CA and its inputs
54 
55  Parameters
56  ----------
57  ca : ComponentAccumulator
58  The main component accumulator
59  inputs : Optional[StepOutput]
60  The inputs (if any) to the the main CA
61  step_idx : Optional[int]
62  Allow specifying a step. If not provided will be the max step out of the
63  inputs or 0 if there are none
64  outputs : Any
65  Any named outputs from this step
66  """
67  if step_idx is None:
68  step_idx = inputs.max_step_idx if inputs else 0
69  steps = [ComponentAccumulator() for _ in range(step_idx + 1)]
70  steps[step_idx].merge(ca)
71  output = StepOutput(steps, outputs)
72  if inputs:
73  output.merge_other(inputs)
74  return output
75 
76  @classmethod
77  def merge(cls, *inputs: StepOutput) -> StepOutput:
78  """Form a new output by merging multiple others"""
79  output = StepOutput()
80  for i in inputs:
81  output.merge_other(i)
82  return output
83 
84  @property
85  def max_step_idx(self) -> int:
86  """Get the maximum step index in this output"""
87  return len(self._steps) - 1
88 
89  @property
90  def steps(self) -> tuple[ComponentAccumulator, ...]:
91  """The step-separated ComponentAccumulators"""
92  return tuple(self._steps)
93 
94  def add_output_prefix(self, prefix: str) -> None:
95  """Add a prefix to all of our outputs to prevent conflicts"""
96  self._outputs = {prefix + k: v for k, v in self.items()}
97 
99  self, other: StepOutput, rename_outputs: dict[str, str] = {}
100  ) -> None:
101  """Merge another StepOutput into this
102 
103  Parameters
104  ----------
105  other : StepOutput
106  The other output to merge
107  rename_outputs : dict[str, str]
108  Optionally rename some output names so there are no clashes
109  """
110  for step_idx, ca in enumerate(other.steps):
111  self.merge_ca(ca, step_idx)
112  self.__merge_outputs(
113  {rename_outputs.get(k, k): v for k, v in other._outputs.items()}
114  )
115 
116  def merge_ca(self, ca: ComponentAccumulator, step_idx: int, /, **outputs: Any) -> None:
117  """Merge a single CA into this
118 
119  Parameters
120  ----------
121  ca : ComponentAccumulator
122  The component accumulator to merge
123  step_idx : int
124  The step index in which to merge it
125  **outputs : Any
126  Any named outputs from the CA
127  """
128  if step_idx >= self.max_step_idx:
129  self._steps += [ComponentAccumulator() for _ in range(step_idx - self.max_step_idx)]
130  self._steps[step_idx].merge(ca)
131  self.__merge_outputs(outputs)
132 
133  def __merge_outputs(self, outputs: dict[str, Any]) -> None:
134  overlap = {
135  key
136  for key in set(self._outputs) & set(outputs)
137  if self[key] != outputs[key]
138  }
139  if overlap:
140  log.error("Incompatible definitions for named outputs: ")
141  for key in overlap:
142  log.error(f"\t{key}: {self[key]} -> {outputs[key]}")
143  raise ValueError(str(overlap))
144  self._outputs.update(outputs)
145 
146  def __getitem__(self, key: str) -> Any:
147  return self._outputs[key]
148 
149  def __len__(self) -> int:
150  return len(self._outputs)
151 
152  def __iter__(self) -> Iterator[str]:
153  return iter(self._outputs)
154 
155  def _cacheEvict(self):
156  for ca in self.steps:
157  ca._cacheEvict()
python.HLT.MET.StepOutput.StepOutput.max_step_idx
int max_step_idx(self)
Definition: StepOutput.py:85
python.JetAnalysisCommon.ComponentAccumulator
ComponentAccumulator
Definition: JetAnalysisCommon.py:302
python.HLT.MET.StepOutput.StepOutput
Definition: StepOutput.py:19
python.HLT.MET.StepOutput.StepOutput._outputs
_outputs
Definition: StepOutput.py:41
plotBeamSpotVxVal.range
range
Definition: plotBeamSpotVxVal.py:195
python.HLT.MET.StepOutput.StepOutput.__iter__
Iterator[str] __iter__(self)
Definition: StepOutput.py:152
histSizes.list
def list(name, path='/')
Definition: histSizes.py:38
python.HLT.MET.StepOutput.StepOutput.merge
StepOutput merge(cls, *StepOutput inputs)
Definition: StepOutput.py:77
python.HLT.MET.StepOutput.StepOutput.__init__
None __init__(self, Iterable[ComponentAccumulator] steps=(), dict[str, Any] outputs={})
Definition: StepOutput.py:39
CxxUtils::set
constexpr std::enable_if_t< is_bitmask_v< E >, E & > set(E &lhs, E rhs)
Convenience function to set bits in a class enum bitmask.
Definition: bitmask.h:232
python.HLT.MET.StepOutput.StepOutput.__merge_outputs
None __merge_outputs(self, dict[str, Any] outputs)
Definition: StepOutput.py:133
python.HLT.MET.StepOutput.StepOutput._cacheEvict
def _cacheEvict(self)
Definition: StepOutput.py:155
python.HLT.MET.StepOutput.StepOutput.__len__
int __len__(self)
Definition: StepOutput.py:149
python.HLT.MET.StepOutput.StepOutput.steps
tuple[ComponentAccumulator,...] steps(self)
Definition: StepOutput.py:90
python.HLT.MET.StepOutput.StepOutput.merge_other
None merge_other(self, StepOutput other, dict[str, str] rename_outputs={})
Definition: StepOutput.py:98
python.HLT.MET.StepOutput.StepOutput.add_output_prefix
None add_output_prefix(self, str prefix)
Definition: StepOutput.py:94
python.HLT.MET.StepOutput.StepOutput.merge_ca
None merge_ca(self, ComponentAccumulator ca, int step_idx, **Any outputs)
Definition: StepOutput.py:116
python.HLT.MET.StepOutput.StepOutput.__getitem__
Any __getitem__(self, str key)
Definition: StepOutput.py:146
str
Definition: BTagTrackIpAccessor.cxx:11
WriteBchToCool.update
update
Definition: WriteBchToCool.py:67
python.HLT.MET.StepOutput.StepOutput._steps
_steps
Definition: StepOutput.py:40
merge
Definition: merge.py:1
python.HLT.MET.StepOutput.StepOutput.create
StepOutput create(cls, ComponentAccumulator ca, Optional[StepOutput] inputs=None, Optional[int] step_idx=None, **outputs)
Definition: StepOutput.py:46