3 """Set up to read and/or write bytestream files.
5 This module configures the Athena components required to read from
6 RAW/bytestream input. Use either byteStreamReadCfg to set up for reading
7 or byteStreamWriteCfg to set up for writing. Merge the component accumulator
8 the functions return into your job configuration as needed.
10 Executing this module will run a short test over 10 events. The events are read
11 from the default bytestream input on CVMFS and written to a local bytestream
14 Typical usage examples:
16 component_accumulator.merge(byteStreamReadCfg(flags))
18 from AthenaConfiguration.ComponentFactory
import CompFactory
19 from AthenaConfiguration.ComponentAccumulator
import ComponentAccumulator
20 from AthenaServices.MetaDataSvcConfig
import MetaDataSvcCfg
21 from IOVDbSvc.IOVDbSvcConfig
import IOVDbSvcCfg
22 from SGComps.SGInputLoaderConfig
import SGInputLoaderCfg
26 """Set up to read from a bytestream file
28 The function adds the components required to read events and metadata from
29 bytestream input. May be used to read events from a secondary input as well
33 flags: Job configuration flags
34 type_names: (optional) specific type names for address provider to find
37 A component accumulator fragment containing the components required to
38 read from bytestream. Should be merged into main job configuration.
42 bytestream_conversion = CompFactory.ByteStreamCnvSvc()
43 result.addService(bytestream_conversion)
45 if flags.Common.isOnline
and not any(flags.Input.Files)
and not (flags.Trigger.doHLT
or flags.Trigger.doLVL1):
46 bytestream_input = CompFactory.ByteStreamEmonInputSvc(
"ByteStreamInputSvc")
48 bytestream_input = CompFactory.ByteStreamEventStorageInputSvc(
49 name=
"ByteStreamInputSvc",
50 EventInfoKey=
"{}EventInfo".
format(
51 flags.Overlay.BkgPrefix
if flags.Overlay.ByteStream
else ""
54 result.addService(bytestream_input)
56 if flags.Input.SecondaryFiles:
57 event_selector = CompFactory.EventSelectorByteStream(
58 name=
"SecondaryEventSelector",
60 Input=flags.Input.SecondaryFiles,
61 SkipEvents=flags.Exec.SkipEvents
if flags.Overlay.SkipSecondaryEvents >= 0
else flags.Exec.SkipEvents,
62 ByteStreamInputSvc=bytestream_input.name,
64 result.addService(event_selector)
66 event_selector = CompFactory.EventSelectorByteStream(
68 Input=flags.Input.Files,
69 SkipEvents=flags.Exec.SkipEvents,
70 ByteStreamInputSvc=bytestream_input.name,
72 result.addService(event_selector)
73 result.setAppProperty(
"EvtSel", event_selector.name)
75 event_persistency = CompFactory.EvtPersistencySvc(
76 name=
"EventPersistencySvc", CnvServices=[bytestream_conversion.name]
78 result.addService(event_persistency)
80 result.addService(CompFactory.ROBDataProviderSvc())
82 address_provider = CompFactory.ByteStreamAddressProviderSvc(
83 TypeNames=type_names
if type_names
else list(),
85 result.addService(address_provider)
88 MetaDataSvcCfg(flags, [
"IOVDbMetaDataTool",
"ByteStreamMetadataTool"])
91 proxy = CompFactory.ProxyProviderSvc(ProviderNames = [address_provider.name])
92 result.addService(proxy)
100 """Set up output stream in RAW/bytestream format
102 Configure components responsible for writing bytestream format. Write job
103 results to bytestream file. ATLAS file naming conventions are enforced as
104 determined from the given configuration flags.
107 flags: Job configuration flags
108 type_names: (optional) Specify item list for output stream to write
111 A component accumulator fragment containing the components required to
112 write to bytestream. Should be merged into main job configuration.
114 all_runs =
set(flags.Input.RunNumbers)
117 ),
"Input is from multiple runs, do not know which one to use {}".
format(
122 event_storage_output = CompFactory.ByteStreamEventStorageOutputSvc(
125 OutputDirectory=
"./",
127 RunNumber=all_runs.pop(),
129 result.addService(event_storage_output)
133 bytestream_conversion = CompFactory.ByteStreamCnvSvc(
134 name=
"ByteStreamCnvSvc",
135 ByteStreamOutputSvcList=[event_storage_output.getName()],
137 result.addService(bytestream_conversion)
140 event_info_input = (
'xAOD::EventInfo',
'StoreGateSvc+EventInfo')
142 output_stream = CompFactory.AthenaOutputStream(
143 name=
"BSOutputStreamAlg",
144 EvtConversionSvc=bytestream_conversion.name,
145 OutputFile=
"ByteStreamEventStorageOutputSvc",
146 ItemList=type_names
if type_names
else list(),
147 ExtraInputs=[event_info_input]
149 result.addEventAlgo(output_stream, primary=
True)
154 MetaDataSvcCfg(flags, [
"IOVDbMetaDataTool",
"ByteStreamMetadataTool"])
160 """Set up transient ByteStream output stream
162 Configure components responsible for writing bytestream format. Write the
163 specified objects to ByteStream into the cache of the ROBDataProviderSvc.
164 The data can then be read downstream as if they were coming from a BS file.
167 flags: Job configuration flags
168 item_list: (optional) List of objects to be written to transient ByteStream
169 type_names: (optional) List of types/names to register in BS conversion service
170 as available to be read from (transient) ByteStream
171 extra_inputs: (optional) List of objects which need to be produced before transient
172 ByteStream streaming is scheduled - ensures correct scheduling
175 A component accumulator fragment containing the components required to
176 write transient bytestream. Should be merged into main job configuration.
181 result.addService(CompFactory.ROBDataProviderSvc())
183 rdp_output = CompFactory.ByteStreamRDP_OutputSvc()
184 result.addService(rdp_output)
186 bytestream_conversion = CompFactory.ByteStreamCnvSvc(
187 name=
"ByteStreamCnvSvc",
188 FillTriggerBits=
False,
189 ByteStreamOutputSvcList=[rdp_output.getName()],
191 result.addService(bytestream_conversion)
195 extra_outputs = [(
"TransientBSOutType",
"StoreGateSvc+TransientBSOutKey")]
198 event_info_input = (
'xAOD::EventInfo',
'StoreGateSvc+EventInfo')
200 extra_inputs = [event_info_input]
201 elif event_info_input
not in extra_inputs:
202 extra_inputs.append(event_info_input)
204 output_stream = CompFactory.AthenaOutputStream(
205 name=
"TransBSStreamAlg",
206 EvtConversionSvc=bytestream_conversion.name,
207 OutputFile=
"ByteStreamRDP_OutputSvc",
208 ItemList=item_list
if item_list
else list(),
209 ExtraInputs=extra_inputs,
210 ExtraOutputs=extra_outputs
212 result.addEventAlgo(output_stream,
215 address_provider = CompFactory.ByteStreamAddressProviderSvc(
216 TypeNames=type_names
if type_names
else list(),
218 result.addService(address_provider)
220 result.addService(CompFactory.ProxyProviderSvc(
221 ProviderNames = [address_provider.name]))
228 if __name__ ==
"__main__":
229 """Run a functional test if module is executed"""
231 from AthenaConfiguration.MainServicesConfig
import MainServicesCfg
232 from AthenaConfiguration.TestDefaults
import defaultTestFiles
233 from AthenaConfiguration.AllConfigFlags
import initConfigFlags
234 from AthenaCommon.Logging
import logging
236 log = logging.getLogger(
'ByteStreamConfig')
239 flags.Input.Files = defaultTestFiles.RAW_RUN2
240 flags.Output.doWriteBS =
True
244 read.store(
open(
"test.pkl",
"wb"))
249 log.info(
"Write setup OK")
255 log.info(
"Config OK")
257 with open(
'ByteStreamConfig.pkl',
'wb')
as pkl:
261 sys.exit(acc.run(10).isFailure())