ATLAS Offline Software
Loading...
Searching...
No Matches
buffers.py
Go to the documentation of this file.
1# Copyright (C) 2002-2026 CERN for the benefit of the ATLAS collaboration
2#
3# @author Giordon Stark
4
5# Buffer extraction and reconstruction utilities for PythonToolHandle.
6# Generalizes the pattern from test/muon_eff_sf_example_PHYSLITE.py into
7# reusable functions that work with any columnar CP tool.
8
9import itertools
10
11import awkward as ak
12import numpy as np
13
14from ColumnarToolWrapperPython.python_tool_handle import ColumnAccessMode
15
16
def classify_columns(columns):
    """Group ColumnInfo objects by container and role.

    Parameters
    ----------
    columns:
        Iterable of ColumnInfo objects as returned by PythonToolHandle.columns.

    Returns
    -------
    dict
        One entry per container offset name (e.g. "EventInfo", "Muons"),
        each mapping to a dict with keys:

        - ``"offset"``: the container's own offset ColumnInfo
        - ``"inputs"``: input ColumnInfo objects attached to the container
        - ``"outputs"``: output ColumnInfo objects attached to the container
        - ``"nested_offsets"``: name -> ColumnInfo for offset columns nested
          inside this container (e.g. "Muons.NumTrkPt500.offset")

    Notes
    -----
    An offset column (``is_offset=True``) whose ``offset_name`` is either
    ``''`` (root, e.g. "EventInfo") or the name of another offset column is a
    candidate container; if its own name contains a dot it is instead a
    nested-vector offset and is filed under its parent container rather than
    becoming a top-level key. An offset whose parent is unknown is
    conservatively treated as a root container.
    """
    # Split the incoming columns into offset columns and data columns.
    offsets = {}
    payload = []
    for col in columns:
        if col.is_offset:
            offsets[col.name] = col
        else:
            payload.append(col)

    # Decide, for each offset column, whether it describes a container
    # (plain name, e.g. "Muons" with offset_name="EventInfo") or a nested
    # vector inside a container (dotted name, e.g.
    # "Muons.NumTrkPt500.offset" with offset_name="Muons").
    containers = {}
    nested_by_parent = {}
    for name, col in offsets.items():
        parent = col.offset_name
        parent_known = parent == "" or parent in offsets
        if parent_known and "." in name:
            # Nested-vector offset — filed under its parent container.
            nested_by_parent.setdefault(parent, {})[name] = col
        else:
            # Plain container, or an orphan offset treated as a root.
            containers[name] = col

    classified = {}
    for name, col in containers.items():
        classified[name] = {
            "offset": col,
            "inputs": [],
            "outputs": [],
            "nested_offsets": nested_by_parent.get(name, {}),
        }

    # File every data column under its container, split by access mode.
    # Columns pointing at an unknown container are silently skipped; this
    # should not occur with well-formed tool output.
    for col in payload:
        entry = classified.get(col.offset_name)
        if entry is None:
            continue
        if col.access_mode == ColumnAccessMode.output:
            entry["outputs"].append(col)
        else:
            entry["inputs"].append(col)

    return classified
100
101
def resolve_optional_columns(classified, events):
    """Drop optional input columns that ``events`` does not provide.

    Every optional input column is checked against ``ak.fields(events)``;
    those not present are filtered out. The input dict is left untouched —
    each container entry is shallow-copied into the returned dict.

    Parameters
    ----------
    classified:
        Output of ``classify_columns``.
    events:
        An ak.Array whose field names determine optional-column availability.

    Returns
    -------
    dict
        Same structure as the ``classify_columns`` output, minus the absent
        optional columns in each container's ``"inputs"`` list.
    """
    present = set(ak.fields(events))

    def _is_usable(col):
        # Required columns always stay; optional ones only when available.
        return (not col.is_optional) or col.name in present

    pruned = {}
    for container, info in classified.items():
        entry = dict(info)
        entry["inputs"] = [col for col in info["inputs"] if _is_usable(col)]
        pruned[container] = entry
    return pruned
132
133
def extract_buffers(events, classified):
    """Extract flat numpy buffers from an awkward array.

    Walks each container in ``classified``, zips its input columns, runs
    ``ak.to_buffers`` on the result, and collects:

    - one offset buffer per container, keyed by the container name, cast to
      ``np.uint64`` for the C++ side (a container with no input columns gets
      a synthesized ``[0, num_events]`` offset instead)
    - one data buffer per input column, keyed by the column name

    Output column buffers are not included (``allocate_outputs`` handles
    those). Nested-vector offset extraction is NOT yet implemented — see the
    TODO at the end of the loop — even though ``classify_columns`` tracks
    those offsets under ``"nested_offsets"``.

    Parameters
    ----------
    events:
        An ak.Array (real or zero-length after typetracer conversion).
    classified:
        Output of classify_columns or resolve_optional_columns.

    Returns
    -------
    dict
        Mapping of column/container name -> flat numpy array.

    Raises
    ------
    RuntimeError
        If ``ak.to_buffers`` yields a form that is neither a RecordForm
        (one record per event) nor a ListOffsetForm (particle container).
    """
    buffers = {}
    num_events = int(ak.num(events, axis=0))

    for container_name, info in classified.items():
        input_cols = info["inputs"]

        if not input_cols:
            # No inputs — synthesize an offset if outputs need it later
            buffers[container_name] = np.array(
                [0, num_events], dtype=np.uint64
            )
            continue

        # Group input columns by their offset_name, then zip + to_buffers
        # each group. This is the same pattern as the original example script.
        # NOTE(review): classify_columns assigns inputs to containers via
        # offset_name, so every column here should share one offset_name and
        # groupby should yield a single group — the grouping is kept for
        # parity with the example script.
        sorted_cols = sorted(input_cols, key=lambda c: c.offset_name)

        for offset_name, cols_iter in itertools.groupby(
            sorted_cols, key=lambda c: c.offset_name
        ):
            cols = list(cols_iter)
            # Re-zip only the fields this tool needs into one record array.
            unzipped = {col.name: events[col.name] for col in cols}
            zipped = ak.zip(unzipped)

            # NB: form_key not crucial, but helpful for debugging
            form, length, raw_buffers = ak.to_buffers(
                zipped, form_key=f"{offset_name}{{id}}"
            )

            if isinstance(form, ak.forms.RecordForm):
                # EventInfo-like: one record per event.
                # Use ak.to_numpy per field instead of walking raw_buffers:
                # when events is masked/indexed, form.content(field) is an
                # IndexedForm and the "-data" key doesn't exist in raw_buffers.
                buffers[container_name] = np.array(
                    [0, length], dtype=np.uint64
                )
                for col in cols:
                    buffers[col.name] = ak.to_numpy(events[col.name])
            elif isinstance(form, ak.forms.ListOffsetForm):
                # Particle container: extract offsets, cast to uint64
                # for the C++ side
                offset_key = next(
                    key for key in raw_buffers if key.endswith("-offsets")
                )
                buffers[container_name] = np.asarray(
                    raw_buffers[offset_key]
                ).astype(np.uint64)

                # Data buffers from the inner RecordForm
                inner = form.content
                for field in inner.fields:
                    buffers[field] = np.asarray(
                        raw_buffers[f"{inner.content(field).form_key}-data"]
                    )
            else:
                raise RuntimeError(
                    f"Cannot handle form {type(form)} for "
                    f"container {container_name}"
                )

        # TODO: nested vector offset extraction not yet implemented

    return buffers
212
213
def allocate_outputs(classified, buffer_dict):
    """Create zero-initialized numpy arrays for every output column.

    The length of each array is taken from the last entry of the offset
    buffer the column references (``offsets[-1]`` is the total element
    count). The arrays are inserted into ``buffer_dict`` in-place and also
    returned.

    Parameters
    ----------
    classified:
        Output of ``classify_columns`` or ``resolve_optional_columns``.
    buffer_dict:
        Column name -> numpy array, as produced by ``extract_buffers``.
        Mutated in-place: the freshly allocated output arrays are added.

    Returns
    -------
    dict
        Output column name -> zero-filled numpy array (the same objects
        that were inserted into ``buffer_dict``).

    Raises
    ------
    RuntimeError
        If an output column references an offset buffer missing from
        ``buffer_dict``.
    """
    allocated = {}
    for info in classified.values():
        for col in info["outputs"]:
            offsets = buffer_dict.get(col.offset_name)
            if offsets is None:
                raise RuntimeError(
                    f"Cannot find offset buffer '{col.offset_name}' "
                    f"needed for output column '{col.name}'"
                )
            total = int(offsets[-1])
            fresh = np.zeros(total, dtype=col.dtype)
            allocated[col.name] = fresh
            buffer_dict[col.name] = fresh
    return allocated
249
250
def reconstruct_output(classified, buffer_dict, num_events):
    """Assemble an awkward Array from the populated output buffers.

    Builds a RecordArray form with one ``var * dtype`` field per output
    column and feeds it, together with the offset/data buffers taken from
    ``buffer_dict``, to ``ak.from_buffers``.

    Parameters
    ----------
    classified:
        Output of ``classify_columns`` or ``resolve_optional_columns``.
    buffer_dict:
        Column name -> numpy array, holding both the offset buffers and the
        output arrays populated by ``allocate_outputs`` and ``call()``.
    num_events:
        Outer-axis length of the returned array.

    Returns
    -------
    ak.Array
        Record array with one field per output column, each a
        variable-length list of per-particle values.
    """
    field_names = []
    field_forms = []
    raw = {}

    # "node0" is the outer RecordArray; every output column then takes a
    # consecutive pair of node ids (one for offsets, one for data).
    pair = 1
    for info in classified.values():
        for col in info["outputs"]:
            offsets_key = f"node{2 * pair}"
            data_key = f"node{2 * pair + 1}"
            pair += 1

            field_names.append(col.name)
            field_forms.append(
                {
                    "class": "ListOffsetArray",
                    "offsets": "i64",
                    "content": {
                        "class": "NumpyArray",
                        "primitive": col.dtype,
                        "form_key": data_key,
                    },
                    "form_key": offsets_key,
                }
            )

            raw[f"{data_key}-data"] = buffer_dict[col.name]
            raw[f"{offsets_key}-offsets"] = buffer_dict[col.offset_name]

    record_form = {
        "class": "RecordArray",
        "fields": field_names,
        "contents": field_forms,
        "form_key": "node0",
    }

    return ak.from_buffers(record_form, num_events, raw)
STL class.
reconstruct_output(classified, buffer_dict, num_events)
Definition buffers.py:251
resolve_optional_columns(classified, events)
Definition buffers.py:102
extract_buffers(events, classified)
Definition buffers.py:134
allocate_outputs(classified, buffer_dict)
Definition buffers.py:214
classify_columns(columns)
Definition buffers.py:17