3 """Set up to read and/or write bytestream files.
5 This module configures the Athena components required to read from or
6 write to RAW/bytestream files. Use byteStreamReadCfg to set up for reading
7 and byteStreamWriteCfg to set up for writing. Merge the component accumulator
8 that these functions return into your job configuration as needed.
10 Executing this module will run a short test over 10 events. The events are read
11 from the default bytestream input on CVMFS and written to a local bytestream
14 Typical usage examples:
16 component_accumulator.merge(byteStreamReadCfg(flags))
18 from AthenaConfiguration.ComponentFactory
import CompFactory
19 from AthenaConfiguration.ComponentAccumulator
import ComponentAccumulator
20 from AthenaServices.MetaDataSvcConfig
import MetaDataSvcCfg
21 from IOVDbSvc.IOVDbSvcConfig
import IOVDbSvcCfg
22 from SGComps.SGInputLoaderConfig
import SGInputLoaderCfg
26 """Set up to read from a bytestream file
28 The function adds the components required to read events and metadata from
29 bytestream input. May be used to read events from a secondary input as well
33 flags: Job configuration flags
34 type_names: (optional) specific type names for address provider to find
37 A component accumulator fragment containing the components required to
38 read from bytestream. Should be merged into main job configuration.
42 bytestream_conversion = CompFactory.ByteStreamCnvSvc()
43 result.addService(bytestream_conversion)
46 if flags.Common.isOnline
and not any(flags.Input.Files)
and not (flags.Trigger.doHLT
or flags.Trigger.doLVL1):
47 bytestream_input = CompFactory.ByteStreamEmonInputSvc(
"ByteStreamInputSvc")
49 eiName =
"{}EventInfo".
format(flags.Overlay.BkgPrefix
if flags.Overlay.ByteStream
else "")
50 bytestream_input = CompFactory.ByteStreamEventStorageInputSvc(
51 name=
"ByteStreamInputSvc",
53 result.addService(bytestream_input)
55 if flags.Input.SecondaryFiles:
56 event_selector = CompFactory.EventSelectorByteStream(
57 name=
"SecondaryEventSelector",
59 Input=flags.Input.SecondaryFiles,
60 SkipEvents=flags.Exec.SkipEvents
if flags.Overlay.SkipSecondaryEvents >= 0
else flags.Exec.SkipEvents,
61 ByteStreamInputSvc=bytestream_input.name,
63 result.addService(event_selector)
65 event_selector = CompFactory.EventSelectorByteStream(
67 Input=flags.Input.Files,
68 SkipEvents=flags.Exec.SkipEvents,
69 ByteStreamInputSvc=bytestream_input.name,
71 result.addService(event_selector)
72 result.setAppProperty(
"EvtSel", event_selector.name)
74 event_persistency = CompFactory.EvtPersistencySvc(
75 name=
"EventPersistencySvc", CnvServices=[bytestream_conversion.name]
77 result.addService(event_persistency)
79 result.addService(CompFactory.ROBDataProviderSvc())
81 address_provider = CompFactory.ByteStreamAddressProviderSvc(
82 TypeNames=type_names
if type_names
else list(),
84 result.addService(address_provider)
87 MetaDataSvcCfg(flags, [
"IOVDbMetaDataTool",
"ByteStreamMetadataTool"])
90 proxy = CompFactory.ProxyProviderSvc(ProviderNames = [address_provider.name])
91 result.addService(proxy)
94 ExtraOutputs=[(
"xAOD::EventInfo",f
"StoreGateSvc+{eiName}"),
95 (
"xAOD::EventAuxInfo",f
"StoreGateSvc+{eiName}Aux.")]))
101 """Set up output stream in RAW/bytestream format
103 Configure components responsible for writing bytestream format. Write job
104 results to bytestream file. ATLAS file naming conventions are enforced as
105 determined from the given configuration flags.
108 flags: Job configuration flags
109 type_names: (optional) Specify item list for output stream to write
112 A component accumulator fragment containing the components required to
113 write to bytestream. Should be merged into main job configuration.
115 all_runs =
set(flags.Input.RunNumbers)
118 ),
"Input is from multiple runs, do not know which one to use {}".
format(
123 event_storage_output = CompFactory.ByteStreamEventStorageOutputSvc(
126 OutputDirectory=
"./",
127 SimpleFileName=flags.Output.BSFileName,
129 RunNumber=all_runs.pop(),
131 result.addService(event_storage_output)
135 bytestream_conversion = CompFactory.ByteStreamCnvSvc(
136 name=
"ByteStreamCnvSvc",
137 ByteStreamOutputSvcList=[event_storage_output.getName()],
139 result.addService(bytestream_conversion)
142 event_info_input = (
'xAOD::EventInfo',
'StoreGateSvc+EventInfo')
144 output_stream = CompFactory.AthenaOutputStream(
145 name=
"BSOutputStreamAlg",
146 EvtConversionSvc=bytestream_conversion.name,
147 OutputFile=
"ByteStreamEventStorageOutputSvc",
148 ItemList=type_names
if type_names
else list(),
149 ExtraInputs=[event_info_input]
151 result.addEventAlgo(output_stream, primary=
True)
156 MetaDataSvcCfg(flags, [
"IOVDbMetaDataTool",
"ByteStreamMetadataTool"])
162 """Set up transient ByteStream output stream
164 Configure components responsible for writing bytestream format. Write the
165 specified objects to ByteStream into the cache of the ROBDataProviderSvc.
166 The data can then be read downstream as if they were coming from a BS file.
169 flags: Job configuration flags
170 item_list: (optional) List of objects to be written to transient ByteStream
171 type_names: (optional) List of types/names to register in BS conversion service
172 as available to be read from (transient) ByteStream
173 extra_inputs: (optional) List of objects which need to be produced before transient
174 ByteStream streaming is scheduled - ensures correct scheduling
177 A component accumulator fragment containing the components required to
178 write transient bytestream. Should be merged into main job configuration.
183 result.addService(CompFactory.ROBDataProviderSvc())
185 rdp_output = CompFactory.ByteStreamRDP_OutputSvc()
186 result.addService(rdp_output)
188 bytestream_conversion = CompFactory.ByteStreamCnvSvc(
189 name=
"ByteStreamCnvSvc",
190 FillTriggerBits=
False,
191 ByteStreamOutputSvcList=[rdp_output.getName()],
193 result.addService(bytestream_conversion)
197 extra_outputs = [(
"TransientBSOutType",
"StoreGateSvc+TransientBSOutKey")]
200 event_info_input = (
'xAOD::EventInfo',
'StoreGateSvc+EventInfo')
202 extra_inputs = [event_info_input]
203 elif event_info_input
not in extra_inputs:
204 extra_inputs.append(event_info_input)
206 output_stream = CompFactory.AthenaOutputStream(
207 name=
"TransBSStreamAlg",
208 EvtConversionSvc=bytestream_conversion.name,
209 OutputFile=
"ByteStreamRDP_OutputSvc",
210 ItemList=item_list
if item_list
else list(),
211 ExtraInputs=extra_inputs,
212 ExtraOutputs=extra_outputs
214 result.addEventAlgo(output_stream,
217 address_provider = CompFactory.ByteStreamAddressProviderSvc(
218 TypeNames=type_names
if type_names
else list(),
220 result.addService(address_provider)
222 result.addService(CompFactory.ProxyProviderSvc(
223 ProviderNames = [address_provider.name]))
230 if __name__ ==
"__main__":
231 """Run a functional test if module is executed"""
233 from AthenaConfiguration.MainServicesConfig
import MainServicesCfg
234 from AthenaConfiguration.TestDefaults
import defaultTestFiles
235 from AthenaConfiguration.AllConfigFlags
import initConfigFlags
236 from AthenaCommon.Logging
import logging
238 log = logging.getLogger(
'ByteStreamConfig')
241 flags.Input.Files = defaultTestFiles.RAW_RUN2
242 flags.Output.doWriteBS =
True
246 read.store(
open(
"test.pkl",
"wb"))
251 log.info(
"Write setup OK")
257 log.info(
"Config OK")
259 with open(
'ByteStreamConfig.pkl',
'wb')
as pkl:
263 sys.exit(acc.run(10).isFailure())