3 """Set up to read and/or write bytestream files.
5 This module configures the Athena components required to read from
6 RAW/bytestream input. Use either byteStreamReadCfg to set up for reading
7 and byteStreamWriteCfg to set up for writing. Merge the component accumulator
8 the functions return into your job configuration as needed.
10 Executing this module will run a short test over 10 events. The events are read
11 from the default bytestream input on CVMFS and written to a local bytestream
14 Typical usage examples:
16 component_accumulator.merge(byteStreamReadCfg(flags))
18 from AthenaConfiguration.ComponentFactory
import CompFactory
19 from AthenaConfiguration.ComponentAccumulator
import ComponentAccumulator
20 from AthenaServices.MetaDataSvcConfig
import MetaDataSvcCfg
21 from IOVDbSvc.IOVDbSvcConfig
import IOVDbSvcCfg
22 from SGComps.SGInputLoaderConfig
import SGInputLoaderCfg
26 """Set up to read from a bytestream file
28 The function adds the components required to read events and metadata from
29 bytestream input. May be used to read events from a secondary input as well
33 flags: Job configuration flags
34 type_names: (optional) specific type names for address provider to find
37 A component accumulator fragment containing the components required to
38 read from bytestream. Should be merged into main job configuration.
42 bytestream_conversion = CompFactory.ByteStreamCnvSvc()
43 result.addService(bytestream_conversion)
45 if flags.Common.isOnline
and not any(flags.Input.Files)
and not (flags.Trigger.doHLT
or flags.Trigger.doLVL1):
46 bytestream_input = CompFactory.ByteStreamEmonInputSvc(
"ByteStreamInputSvc")
48 bytestream_input = CompFactory.ByteStreamEventStorageInputSvc(
49 name=
"ByteStreamInputSvc",
50 EventInfoKey=
"{}EventInfo".
format(
51 flags.Overlay.BkgPrefix
if flags.Overlay.DataOverlay
else ""
54 result.addService(bytestream_input)
56 if flags.Input.SecondaryFiles:
57 event_selector = CompFactory.EventSelectorByteStream(
58 name=
"SecondaryEventSelector",
60 Input=flags.Input.SecondaryFiles,
61 SkipEvents=flags.Exec.SkipEvents
if flags.Overlay.SkipSecondaryEvents >= 0
else flags.Exec.SkipEvents,
62 ByteStreamInputSvc=bytestream_input.name,
64 result.addService(event_selector)
66 event_selector = CompFactory.EventSelectorByteStream(
68 Input=flags.Input.Files,
69 SkipEvents=flags.Exec.SkipEvents,
70 ByteStreamInputSvc=bytestream_input.name,
71 HelperTools = [CompFactory.xAODMaker.EventInfoSelectorTool()]
73 result.addService(event_selector)
74 result.setAppProperty(
"EvtSel", event_selector.name)
76 event_persistency = CompFactory.EvtPersistencySvc(
77 name=
"EventPersistencySvc", CnvServices=[bytestream_conversion.name]
79 result.addService(event_persistency)
81 result.addService(CompFactory.ROBDataProviderSvc())
83 address_provider = CompFactory.ByteStreamAddressProviderSvc(
84 TypeNames=type_names
if type_names
else list(),
86 result.addService(address_provider)
89 MetaDataSvcCfg(flags, [
"IOVDbMetaDataTool",
"ByteStreamMetadataTool"])
92 proxy = CompFactory.ProxyProviderSvc(ProviderNames = [address_provider.name])
93 result.addService(proxy)
101 """Set up output stream in RAW/bytestream format
103 Configure components responsible for writing bytestream format. Write job
104 results to bytestream file. ATLAS file naming conventions are enforced as
105 determined from the given configuration flags.
108 flags: Job configuration flags
109 type_names: (optional) Specify item list for output stream to write
112 A component accumulator fragment containing the components required to
113 write to bytestream. Should be merged into main job configuration.
115 all_runs =
set(flags.Input.RunNumbers)
118 ),
"Input is from multiple runs, do not know which one to use {}".
format(
123 event_storage_output = CompFactory.ByteStreamEventStorageOutputSvc(
126 OutputDirectory=
"./",
128 RunNumber=all_runs.pop(),
130 result.addService(event_storage_output)
134 bytestream_conversion = CompFactory.ByteStreamCnvSvc(
135 name=
"ByteStreamCnvSvc",
136 ByteStreamOutputSvcList=[event_storage_output.getName()],
138 result.addService(bytestream_conversion)
141 event_info_input = (
'xAOD::EventInfo',
'StoreGateSvc+EventInfo')
143 output_stream = CompFactory.AthenaOutputStream(
144 name=
"BSOutputStreamAlg",
145 EvtConversionSvc=bytestream_conversion.name,
146 OutputFile=
"ByteStreamEventStorageOutputSvc",
147 ItemList=type_names
if type_names
else list(),
148 ExtraInputs=[event_info_input]
150 result.addEventAlgo(output_stream, primary=
True)
155 MetaDataSvcCfg(flags, [
"IOVDbMetaDataTool",
"ByteStreamMetadataTool"])
161 """Set up transient ByteStream output stream
163 Configure components responsible for writing bytestream format. Write the
164 specified objects to ByteStream into the cache of the ROBDataProviderSvc.
165 The data can then be read downstream as if they were coming from a BS file.
168 flags: Job configuration flags
169 item_list: (optional) List of objects to be written to transient ByteStream
170 type_names: (optional) List of types/names to register in BS conversion service
171 as available to be read from (transient) ByteStream
172 extra_inputs: (optional) List of objects which need to be produced before transient
173 ByteStream streaming is scheduled - ensures correct scheduling
176 A component accumulator fragment containing the components required to
177 write transient bytestream. Should be merged into main job configuration.
182 result.addService(CompFactory.ROBDataProviderSvc())
184 rdp_output = CompFactory.ByteStreamRDP_OutputSvc()
185 result.addService(rdp_output)
187 bytestream_conversion = CompFactory.ByteStreamCnvSvc(
188 name=
"ByteStreamCnvSvc",
189 FillTriggerBits=
False,
190 ByteStreamOutputSvcList=[rdp_output.getName()],
192 result.addService(bytestream_conversion)
196 extra_outputs = [(
"TransientBSOutType",
"StoreGateSvc+TransientBSOutKey")]
199 event_info_input = (
'xAOD::EventInfo',
'StoreGateSvc+EventInfo')
201 extra_inputs = [event_info_input]
202 elif event_info_input
not in extra_inputs:
203 extra_inputs.append(event_info_input)
205 output_stream = CompFactory.AthenaOutputStream(
206 name=
"TransBSStreamAlg",
207 EvtConversionSvc=bytestream_conversion.name,
208 OutputFile=
"ByteStreamRDP_OutputSvc",
209 ItemList=item_list
if item_list
else list(),
210 ExtraInputs=extra_inputs,
211 ExtraOutputs=extra_outputs
213 result.addEventAlgo(output_stream,
216 address_provider = CompFactory.ByteStreamAddressProviderSvc(
217 TypeNames=type_names
if type_names
else list(),
219 result.addService(address_provider)
221 result.addService(CompFactory.ProxyProviderSvc(
222 ProviderNames = [address_provider.name]))
229 if __name__ ==
"__main__":
230 """Run a functional test if module is executed"""
232 from AthenaConfiguration.MainServicesConfig
import MainServicesCfg
233 from AthenaConfiguration.TestDefaults
import defaultTestFiles
234 from AthenaConfiguration.AllConfigFlags
import initConfigFlags
235 from AthenaCommon.Logging
import logging
237 log = logging.getLogger(
'ByteStreamConfig')
240 flags.Input.Files = defaultTestFiles.RAW_RUN2
241 flags.Output.doWriteBS =
True
245 read.store(
open(
"test.pkl",
"wb"))
250 log.info(
"Write setup OK")
256 log.info(
"Config OK")
258 with open(
'ByteStreamConfig.pkl',
'wb')
as pkl:
262 sys.exit(acc.run(10).isFailure())