ATLAS Offline Software
Loading...
Searching...
No Matches
ByteStreamConfig Namespace Reference

Functions

 ByteStreamReadCfg (flags, type_names=None)
 ByteStreamWriteCfg (flags, type_names=None)
 MCEventInfoByteStreamToolCfg (flags, name="MCEventInfoByteStreamTool", writeBS=False)
 TransientByteStreamCfg (flags, item_list=None, type_names=None, extra_inputs=None)
 ByteStreamCfg (flags, **kwargs)

Variables

 log = logging.getLogger('ByteStreamConfig')
 flags = initConfigFlags()
 Files
 doWriteBS
 read = ByteStreamReadCfg(flags)
 write = ByteStreamWriteCfg(flags)
 acc = MainServicesCfg(flags)

Detailed Description

Set up to read and/or write bytestream files.

This module configures the Athena components required to read from
RAW/bytestream input. Use either byteStreamReadCfg to set up for reading
and byteStreamWriteCfg to set up for writing. Merge the component accumulator
the functions return into your job configuration as needed.

Executing this module will run a short test over 10 events. The events are read
from the default bytestream input on CVMFS and written to a local bytestream
file.

    Typical usage examples:

        component_accumulator.merge(byteStreamReadCfg(flags))

Function Documentation

◆ ByteStreamCfg()

ByteStreamConfig.ByteStreamCfg ( flags,
** kwargs )

Definition at line 5 of file graphics/EventDisplaysOnline/python/ByteStreamConfig.py.

5def ByteStreamCfg(flags, **kwargs):
6
7 acc = ComponentAccumulator()
8
9 from ByteStreamEmonSvc.EmonByteStreamConfig import EmonByteStreamCfg
10 acc.merge(EmonByteStreamCfg(flags))
11
12 bytestreamInput = acc.getService("ByteStreamInputSvc")
13 bytestreamInput.Partition = flags.OnlineEventDisplays.PartitionName
14 bytestreamInput.GroupName = "EventDisplaysOnline"
15 bytestreamInput.PublishName = "EventDisplays"
16 bytestreamInput.Key = "dcm"
17 bytestreamInput.KeyCount = 1
18 bytestreamInput.Timeout = 600000
19 bytestreamInput.UpdatePeriod = 200
20 bytestreamInput.BufferSize = 10
21 bytestreamInput.ISServer = '' # Disable histogramming
22 bytestreamInput.StreamNames = flags.OnlineEventDisplays.TriggerStreams
23 bytestreamInput.ProcessCorruptedEvents = True
24 #bytestreamInput.StreamType = "physics" #comment out for all streams, e.g. if you also want claibration streams
25 bytestreamInput.StreamLogic = "Or"
26
27 if flags.OnlineEventDisplays.BeamSplashMode:
28 bytestreamInput.KeyCount = 64 # equal or greater than the number of DCMs for beam splashes
29 bytestreamInput.BufferSize = 192 # at least three times of keycount for beam splashes
30 bytestreamInput.Timeout = 144000000 #(40 hrs)
31 bytestreamInput.StreamType = "physics" #if trigger fails it will go to debug_HltError
32
33 if flags.OnlineEventDisplays.PartitionName != 'ATLAS':
34 bytestreamInput.KeyValue = [ 'Test_emon_push' ]
35 bytestreamInput.KeyCount = 1
36
37 return acc

◆ ByteStreamReadCfg()

ByteStreamConfig.ByteStreamReadCfg ( flags,
type_names = None )
Set up to read from a bytestream file

The function adds the components required to read events and metadata from
bytestream input. May be used to read events from a secondary input as well
primary input file.

Args:
    flags:      Job configuration flags
    type_names: (optional) specific type names for address provider to find

Returns:
    A component accumulator fragment containing the components required to
    read from bytestream. Should be merged into main job configuration.

Definition at line 26 of file Event/ByteStreamCnvSvc/python/ByteStreamConfig.py.

26def ByteStreamReadCfg(flags, type_names=None):
27 """Set up to read from a bytestream file
28
29 The function adds the components required to read events and metadata from
30 bytestream input. May be used to read events from a secondary input as well
31 primary input file.
32
33 Args:
34 flags: Job configuration flags
35 type_names: (optional) specific type names for address provider to find
36
37 Returns:
38 A component accumulator fragment containing the components required to
39 read from bytestream. Should be merged into main job configuration.
40 """
41 result = ComponentAccumulator()
42
43 bytestream_conversion = CompFactory.ByteStreamCnvSvc(
44 IsSimulation=flags.Input.isMC,
45 )
46 result.addService(bytestream_conversion)
47
48 # For MC ByteStream, add the MCEventInfoByteStreamTool to decode MC EventInfo
49 if flags.Input.isMC:
50 mcEventInfoTool = CompFactory.MCEventInfoByteStreamTool(
51 name="MCEventInfoByteStreamTool",
52 ROBIDs=[0x00ff0001], # SubDetector=OTHER=0xFF, ModuleId=0x01
53 EventInfoReadKey="", # Empty for decoding mode
54 )
55 result.addPublicTool(mcEventInfoTool)
56
57 eiName = "EventInfo"
58 if flags.Common.isOnline and not any(flags.Input.Files) and not (flags.Trigger.doHLT or flags.Trigger.doLVL1):
59 bytestream_input = CompFactory.ByteStreamEmonInputSvc("ByteStreamInputSvc")
60 else:
61 eiName = "{}EventInfo".format(flags.Overlay.BkgPrefix if flags.Common.ProductionStep is ProductionStep.MinbiasPreprocessing else "")
62 bytestream_input = CompFactory.ByteStreamEventStorageInputSvc(
63 name="ByteStreamInputSvc",
64 EventInfoKey=eiName)
65 result.addService(bytestream_input)
66
67 if flags.Input.SecondaryFiles:
68 event_selector = CompFactory.EventSelectorByteStream(
69 name="SecondaryEventSelector",
70 IsSecondary=True,
71 Input=flags.Input.SecondaryFiles,
72 SkipEvents=flags.Exec.SkipEvents if flags.Overlay.SkipSecondaryEvents >= 0 else flags.Exec.SkipEvents,
73 ByteStreamInputSvc=bytestream_input.name,
74 )
75 result.addService(event_selector)
76 else:
77 event_selector = CompFactory.EventSelectorByteStream(
78 name="EventSelector",
79 Input=flags.Input.Files,
80 SkipEvents=flags.Exec.SkipEvents,
81 ByteStreamInputSvc=bytestream_input.name,
82 )
83 result.addService(event_selector)
84 result.setAppProperty("EvtSel", event_selector.name)
85
86 event_persistency = CompFactory.EvtPersistencySvc(
87 name="EventPersistencySvc", CnvServices=[bytestream_conversion.name]
88 )
89 result.addService(event_persistency)
90
91 result.addService(CompFactory.ROBDataProviderSvc())
92
93 address_provider = CompFactory.ByteStreamAddressProviderSvc(
94 TypeNames=type_names if type_names else list(),
95 )
96 result.addService(address_provider)
97
98 result.merge(
99 MetaDataSvcCfg(flags, ["IOVDbMetaDataTool", "ByteStreamMetadataTool"])
100 )
101
102 proxy = CompFactory.ProxyProviderSvc(ProviderNames = [address_provider.name])
103 result.addService(proxy)
104
105 result.merge(SGInputLoaderCfg(flags, address_provider.TypeNames,
106 ExtraOutputs=[("xAOD::EventInfo",f"StoreGateSvc+{eiName}"),
107 ("xAOD::EventAuxInfo",f"StoreGateSvc+{eiName}Aux.")]))
108
109 return result
110
111

◆ ByteStreamWriteCfg()

ByteStreamConfig.ByteStreamWriteCfg ( flags,
type_names = None )
Set up output stream in RAW/bytestream format

Configure components responsible for writing bytestream format. Write job
results to bytestream file. ATLAS file naming conventions are enforced as
determined from the given configuration flags.

Args:
    flags:      Job configuration flags
    type_names: (optional) Specify item list for output stream to write

Returns:
    A component accumulator fragment containing the components required to
    write to bytestream. Should be merged into main job configuration.

Definition at line 112 of file Event/ByteStreamCnvSvc/python/ByteStreamConfig.py.

112def ByteStreamWriteCfg(flags, type_names=None):
113 """Set up output stream in RAW/bytestream format
114
115 Configure components responsible for writing bytestream format. Write job
116 results to bytestream file. ATLAS file naming conventions are enforced as
117 determined from the given configuration flags.
118
119 Args:
120 flags: Job configuration flags
121 type_names: (optional) Specify item list for output stream to write
122
123 Returns:
124 A component accumulator fragment containing the components required to
125 write to bytestream. Should be merged into main job configuration.
126 """
127 all_runs = set(flags.Input.RunNumbers)
128 assert (
129 len(all_runs) == 1
130 ), "Input is from multiple runs, do not know which one to use {}".format(
131 all_runs
132 )
133 result = ComponentAccumulator(CompFactory.AthSequencer('AthOutSeq', StopOverride=True))
134
135 event_storage_output = CompFactory.ByteStreamEventStorageOutputSvc(
136 MaxFileMB=15000,
137 MaxFileNE=15000000, # event (beyond which it creates a new file)
138 OutputDirectory="./",
139 SimpleFileName=flags.Output.BSFileName,
140 AppName="Athena",
141 RunNumber=all_runs.pop(),
142 )
143 result.addService(event_storage_output)
144 # release variable depends the way the env is configured
145 # FileTag = release
146
147 bytestream_conversion = CompFactory.ByteStreamCnvSvc(
148 name="ByteStreamCnvSvc",
149 ByteStreamOutputSvcList=[event_storage_output.getName()],
150 IsSimulation=flags.Input.isMC,
151 )
152 result.addService(bytestream_conversion)
153
154 # ByteStreamCnvSvc::connectOutput() requires xAOD::EventInfo
155 event_info_input = ('xAOD::EventInfo','StoreGateSvc+EventInfo')
156
157 output_stream = CompFactory.AthenaOutputStream(
158 name="BSOutputStreamAlg",
159 EvtConversionSvc=bytestream_conversion.name,
160 OutputFile="ByteStreamEventStorageOutputSvc",
161 ItemList=type_names if type_names else list(),
162 ExtraInputs=[event_info_input]
163 )
164 result.addEventAlgo(output_stream, primary=True)
165
166 result.merge(IOVDbSvcCfg(flags))
167
168 result.merge(
169 MetaDataSvcCfg(flags, ["IOVDbMetaDataTool", "ByteStreamMetadataTool"])
170 )
171
172 return result
173
STL class.

◆ MCEventInfoByteStreamToolCfg()

ByteStreamConfig.MCEventInfoByteStreamToolCfg ( flags,
name = "MCEventInfoByteStreamTool",
writeBS = False )
Configure the MCEventInfoByteStreamTool for encoding/decoding MC EventInfo

This tool serializes MC-specific EventInfo fields (mcChannelNumber, mcEventNumber,
mcEventWeights, pileup information, etc.) into a dedicated ROB fragment for MC
ByteStream files (RDO->BS workflow).

Args:
    flags:    Job configuration flags
    name:     Tool name
    writeBS:  If True, configure for encoding, else for decoding

Returns:
    A component accumulator fragment containing the configured tool

Definition at line 174 of file Event/ByteStreamCnvSvc/python/ByteStreamConfig.py.

174def MCEventInfoByteStreamToolCfg(flags, name="MCEventInfoByteStreamTool", writeBS=False):
175 """Configure the MCEventInfoByteStreamTool for encoding/decoding MC EventInfo
176
177 This tool serializes MC-specific EventInfo fields (mcChannelNumber, mcEventNumber,
178 mcEventWeights, pileup information, etc.) into a dedicated ROB fragment for MC
179 ByteStream files (RDO->BS workflow).
180
181 Args:
182 flags: Job configuration flags
183 name: Tool name
184 writeBS: If True, configure for encoding, else for decoding
185
186 Returns:
187 A component accumulator fragment containing the configured tool
188 """
189 from AthenaConfiguration.ComponentAccumulator import ComponentAccumulator
190 from AthenaConfiguration.ComponentFactory import CompFactory
191
192 acc = ComponentAccumulator()
193 tool = CompFactory.MCEventInfoByteStreamTool(name)
194
195 # ROB ID for MC EventInfo (SubDetector=OTHER=0xFF, ModuleId=0x01)
196 mc_eventinfo_robid = 0x00ff0001
197 tool.ROBIDs = [mc_eventinfo_robid]
198
199 if writeBS:
200 # write BS == read xAOD (encoding mode)
201 tool.EventInfoReadKey = "EventInfo"
202 else:
203 # read BS == write xAOD (decoding mode)
204 tool.EventInfoReadKey = ""
205
206 acc.setPrivateTools(tool)
207 return acc
208
209

◆ TransientByteStreamCfg()

ByteStreamConfig.TransientByteStreamCfg ( flags,
item_list = None,
type_names = None,
extra_inputs = None )
Set up transient ByteStream output stream

Configure components responsible for writing bytestream format. Write the
specified objects to ByteStream into the cache of the ROBDataProviderSvc.
The data can then be read downstream as if they were coming from a BS file.

Args:
    flags:        Job configuration flags
    item_list:    (optional) List of objects to be written to transient ByteStream
    type_names:   (optional) List of types/names to register in BS conversion service
                  as available to be read from (transient) ByteStream
    extra_inputs: (optional) List of objects which need to be produced before transient
                  ByteStream streaming is scheduled - ensures correct scheduling

Returns:
    A component accumulator fragment containing the components required to
    write transient bytestream. Should be merged into main job configuration.

Definition at line 210 of file Event/ByteStreamCnvSvc/python/ByteStreamConfig.py.

210def TransientByteStreamCfg(flags, item_list=None, type_names=None, extra_inputs=None):
211 """Set up transient ByteStream output stream
212
213 Configure components responsible for writing bytestream format. Write the
214 specified objects to ByteStream into the cache of the ROBDataProviderSvc.
215 The data can then be read downstream as if they were coming from a BS file.
216
217 Args:
218 flags: Job configuration flags
219 item_list: (optional) List of objects to be written to transient ByteStream
220 type_names: (optional) List of types/names to register in BS conversion service
221 as available to be read from (transient) ByteStream
222 extra_inputs: (optional) List of objects which need to be produced before transient
223 ByteStream streaming is scheduled - ensures correct scheduling
224
225 Returns:
226 A component accumulator fragment containing the components required to
227 write transient bytestream. Should be merged into main job configuration.
228 """
229
230 result = ComponentAccumulator()
231
232 result.addService(CompFactory.ROBDataProviderSvc())
233
234 rdp_output = CompFactory.ByteStreamRDP_OutputSvc()
235 result.addService(rdp_output)
236
237 bytestream_conversion = CompFactory.ByteStreamCnvSvc(
238 name="ByteStreamCnvSvc",
239 FillTriggerBits=False, # ATR-25971, transient BS is produced before trigger bits in RDOtoRDOTrigger
240 ByteStreamOutputSvcList=[rdp_output.getName()],
241 IsSimulation=flags.Input.isMC,
242 )
243 result.addService(bytestream_conversion)
244
245 # Special fictitious extra output which can be used to ensure correct
246 # scheduling of transient ByteStream clients
247 extra_outputs = [("TransientBSOutType","StoreGateSvc+TransientBSOutKey")]
248
249 # ByteStreamCnvSvc::connectOutput() requires xAOD::EventInfo
250 event_info_input = ('xAOD::EventInfo','StoreGateSvc+EventInfo')
251 if not extra_inputs:
252 extra_inputs = [event_info_input]
253 elif event_info_input not in extra_inputs:
254 extra_inputs.append(event_info_input)
255
256 output_stream = CompFactory.AthenaOutputStream(
257 name="TransBSStreamAlg",
258 EvtConversionSvc=bytestream_conversion.name,
259 OutputFile="ByteStreamRDP_OutputSvc",
260 ItemList=item_list if item_list else list(),
261 ExtraInputs=extra_inputs,
262 ExtraOutputs=extra_outputs
263 )
264 result.addEventAlgo(output_stream,
265 primary=True)
266
267 address_provider = CompFactory.ByteStreamAddressProviderSvc(
268 TypeNames=type_names if type_names else list(),
269 )
270 result.addService(address_provider)
271
272 result.addService(CompFactory.ProxyProviderSvc(
273 ProviderNames = [address_provider.name]))
274
275 result.merge(SGInputLoaderCfg(flags, Load=address_provider.TypeNames))
276
277 return result
278
279

Variable Documentation

◆ acc

ByteStreamConfig.acc = MainServicesCfg(flags)

◆ doWriteBS

ByteStreamConfig.doWriteBS

◆ Files

ByteStreamConfig.Files

◆ flags

ByteStreamConfig.flags = initConfigFlags()

◆ log

ByteStreamConfig.log = logging.getLogger('ByteStreamConfig')

◆ read

ByteStreamConfig.read = ByteStreamReadCfg(flags)

◆ write

ByteStreamConfig.write = ByteStreamWriteCfg(flags)