27 """Set up to read from a bytestream file
29 The function adds the components required to read events and metadata from
30 bytestream input. May be used to read events from a secondary input as well
34 flags: Job configuration flags
35 type_names: (optional) specific type names for address provider to find
38 A component accumulator fragment containing the components required to
39 read from bytestream. Should be merged into main job configuration.
41 result = ComponentAccumulator()
43 bytestream_conversion = CompFactory.ByteStreamCnvSvc(
44 IsSimulation=flags.Input.isMC,
46 result.addService(bytestream_conversion)
50 mcEventInfoTool = CompFactory.MCEventInfoByteStreamTool(
51 name=
"MCEventInfoByteStreamTool",
55 result.addPublicTool(mcEventInfoTool)
58 if flags.Common.isOnline
and not any(flags.Input.Files)
and not (flags.Trigger.doHLT
or flags.Trigger.doLVL1):
59 bytestream_input = CompFactory.ByteStreamEmonInputSvc(
"ByteStreamInputSvc")
61 eiName =
"{}EventInfo".format(flags.Overlay.BkgPrefix
if flags.Common.ProductionStep
is ProductionStep.MinbiasPreprocessing
else "")
62 bytestream_input = CompFactory.ByteStreamEventStorageInputSvc(
63 name=
"ByteStreamInputSvc",
65 result.addService(bytestream_input)
67 if flags.Input.SecondaryFiles:
68 event_selector = CompFactory.EventSelectorByteStream(
69 name=
"SecondaryEventSelector",
71 Input=flags.Input.SecondaryFiles,
72 SkipEvents=flags.Exec.SkipEvents
if flags.Overlay.SkipSecondaryEvents >= 0
else flags.Exec.SkipEvents,
73 ByteStreamInputSvc=bytestream_input.name,
75 result.addService(event_selector)
77 event_selector = CompFactory.EventSelectorByteStream(
79 Input=flags.Input.Files,
80 SkipEvents=flags.Exec.SkipEvents,
81 ByteStreamInputSvc=bytestream_input.name,
83 result.addService(event_selector)
84 result.setAppProperty(
"EvtSel", event_selector.name)
86 event_persistency = CompFactory.EvtPersistencySvc(
87 name=
"EventPersistencySvc", CnvServices=[bytestream_conversion.name]
89 result.addService(event_persistency)
91 result.addService(CompFactory.ROBDataProviderSvc())
93 address_provider = CompFactory.ByteStreamAddressProviderSvc(
94 TypeNames=type_names
if type_names
else list(),
96 result.addService(address_provider)
99 MetaDataSvcCfg(flags, [
"IOVDbMetaDataTool",
"ByteStreamMetadataTool"])
102 proxy = CompFactory.ProxyProviderSvc(ProviderNames = [address_provider.name])
103 result.addService(proxy)
105 result.merge(SGInputLoaderCfg(flags, address_provider.TypeNames,
106 ExtraOutputs=[(
"xAOD::EventInfo",f
"StoreGateSvc+{eiName}"),
107 (
"xAOD::EventAuxInfo",f
"StoreGateSvc+{eiName}Aux.")]))
113 """Set up output stream in RAW/bytestream format
115 Configure components responsible for writing bytestream format. Write job
116 results to bytestream file. ATLAS file naming conventions are enforced as
117 determined from the given configuration flags.
120 flags: Job configuration flags
121 type_names: (optional) Specify item list for output stream to write
124 A component accumulator fragment containing the components required to
125 write to bytestream. Should be merged into main job configuration.
127 all_runs =
set(flags.Input.RunNumbers)
130 ),
"Input is from multiple runs, do not know which one to use {}".format(
133 result = ComponentAccumulator(CompFactory.AthSequencer(
'AthOutSeq', StopOverride=
True))
135 event_storage_output = CompFactory.ByteStreamEventStorageOutputSvc(
138 OutputDirectory=
"./",
139 SimpleFileName=flags.Output.BSFileName,
141 RunNumber=all_runs.pop(),
143 result.addService(event_storage_output)
147 bytestream_conversion = CompFactory.ByteStreamCnvSvc(
148 name=
"ByteStreamCnvSvc",
149 ByteStreamOutputSvcList=[event_storage_output.getName()],
150 IsSimulation=flags.Input.isMC,
152 result.addService(bytestream_conversion)
155 event_info_input = (
'xAOD::EventInfo',
'StoreGateSvc+EventInfo')
157 output_stream = CompFactory.AthenaOutputStream(
158 name=
"BSOutputStreamAlg",
159 EvtConversionSvc=bytestream_conversion.name,
160 OutputFile=
"ByteStreamEventStorageOutputSvc",
161 ItemList=type_names
if type_names
else list(),
162 ExtraInputs=[event_info_input]
164 result.addEventAlgo(output_stream, primary=
True)
166 result.merge(IOVDbSvcCfg(flags))
169 MetaDataSvcCfg(flags, [
"IOVDbMetaDataTool",
"ByteStreamMetadataTool"])
175 """Configure the MCEventInfoByteStreamTool for encoding/decoding MC EventInfo
177 This tool serializes MC-specific EventInfo fields (mcChannelNumber, mcEventNumber,
178 mcEventWeights, pileup information, etc.) into a dedicated ROB fragment for MC
179 ByteStream files (RDO->BS workflow).
182 flags: Job configuration flags
184 writeBS: If True, configure for encoding, else for decoding
187 A component accumulator fragment containing the configured tool
189 from AthenaConfiguration.ComponentAccumulator
import ComponentAccumulator
190 from AthenaConfiguration.ComponentFactory
import CompFactory
192 acc = ComponentAccumulator()
193 tool = CompFactory.MCEventInfoByteStreamTool(name)
196 mc_eventinfo_robid = 0x00ff0001
197 tool.ROBIDs = [mc_eventinfo_robid]
201 tool.EventInfoReadKey =
"EventInfo"
204 tool.EventInfoReadKey =
""
206 acc.setPrivateTools(tool)
211 """Set up transient ByteStream output stream
213 Configure components responsible for writing bytestream format. Write the
214 specified objects to ByteStream into the cache of the ROBDataProviderSvc.
215 The data can then be read downstream as if they were coming from a BS file.
218 flags: Job configuration flags
219 item_list: (optional) List of objects to be written to transient ByteStream
220 type_names: (optional) List of types/names to register in BS conversion service
221 as available to be read from (transient) ByteStream
222 extra_inputs: (optional) List of objects which need to be produced before transient
223 ByteStream streaming is scheduled - ensures correct scheduling
226 A component accumulator fragment containing the components required to
227 write transient bytestream. Should be merged into main job configuration.
230 result = ComponentAccumulator()
232 result.addService(CompFactory.ROBDataProviderSvc())
234 rdp_output = CompFactory.ByteStreamRDP_OutputSvc()
235 result.addService(rdp_output)
237 bytestream_conversion = CompFactory.ByteStreamCnvSvc(
238 name=
"ByteStreamCnvSvc",
239 FillTriggerBits=
False,
240 ByteStreamOutputSvcList=[rdp_output.getName()],
241 IsSimulation=flags.Input.isMC,
243 result.addService(bytestream_conversion)
247 extra_outputs = [(
"TransientBSOutType",
"StoreGateSvc+TransientBSOutKey")]
250 event_info_input = (
'xAOD::EventInfo',
'StoreGateSvc+EventInfo')
252 extra_inputs = [event_info_input]
253 elif event_info_input
not in extra_inputs:
254 extra_inputs.append(event_info_input)
256 output_stream = CompFactory.AthenaOutputStream(
257 name=
"TransBSStreamAlg",
258 EvtConversionSvc=bytestream_conversion.name,
259 OutputFile=
"ByteStreamRDP_OutputSvc",
260 ItemList=item_list
if item_list
else list(),
261 ExtraInputs=extra_inputs,
262 ExtraOutputs=extra_outputs
264 result.addEventAlgo(output_stream,
267 address_provider = CompFactory.ByteStreamAddressProviderSvc(
268 TypeNames=type_names
if type_names
else list(),
270 result.addService(address_provider)
272 result.addService(CompFactory.ProxyProviderSvc(
273 ProviderNames = [address_provider.name]))
275 result.merge(SGInputLoaderCfg(flags, Load=address_provider.TypeNames))