ATLAS Offline Software
Loading...
Searching...
No Matches
Event/ByteStreamCnvSvc/python/ByteStreamConfig.py
Go to the documentation of this file.
1#!/usr/bin/env python
2# Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration
3"""Set up to read and/or write bytestream files.
4
5This module configures the Athena components required to read from
6RAW/bytestream input. Use either byteStreamReadCfg to set up for reading
7and byteStreamWriteCfg to set up for writing. Merge the component accumulator
8the functions return into your job configuration as needed.
9
10Executing this module will run a short test over 10 events. The events are read
11from the default bytestream input on CVMFS and written to a local bytestream
12file.
13
14 Typical usage examples:
15
16 component_accumulator.merge(byteStreamReadCfg(flags))
17"""
18from AthenaConfiguration.ComponentFactory import CompFactory
19from AthenaConfiguration.ComponentAccumulator import ComponentAccumulator
20from AthenaServices.MetaDataSvcConfig import MetaDataSvcCfg
21from IOVDbSvc.IOVDbSvcConfig import IOVDbSvcCfg
22from SGComps.SGInputLoaderConfig import SGInputLoaderCfg
23
24
25def ByteStreamReadCfg(flags, type_names=None):
26 """Set up to read from a bytestream file
27
28 The function adds the components required to read events and metadata from
29 bytestream input. May be used to read events from a secondary input as well
30 primary input file.
31
32 Args:
33 flags: Job configuration flags
34 type_names: (optional) specific type names for address provider to find
35
36 Returns:
37 A component accumulator fragment containing the components required to
38 read from bytestream. Should be merged into main job configuration.
39 """
40 result = ComponentAccumulator()
41
42 bytestream_conversion = CompFactory.ByteStreamCnvSvc()
43 result.addService(bytestream_conversion)
44
45 eiName = "EventInfo"
46 if flags.Common.isOnline and not any(flags.Input.Files) and not (flags.Trigger.doHLT or flags.Trigger.doLVL1):
47 bytestream_input = CompFactory.ByteStreamEmonInputSvc("ByteStreamInputSvc")
48 else:
49 eiName = "{}EventInfo".format(flags.Overlay.BkgPrefix if flags.Overlay.ByteStream else "")
50 bytestream_input = CompFactory.ByteStreamEventStorageInputSvc(
51 name="ByteStreamInputSvc",
52 EventInfoKey=eiName)
53 result.addService(bytestream_input)
54
55 if flags.Input.SecondaryFiles:
56 event_selector = CompFactory.EventSelectorByteStream(
57 name="SecondaryEventSelector",
58 IsSecondary=True,
59 Input=flags.Input.SecondaryFiles,
60 SkipEvents=flags.Exec.SkipEvents if flags.Overlay.SkipSecondaryEvents >= 0 else flags.Exec.SkipEvents,
61 ByteStreamInputSvc=bytestream_input.name,
62 )
63 result.addService(event_selector)
64 else:
65 event_selector = CompFactory.EventSelectorByteStream(
66 name="EventSelector",
67 Input=flags.Input.Files,
68 SkipEvents=flags.Exec.SkipEvents,
69 ByteStreamInputSvc=bytestream_input.name,
70 )
71 result.addService(event_selector)
72 result.setAppProperty("EvtSel", event_selector.name)
73
74 event_persistency = CompFactory.EvtPersistencySvc(
75 name="EventPersistencySvc", CnvServices=[bytestream_conversion.name]
76 )
77 result.addService(event_persistency)
78
79 result.addService(CompFactory.ROBDataProviderSvc())
80
81 address_provider = CompFactory.ByteStreamAddressProviderSvc(
82 TypeNames=type_names if type_names else list(),
83 )
84 result.addService(address_provider)
85
86 result.merge(
87 MetaDataSvcCfg(flags, ["IOVDbMetaDataTool", "ByteStreamMetadataTool"])
88 )
89
90 proxy = CompFactory.ProxyProviderSvc(ProviderNames = [address_provider.name])
91 result.addService(proxy)
92
93 result.merge(SGInputLoaderCfg(flags, address_provider.TypeNames,
94 ExtraOutputs=[("xAOD::EventInfo",f"StoreGateSvc+{eiName}"),
95 ("xAOD::EventAuxInfo",f"StoreGateSvc+{eiName}Aux.")]))
96
97 return result
98
99
100def ByteStreamWriteCfg(flags, type_names=None):
101 """Set up output stream in RAW/bytestream format
102
103 Configure components responsible for writing bytestream format. Write job
104 results to bytestream file. ATLAS file naming conventions are enforced as
105 determined from the given configuration flags.
106
107 Args:
108 flags: Job configuration flags
109 type_names: (optional) Specify item list for output stream to write
110
111 Returns:
112 A component accumulator fragment containing the components required to
113 write to bytestream. Should be merged into main job configuration.
114 """
115 all_runs = set(flags.Input.RunNumbers)
116 assert (
117 len(all_runs) == 1
118 ), "Input is from multiple runs, do not know which one to use {}".format(
119 all_runs
120 )
121 result = ComponentAccumulator(CompFactory.AthSequencer('AthOutSeq', StopOverride=True))
122
123 event_storage_output = CompFactory.ByteStreamEventStorageOutputSvc(
124 MaxFileMB=15000,
125 MaxFileNE=15000000, # event (beyond which it creates a new file)
126 OutputDirectory="./",
127 SimpleFileName=flags.Output.BSFileName,
128 AppName="Athena",
129 RunNumber=all_runs.pop(),
130 )
131 result.addService(event_storage_output)
132 # release variable depends the way the env is configured
133 # FileTag = release
134
135 bytestream_conversion = CompFactory.ByteStreamCnvSvc(
136 name="ByteStreamCnvSvc",
137 ByteStreamOutputSvcList=[event_storage_output.getName()],
138 )
139 result.addService(bytestream_conversion)
140
141 # ByteStreamCnvSvc::connectOutput() requires xAOD::EventInfo
142 event_info_input = ('xAOD::EventInfo','StoreGateSvc+EventInfo')
143
144 output_stream = CompFactory.AthenaOutputStream(
145 name="BSOutputStreamAlg",
146 EvtConversionSvc=bytestream_conversion.name,
147 OutputFile="ByteStreamEventStorageOutputSvc",
148 ItemList=type_names if type_names else list(),
149 ExtraInputs=[event_info_input]
150 )
151 result.addEventAlgo(output_stream, primary=True)
152
153 result.merge(IOVDbSvcCfg(flags))
154
155 result.merge(
156 MetaDataSvcCfg(flags, ["IOVDbMetaDataTool", "ByteStreamMetadataTool"])
157 )
158
159 return result
160
161def TransientByteStreamCfg(flags, item_list=None, type_names=None, extra_inputs=None):
162 """Set up transient ByteStream output stream
163
164 Configure components responsible for writing bytestream format. Write the
165 specified objects to ByteStream into the cache of the ROBDataProviderSvc.
166 The data can then be read downstream as if they were coming from a BS file.
167
168 Args:
169 flags: Job configuration flags
170 item_list: (optional) List of objects to be written to transient ByteStream
171 type_names: (optional) List of types/names to register in BS conversion service
172 as available to be read from (transient) ByteStream
173 extra_inputs: (optional) List of objects which need to be produced before transient
174 ByteStream streaming is scheduled - ensures correct scheduling
175
176 Returns:
177 A component accumulator fragment containing the components required to
178 write transient bytestream. Should be merged into main job configuration.
179 """
180
181 result = ComponentAccumulator()
182
183 result.addService(CompFactory.ROBDataProviderSvc())
184
185 rdp_output = CompFactory.ByteStreamRDP_OutputSvc()
186 result.addService(rdp_output)
187
188 bytestream_conversion = CompFactory.ByteStreamCnvSvc(
189 name="ByteStreamCnvSvc",
190 FillTriggerBits=False, # ATR-25971, transient BS is produced before trigger bits in RDOtoRDOTrigger
191 ByteStreamOutputSvcList=[rdp_output.getName()],
192 )
193 result.addService(bytestream_conversion)
194
195 # Special fictitious extra output which can be used to ensure correct
196 # scheduling of transient ByteStream clients
197 extra_outputs = [("TransientBSOutType","StoreGateSvc+TransientBSOutKey")]
198
199 # ByteStreamCnvSvc::connectOutput() requires xAOD::EventInfo
200 event_info_input = ('xAOD::EventInfo','StoreGateSvc+EventInfo')
201 if not extra_inputs:
202 extra_inputs = [event_info_input]
203 elif event_info_input not in extra_inputs:
204 extra_inputs.append(event_info_input)
205
206 output_stream = CompFactory.AthenaOutputStream(
207 name="TransBSStreamAlg",
208 EvtConversionSvc=bytestream_conversion.name,
209 OutputFile="ByteStreamRDP_OutputSvc",
210 ItemList=item_list if item_list else list(),
211 ExtraInputs=extra_inputs,
212 ExtraOutputs=extra_outputs
213 )
214 result.addEventAlgo(output_stream,
215 primary=True)
216
217 address_provider = CompFactory.ByteStreamAddressProviderSvc(
218 TypeNames=type_names if type_names else list(),
219 )
220 result.addService(address_provider)
221
222 result.addService(CompFactory.ProxyProviderSvc(
223 ProviderNames = [address_provider.name]))
224
225 result.merge(SGInputLoaderCfg(flags, Load=address_provider.TypeNames))
226
227 return result
228
229
230if __name__ == "__main__":
231 """Run a functional test if module is executed"""
232
233 from AthenaConfiguration.MainServicesConfig import MainServicesCfg
234 from AthenaConfiguration.TestDefaults import defaultTestFiles
235 from AthenaConfiguration.AllConfigFlags import initConfigFlags
236 from AthenaCommon.Logging import logging
237
238 log = logging.getLogger('ByteStreamConfig')
239
240 flags = initConfigFlags()
241 flags.Input.Files = defaultTestFiles.RAW_RUN2
242 flags.Output.doWriteBS = True
243 flags.lock()
244
245 read = ByteStreamReadCfg(flags)
246 read.store(open("test.pkl", "wb"))
247 print("All OK")
248
249 write = ByteStreamWriteCfg(flags)
250 write.printConfig()
251 log.info("Write setup OK")
252
253 acc = MainServicesCfg(flags)
254 acc.merge(read)
255 acc.merge(write)
256 acc.printConfig()
257 log.info("Config OK")
258
259 with open('ByteStreamConfig.pkl', 'wb') as pkl:
260 acc.store(pkl)
261
262 import sys
263 sys.exit(acc.run(10).isFailure())
void print(char *figname, TCanvas *c1)
STL class.
TransientByteStreamCfg(flags, item_list=None, type_names=None, extra_inputs=None)