ATLAS Offline Software
Loading...
Searching...
No Matches
Event/ByteStreamCnvSvc/python/ByteStreamConfig.py
Go to the documentation of this file.
1#!/usr/bin/env python
2# Copyright (C) 2002-2026 CERN for the benefit of the ATLAS collaboration
3"""Set up to read and/or write bytestream files.
4
5This module configures the Athena components required to read from
6RAW/bytestream input. Use either ByteStreamReadCfg to set up for reading
7or ByteStreamWriteCfg to set up for writing. Merge the component accumulator
8that the chosen function returns into your job configuration as needed.
9
10Executing this module will run a short test over 10 events. The events are read
11from the default bytestream input on CVMFS and written to a local bytestream
12file.
13
14 Typical usage examples:
15
16 component_accumulator.merge(ByteStreamReadCfg(flags))
17"""
18from AthenaConfiguration.ComponentFactory import CompFactory
19from AthenaConfiguration.ComponentAccumulator import ComponentAccumulator
20from AthenaConfiguration.Enums import ProductionStep
21from AthenaServices.MetaDataSvcConfig import MetaDataSvcCfg
22from IOVDbSvc.IOVDbSvcConfig import IOVDbSvcCfg
23from SGComps.SGInputLoaderConfig import SGInputLoaderCfg
24
25
def ByteStreamReadCfg(flags, type_names=None):
    """Set up to read from a bytestream file.

    The function adds the components required to read events and metadata from
    bytestream input. May be used to read events from a secondary input as well
    as from the primary input file.

    Args:
        flags: Job configuration flags
        type_names: (optional) specific type names for address provider to find

    Returns:
        A component accumulator fragment containing the components required to
        read from bytestream. Should be merged into main job configuration.
    """
    result = ComponentAccumulator()

    bytestream_conversion = CompFactory.ByteStreamCnvSvc(
        IsSimulation=flags.Input.isMC,
    )
    result.addService(bytestream_conversion)

    # For MC ByteStream, add the MCEventInfoByteStreamTool to decode MC EventInfo
    if flags.Input.isMC:
        mcEventInfoTool = CompFactory.MCEventInfoByteStreamTool(
            name="MCEventInfoByteStreamTool",
            ROBIDs=[0x00ff0001],  # SubDetector=OTHER=0xFF, ModuleId=0x01
            EventInfoReadKey="",  # Empty for decoding mode
        )
        result.addPublicTool(mcEventInfoTool)

    eiName = "EventInfo"
    if flags.Common.isOnline and not any(flags.Input.Files) and not (flags.Trigger.doHLT or flags.Trigger.doLVL1):
        # Online job without input files and without HLT/LVL1 enabled
        bytestream_input = CompFactory.ByteStreamEmonInputSvc("ByteStreamInputSvc")
    else:
        # The EventInfo key carries the overlay background prefix only during
        # the MinbiasPreprocessing production step
        eiName = "{}EventInfo".format(flags.Overlay.BkgPrefix if flags.Common.ProductionStep is ProductionStep.MinbiasPreprocessing else "")
        bytestream_input = CompFactory.ByteStreamEventStorageInputSvc(
            name="ByteStreamInputSvc",
            EventInfoKey=eiName)
    result.addService(bytestream_input)

    if flags.Input.SecondaryFiles:
        # A non-negative Overlay.SkipSecondaryEvents overrides the job-wide
        # skip count for the secondary input; a negative value means "use
        # Exec.SkipEvents".
        skip_secondary = flags.Overlay.SkipSecondaryEvents
        if skip_secondary < 0:
            skip_secondary = flags.Exec.SkipEvents
        event_selector = CompFactory.EventSelectorByteStream(
            name="SecondaryEventSelector",
            IsSecondary=True,
            Input=flags.Input.SecondaryFiles,
            SkipEvents=skip_secondary,
            ByteStreamInputSvc=bytestream_input.name,
        )
        result.addService(event_selector)
    else:
        event_selector = CompFactory.EventSelectorByteStream(
            name="EventSelector",
            Input=flags.Input.Files,
            SkipEvents=flags.Exec.SkipEvents,
            ByteStreamInputSvc=bytestream_input.name,
        )
        result.addService(event_selector)
        # Only the primary selector is registered as the application's EvtSel
        result.setAppProperty("EvtSel", event_selector.name)

    event_persistency = CompFactory.EvtPersistencySvc(
        name="EventPersistencySvc", CnvServices=[bytestream_conversion.name]
    )
    result.addService(event_persistency)

    result.addService(CompFactory.ROBDataProviderSvc())

    address_provider = CompFactory.ByteStreamAddressProviderSvc(
        TypeNames=type_names if type_names else list(),
    )
    result.addService(address_provider)

    result.merge(
        MetaDataSvcCfg(flags, ["IOVDbMetaDataTool", "ByteStreamMetadataTool"])
    )

    proxy = CompFactory.ProxyProviderSvc(ProviderNames=[address_provider.name])
    result.addService(proxy)

    # Declare the EventInfo objects (under the key chosen above) as extra
    # outputs of the input loader
    result.merge(SGInputLoaderCfg(flags, address_provider.TypeNames,
                                  ExtraOutputs=[("xAOD::EventInfo", f"StoreGateSvc+{eiName}"),
                                                ("xAOD::EventAuxInfo", f"StoreGateSvc+{eiName}Aux.")]))

    return result
110
111
def ByteStreamWriteCfg(flags, type_names=None):
    """Configure an output stream that writes RAW/bytestream files.

    Sets up the output service, the bytestream conversion service and the
    output stream algorithm, then merges the IOVDb and metadata service
    configurations. The output file name is taken from
    flags.Output.BSFileName and the run number from flags.Input.RunNumbers,
    which must contain exactly one distinct run.

    Args:
        flags: Job configuration flags
        type_names: (optional) Specify item list for output stream to write

    Returns:
        A component accumulator fragment containing the components required to
        write to bytestream. Should be merged into main job configuration.
    """
    run_numbers = set(flags.Input.RunNumbers)
    assert len(run_numbers) == 1, (
        "Input is from multiple runs, do not know which one to use {}".format(
            run_numbers
        )
    )
    run_number = run_numbers.pop()

    acc = ComponentAccumulator(
        CompFactory.AthSequencer("AthOutSeq", StopOverride=True)
    )

    # NOTE: FileTag (release) is not set here; the release variable depends
    # on the way the environment is configured.
    storage_svc = CompFactory.ByteStreamEventStorageOutputSvc(
        MaxFileMB=15000,
        MaxFileNE=15000000,  # event (beyond which it creates a new file)
        OutputDirectory="./",
        SimpleFileName=flags.Output.BSFileName,
        AppName="Athena",
        RunNumber=run_number,
    )
    acc.addService(storage_svc)

    cnv_svc = CompFactory.ByteStreamCnvSvc(
        name="ByteStreamCnvSvc",
        ByteStreamOutputSvcList=[storage_svc.getName()],
        IsSimulation=flags.Input.isMC,
    )
    acc.addService(cnv_svc)

    stream_alg = CompFactory.AthenaOutputStream(
        name="BSOutputStreamAlg",
        EvtConversionSvc=cnv_svc.name,
        OutputFile="ByteStreamEventStorageOutputSvc",
        ItemList=type_names or [],
        # ByteStreamCnvSvc::connectOutput() requires xAOD::EventInfo
        ExtraInputs=[("xAOD::EventInfo", "StoreGateSvc+EventInfo")],
    )
    acc.addEventAlgo(stream_alg, primary=True)

    acc.merge(IOVDbSvcCfg(flags))
    acc.merge(
        MetaDataSvcCfg(flags, ["IOVDbMetaDataTool", "ByteStreamMetadataTool"])
    )

    return acc
173
def MCEventInfoByteStreamToolCfg(flags, name="MCEventInfoByteStreamTool", writeBS=False):
    """Build the MCEventInfoByteStreamTool as a private tool.

    The tool is configured with the ROB ID reserved for MC EventInfo and with
    an EventInfo read key that selects the direction: "EventInfo" when writing
    ByteStream (encoding), empty when reading ByteStream (decoding).

    Args:
        flags: Job configuration flags (not read by this function)
        name: Tool name
        writeBS: If True, configure for encoding, else for decoding

    Returns:
        A component accumulator fragment holding the configured tool as its
        private tool
    """
    from AthenaConfiguration.ComponentFactory import CompFactory
    from AthenaConfiguration.ComponentAccumulator import ComponentAccumulator

    # write BS == read xAOD (encoding mode): read the "EventInfo" object
    # read BS == write xAOD (decoding mode): no read key
    read_key = "EventInfo" if writeBS else ""

    mc_tool = CompFactory.MCEventInfoByteStreamTool(name)
    # ROB ID for MC EventInfo (SubDetector=OTHER=0xFF, ModuleId=0x01)
    mc_tool.ROBIDs = [0x00ff0001]
    mc_tool.EventInfoReadKey = read_key

    fragment = ComponentAccumulator()
    fragment.setPrivateTools(mc_tool)
    return fragment
208
209
def TransientByteStreamCfg(flags, item_list=None, type_names=None, extra_inputs=None):
    """Set up transient ByteStream output stream

    Configure components responsible for writing bytestream format. Write the
    specified objects to ByteStream into the cache of the ROBDataProviderSvc.
    The data can then be read downstream as if they were coming from a BS file.

    Args:
        flags: Job configuration flags
        item_list: (optional) List of objects to be written to transient ByteStream
        type_names: (optional) List of types/names to register in BS conversion service
                    as available to be read from (transient) ByteStream
        extra_inputs: (optional) List of objects which need to be produced before transient
                      ByteStream streaming is scheduled - ensures correct scheduling.
                      The passed list is not modified.

    Returns:
        A component accumulator fragment containing the components required to
        write transient bytestream. Should be merged into main job configuration.
    """

    result = ComponentAccumulator()

    result.addService(CompFactory.ROBDataProviderSvc())

    rdp_output = CompFactory.ByteStreamRDP_OutputSvc()
    result.addService(rdp_output)

    bytestream_conversion = CompFactory.ByteStreamCnvSvc(
        name="ByteStreamCnvSvc",
        FillTriggerBits=False,  # ATR-25971, transient BS is produced before trigger bits in RDOtoRDOTrigger
        ByteStreamOutputSvcList=[rdp_output.getName()],
        IsSimulation=flags.Input.isMC,
    )
    result.addService(bytestream_conversion)

    # Special fictitious extra output which can be used to ensure correct
    # scheduling of transient ByteStream clients
    extra_outputs = [("TransientBSOutType", "StoreGateSvc+TransientBSOutKey")]

    # ByteStreamCnvSvc::connectOutput() requires xAOD::EventInfo
    event_info_input = ('xAOD::EventInfo', 'StoreGateSvc+EventInfo')
    # Work on a copy so that the caller's list is never mutated
    extra_inputs = list(extra_inputs) if extra_inputs else []
    if event_info_input not in extra_inputs:
        extra_inputs.append(event_info_input)

    output_stream = CompFactory.AthenaOutputStream(
        name="TransBSStreamAlg",
        EvtConversionSvc=bytestream_conversion.name,
        OutputFile="ByteStreamRDP_OutputSvc",
        ItemList=item_list if item_list else list(),
        ExtraInputs=extra_inputs,
        ExtraOutputs=extra_outputs
    )
    result.addEventAlgo(output_stream,
                        primary=True)

    address_provider = CompFactory.ByteStreamAddressProviderSvc(
        TypeNames=type_names if type_names else list(),
    )
    result.addService(address_provider)

    result.addService(CompFactory.ProxyProviderSvc(
        ProviderNames=[address_provider.name]))

    result.merge(SGInputLoaderCfg(flags, Load=address_provider.TypeNames))

    return result
278
279
if __name__ == "__main__":
    # Run a functional test if module is executed: configure reading of the
    # default RAW test input plus bytestream writing, then run over 10 events.

    from AthenaConfiguration.MainServicesConfig import MainServicesCfg
    from AthenaConfiguration.TestDefaults import defaultTestFiles
    from AthenaConfiguration.AllConfigFlags import initConfigFlags
    from AthenaCommon.Logging import logging

    log = logging.getLogger('ByteStreamConfig')

    flags = initConfigFlags()
    flags.Input.Files = defaultTestFiles.RAW_RUN2
    flags.Output.doWriteBS = True
    flags.lock()

    read = ByteStreamReadCfg(flags)
    # Context manager ensures the pickle file is flushed and closed
    with open("test.pkl", "wb") as read_pkl:
        read.store(read_pkl)
    print("All OK")

    write = ByteStreamWriteCfg(flags)
    write.printConfig()
    log.info("Write setup OK")

    acc = MainServicesCfg(flags)
    acc.merge(read)
    acc.merge(write)
    acc.printConfig()
    log.info("Config OK")

    with open('ByteStreamConfig.pkl', 'wb') as pkl:
        acc.store(pkl)

    import sys
    # Exit status is non-zero when the 10-event run reports failure
    sys.exit(acc.run(10).isFailure())
void print(char *figname, TCanvas *c1)
STL class.
MCEventInfoByteStreamToolCfg(flags, name="MCEventInfoByteStreamTool", writeBS=False)
TransientByteStreamCfg(flags, item_list=None, type_names=None, extra_inputs=None)