ATLAS Offline Software
Loading...
Searching...
No Matches
Event/ByteStreamCnvSvc/python/ByteStreamConfig.py
Go to the documentation of this file.
1#!/usr/bin/env python
2# Copyright (C) 2002-2026 CERN for the benefit of the ATLAS collaboration
3"""Set up to read and/or write bytestream files.
4
5This module configures the Athena components required to read from
6RAW/bytestream input. Use either byteStreamReadCfg to set up for reading
7and byteStreamWriteCfg to set up for writing. Merge the component accumulator
8the functions return into your job configuration as needed.
9
10Executing this module will run a short test over 10 events. The events are read
11from the default bytestream input on CVMFS and written to a local bytestream
12file.
13
14 Typical usage examples:
15
16 component_accumulator.merge(byteStreamReadCfg(flags))
17"""
18from AthenaConfiguration.ComponentFactory import CompFactory
19from AthenaConfiguration.ComponentAccumulator import ComponentAccumulator
20from AthenaConfiguration.Enums import ProductionStep
21from AthenaServices.MetaDataSvcConfig import MetaDataSvcCfg
22from IOVDbSvc.IOVDbSvcConfig import IOVDbSvcCfg
23from SGComps.SGInputLoaderConfig import SGInputLoaderCfg
24
25
26def ByteStreamReadCfg(flags, type_names=None):
27 """Set up to read from a bytestream file
28
29 The function adds the components required to read events and metadata from
30 bytestream input. May be used to read events from a secondary input as well
31 primary input file.
32
33 Args:
34 flags: Job configuration flags
35 type_names: (optional) specific type names for address provider to find
36
37 Returns:
38 A component accumulator fragment containing the components required to
39 read from bytestream. Should be merged into main job configuration.
40 """
41 result = ComponentAccumulator()
42
43 bytestream_conversion = CompFactory.ByteStreamCnvSvc()
44 result.addService(bytestream_conversion)
45
46 eiName = "EventInfo"
47 if flags.Common.isOnline and not any(flags.Input.Files) and not (flags.Trigger.doHLT or flags.Trigger.doLVL1):
48 bytestream_input = CompFactory.ByteStreamEmonInputSvc("ByteStreamInputSvc")
49 else:
50 eiName = "{}EventInfo".format(flags.Overlay.BkgPrefix if flags.Common.ProductionStep is ProductionStep.MinbiasPreprocessing else "")
51 bytestream_input = CompFactory.ByteStreamEventStorageInputSvc(
52 name="ByteStreamInputSvc",
53 EventInfoKey=eiName)
54 result.addService(bytestream_input)
55
56 if flags.Input.SecondaryFiles:
57 event_selector = CompFactory.EventSelectorByteStream(
58 name="SecondaryEventSelector",
59 IsSecondary=True,
60 Input=flags.Input.SecondaryFiles,
61 SkipEvents=flags.Exec.SkipEvents if flags.Overlay.SkipSecondaryEvents >= 0 else flags.Exec.SkipEvents,
62 ByteStreamInputSvc=bytestream_input.name,
63 )
64 result.addService(event_selector)
65 else:
66 event_selector = CompFactory.EventSelectorByteStream(
67 name="EventSelector",
68 Input=flags.Input.Files,
69 SkipEvents=flags.Exec.SkipEvents,
70 ByteStreamInputSvc=bytestream_input.name,
71 )
72 result.addService(event_selector)
73 result.setAppProperty("EvtSel", event_selector.name)
74
75 event_persistency = CompFactory.EvtPersistencySvc(
76 name="EventPersistencySvc", CnvServices=[bytestream_conversion.name]
77 )
78 result.addService(event_persistency)
79
80 result.addService(CompFactory.ROBDataProviderSvc())
81
82 address_provider = CompFactory.ByteStreamAddressProviderSvc(
83 TypeNames=type_names if type_names else list(),
84 )
85 result.addService(address_provider)
86
87 result.merge(
88 MetaDataSvcCfg(flags, ["IOVDbMetaDataTool", "ByteStreamMetadataTool"])
89 )
90
91 proxy = CompFactory.ProxyProviderSvc(ProviderNames = [address_provider.name])
92 result.addService(proxy)
93
94 result.merge(SGInputLoaderCfg(flags, address_provider.TypeNames,
95 ExtraOutputs=[("xAOD::EventInfo",f"StoreGateSvc+{eiName}"),
96 ("xAOD::EventAuxInfo",f"StoreGateSvc+{eiName}Aux.")]))
97
98 return result
99
100
101def ByteStreamWriteCfg(flags, type_names=None):
102 """Set up output stream in RAW/bytestream format
103
104 Configure components responsible for writing bytestream format. Write job
105 results to bytestream file. ATLAS file naming conventions are enforced as
106 determined from the given configuration flags.
107
108 Args:
109 flags: Job configuration flags
110 type_names: (optional) Specify item list for output stream to write
111
112 Returns:
113 A component accumulator fragment containing the components required to
114 write to bytestream. Should be merged into main job configuration.
115 """
116 all_runs = set(flags.Input.RunNumbers)
117 assert (
118 len(all_runs) == 1
119 ), "Input is from multiple runs, do not know which one to use {}".format(
120 all_runs
121 )
122 result = ComponentAccumulator(CompFactory.AthSequencer('AthOutSeq', StopOverride=True))
123
124 event_storage_output = CompFactory.ByteStreamEventStorageOutputSvc(
125 MaxFileMB=15000,
126 MaxFileNE=15000000, # event (beyond which it creates a new file)
127 OutputDirectory="./",
128 SimpleFileName=flags.Output.BSFileName,
129 AppName="Athena",
130 RunNumber=all_runs.pop(),
131 )
132 result.addService(event_storage_output)
133 # release variable depends the way the env is configured
134 # FileTag = release
135
136 bytestream_conversion = CompFactory.ByteStreamCnvSvc(
137 name="ByteStreamCnvSvc",
138 ByteStreamOutputSvcList=[event_storage_output.getName()],
139 )
140 result.addService(bytestream_conversion)
141
142 # ByteStreamCnvSvc::connectOutput() requires xAOD::EventInfo
143 event_info_input = ('xAOD::EventInfo','StoreGateSvc+EventInfo')
144
145 output_stream = CompFactory.AthenaOutputStream(
146 name="BSOutputStreamAlg",
147 EvtConversionSvc=bytestream_conversion.name,
148 OutputFile="ByteStreamEventStorageOutputSvc",
149 ItemList=type_names if type_names else list(),
150 ExtraInputs=[event_info_input]
151 )
152 result.addEventAlgo(output_stream, primary=True)
153
154 result.merge(IOVDbSvcCfg(flags))
155
156 result.merge(
157 MetaDataSvcCfg(flags, ["IOVDbMetaDataTool", "ByteStreamMetadataTool"])
158 )
159
160 return result
161
162def TransientByteStreamCfg(flags, item_list=None, type_names=None, extra_inputs=None):
163 """Set up transient ByteStream output stream
164
165 Configure components responsible for writing bytestream format. Write the
166 specified objects to ByteStream into the cache of the ROBDataProviderSvc.
167 The data can then be read downstream as if they were coming from a BS file.
168
169 Args:
170 flags: Job configuration flags
171 item_list: (optional) List of objects to be written to transient ByteStream
172 type_names: (optional) List of types/names to register in BS conversion service
173 as available to be read from (transient) ByteStream
174 extra_inputs: (optional) List of objects which need to be produced before transient
175 ByteStream streaming is scheduled - ensures correct scheduling
176
177 Returns:
178 A component accumulator fragment containing the components required to
179 write transient bytestream. Should be merged into main job configuration.
180 """
181
182 result = ComponentAccumulator()
183
184 result.addService(CompFactory.ROBDataProviderSvc())
185
186 rdp_output = CompFactory.ByteStreamRDP_OutputSvc()
187 result.addService(rdp_output)
188
189 bytestream_conversion = CompFactory.ByteStreamCnvSvc(
190 name="ByteStreamCnvSvc",
191 FillTriggerBits=False, # ATR-25971, transient BS is produced before trigger bits in RDOtoRDOTrigger
192 ByteStreamOutputSvcList=[rdp_output.getName()],
193 )
194 result.addService(bytestream_conversion)
195
196 # Special fictitious extra output which can be used to ensure correct
197 # scheduling of transient ByteStream clients
198 extra_outputs = [("TransientBSOutType","StoreGateSvc+TransientBSOutKey")]
199
200 # ByteStreamCnvSvc::connectOutput() requires xAOD::EventInfo
201 event_info_input = ('xAOD::EventInfo','StoreGateSvc+EventInfo')
202 if not extra_inputs:
203 extra_inputs = [event_info_input]
204 elif event_info_input not in extra_inputs:
205 extra_inputs.append(event_info_input)
206
207 output_stream = CompFactory.AthenaOutputStream(
208 name="TransBSStreamAlg",
209 EvtConversionSvc=bytestream_conversion.name,
210 OutputFile="ByteStreamRDP_OutputSvc",
211 ItemList=item_list if item_list else list(),
212 ExtraInputs=extra_inputs,
213 ExtraOutputs=extra_outputs
214 )
215 result.addEventAlgo(output_stream,
216 primary=True)
217
218 address_provider = CompFactory.ByteStreamAddressProviderSvc(
219 TypeNames=type_names if type_names else list(),
220 )
221 result.addService(address_provider)
222
223 result.addService(CompFactory.ProxyProviderSvc(
224 ProviderNames = [address_provider.name]))
225
226 result.merge(SGInputLoaderCfg(flags, Load=address_provider.TypeNames))
227
228 return result
229
230
231if __name__ == "__main__":
232 """Run a functional test if module is executed"""
233
234 from AthenaConfiguration.MainServicesConfig import MainServicesCfg
235 from AthenaConfiguration.TestDefaults import defaultTestFiles
236 from AthenaConfiguration.AllConfigFlags import initConfigFlags
237 from AthenaCommon.Logging import logging
238
239 log = logging.getLogger('ByteStreamConfig')
240
241 flags = initConfigFlags()
242 flags.Input.Files = defaultTestFiles.RAW_RUN2
243 flags.Output.doWriteBS = True
244 flags.lock()
245
246 read = ByteStreamReadCfg(flags)
247 read.store(open("test.pkl", "wb"))
248 print("All OK")
249
250 write = ByteStreamWriteCfg(flags)
251 write.printConfig()
252 log.info("Write setup OK")
253
254 acc = MainServicesCfg(flags)
255 acc.merge(read)
256 acc.merge(write)
257 acc.printConfig()
258 log.info("Config OK")
259
260 with open('ByteStreamConfig.pkl', 'wb') as pkl:
261 acc.store(pkl)
262
263 import sys
264 sys.exit(acc.run(10).isFailure())
void print(char *figname, TCanvas *c1)
STL class.
TransientByteStreamCfg(flags, item_list=None, type_names=None, extra_inputs=None)