ATLAS Offline Software
Loading...
Searching...
No Matches
ByteStreamConfig Namespace Reference

Functions

 ByteStreamReadCfg (flags, type_names=None)
 ByteStreamWriteCfg (flags, type_names=None)
 TransientByteStreamCfg (flags, item_list=None, type_names=None, extra_inputs=None)
 ByteStreamCfg (flags, **kwargs)

Variables

 log = logging.getLogger('ByteStreamConfig')
 flags = initConfigFlags()
 Files
 doWriteBS
 read = ByteStreamReadCfg(flags)
 write = ByteStreamWriteCfg(flags)
 acc = MainServicesCfg(flags)

Detailed Description

Set up to read and/or write bytestream files.

This module configures the Athena components required to read from
RAW/bytestream input. Use ByteStreamReadCfg to set up for reading
and ByteStreamWriteCfg to set up for writing. Merge the component accumulator
the functions return into your job configuration as needed.

Executing this module will run a short test over 10 events. The events are read
from the default bytestream input on CVMFS and written to a local bytestream
file.

    Typical usage examples:

        component_accumulator.merge(ByteStreamReadCfg(flags))

Function Documentation

◆ ByteStreamCfg()

ByteStreamConfig.ByteStreamCfg ( flags,
** kwargs )

Definition at line 5 of file graphics/EventDisplaysOnline/python/ByteStreamConfig.py.

def ByteStreamCfg(flags, **kwargs):
    """Configure the emon bytestream input used by the online event displays.

    Merges the EmonByteStreamCfg fragment and then sets the sampling
    properties of the "ByteStreamInputSvc" service obtained from it.

    Args:
        flags: Job configuration flags. The OnlineEventDisplays.PartitionName,
            OnlineEventDisplays.TriggerStreams and
            OnlineEventDisplays.BeamSplashMode flags are read.
        **kwargs: Accepted but not used by this function.

    Returns:
        A component accumulator containing the configured input service.
    """
    from ByteStreamEmonSvc.EmonByteStreamConfig import EmonByteStreamCfg

    result = ComponentAccumulator()
    result.merge(EmonByteStreamCfg(flags))

    emon_input = result.getService("ByteStreamInputSvc")

    # Baseline settings, applied in this order. StreamType is deliberately
    # left unset here so that all stream types are accepted (e.g. if you also
    # want calibration streams).
    baseline = (
        ("Partition", flags.OnlineEventDisplays.PartitionName),
        ("GroupName", "EventDisplaysOnline"),
        ("PublishName", "EventDisplays"),
        ("Key", "dcm"),
        ("KeyCount", 1),
        ("Timeout", 600000),
        ("UpdatePeriod", 200),
        ("BufferSize", 10),
        ("ISServer", ''),  # Disable histogramming
        ("StreamNames", flags.OnlineEventDisplays.TriggerStreams),
        ("ProcessCorruptedEvents", True),
        ("StreamLogic", "Or"),
    )
    for prop_name, prop_value in baseline:
        setattr(emon_input, prop_name, prop_value)

    if flags.OnlineEventDisplays.BeamSplashMode:
        # Equal or greater than the number of DCMs for beam splashes
        emon_input.KeyCount = 64
        # At least three times the key count for beam splashes
        emon_input.BufferSize = 192
        # (40 hrs)
        emon_input.Timeout = 144000000
        # If the trigger fails the event will go to debug_HltError
        emon_input.StreamType = "physics"

    if flags.OnlineEventDisplays.PartitionName != 'ATLAS':
        # Any partition other than ATLAS samples a single test key; this also
        # resets a KeyCount raised by the beam splash settings.
        emon_input.KeyValue = ['Test_emon_push']
        emon_input.KeyCount = 1

    return result

◆ ByteStreamReadCfg()

ByteStreamConfig.ByteStreamReadCfg ( flags,
type_names = None )
Set up to read from a bytestream file

The function adds the components required to read events and metadata from
bytestream input. May be used to read events from a secondary input as well
as the primary input file.

Args:
    flags:      Job configuration flags
    type_names: (optional) specific type names for address provider to find

Returns:
    A component accumulator fragment containing the components required to
    read from bytestream. Should be merged into main job configuration.

Definition at line 26 of file Event/ByteStreamCnvSvc/python/ByteStreamConfig.py.

def ByteStreamReadCfg(flags, type_names=None):
    """Set up to read from a bytestream file

    The function adds the components required to read events and metadata from
    bytestream input. May be used to read events from a secondary input as well
    as the primary input file.

    Args:
        flags:      Job configuration flags
        type_names: (optional) specific type names for address provider to find

    Returns:
        A component accumulator fragment containing the components required to
        read from bytestream. Should be merged into main job configuration.
    """
    result = ComponentAccumulator()

    bytestream_conversion = CompFactory.ByteStreamCnvSvc()
    result.addService(bytestream_conversion)

    eiName = "EventInfo"
    if flags.Common.isOnline and not any(flags.Input.Files) and not (flags.Trigger.doHLT or flags.Trigger.doLVL1):
        # Online, no input files and neither HLT nor LVL1 requested:
        # use the emon input service instead of the file-based one.
        bytestream_input = CompFactory.ByteStreamEmonInputSvc("ByteStreamInputSvc")
    else:
        # The EventInfo key carries the overlay background prefix only in the
        # MinbiasPreprocessing production step.
        eiName = "{}EventInfo".format(flags.Overlay.BkgPrefix if flags.Common.ProductionStep is ProductionStep.MinbiasPreprocessing else "")
        bytestream_input = CompFactory.ByteStreamEventStorageInputSvc(
            name="ByteStreamInputSvc",
            EventInfoKey=eiName)
    result.addService(bytestream_input)

    if flags.Input.SecondaryFiles:
        # A non-negative Overlay.SkipSecondaryEvents overrides the number of
        # events to skip on the secondary input; otherwise fall back to the
        # job-wide Exec.SkipEvents.
        secondary_skip = flags.Overlay.SkipSecondaryEvents
        if secondary_skip < 0:
            secondary_skip = flags.Exec.SkipEvents
        event_selector = CompFactory.EventSelectorByteStream(
            name="SecondaryEventSelector",
            IsSecondary=True,
            Input=flags.Input.SecondaryFiles,
            SkipEvents=secondary_skip,
            ByteStreamInputSvc=bytestream_input.name,
        )
        result.addService(event_selector)
    else:
        event_selector = CompFactory.EventSelectorByteStream(
            name="EventSelector",
            Input=flags.Input.Files,
            SkipEvents=flags.Exec.SkipEvents,
            ByteStreamInputSvc=bytestream_input.name,
        )
        result.addService(event_selector)
        # NOTE(review): block nesting was reconstructed; registering EvtSel is
        # assumed to apply to the primary selector only - confirm.
        result.setAppProperty("EvtSel", event_selector.name)

    event_persistency = CompFactory.EvtPersistencySvc(
        name="EventPersistencySvc", CnvServices=[bytestream_conversion.name]
    )
    result.addService(event_persistency)

    result.addService(CompFactory.ROBDataProviderSvc())

    address_provider = CompFactory.ByteStreamAddressProviderSvc(
        TypeNames=type_names if type_names else list(),
    )
    result.addService(address_provider)

    result.merge(
        MetaDataSvcCfg(flags, ["IOVDbMetaDataTool", "ByteStreamMetadataTool"])
    )

    proxy = CompFactory.ProxyProviderSvc(ProviderNames = [address_provider.name])
    result.addService(proxy)

    # Pass the address provider's type names to the input loader and declare
    # the EventInfo objects (under the key chosen above) as extra outputs.
    result.merge(SGInputLoaderCfg(flags, address_provider.TypeNames,
                                  ExtraOutputs=[("xAOD::EventInfo",f"StoreGateSvc+{eiName}"),
                                                ("xAOD::EventAuxInfo",f"StoreGateSvc+{eiName}Aux.")]))

    return result
99
100

◆ ByteStreamWriteCfg()

ByteStreamConfig.ByteStreamWriteCfg ( flags,
type_names = None )
Set up output stream in RAW/bytestream format

Configure components responsible for writing bytestream format. Write job
results to bytestream file. ATLAS file naming conventions are enforced as
determined from the given configuration flags.

Args:
    flags:      Job configuration flags
    type_names: (optional) Specify item list for output stream to write

Returns:
    A component accumulator fragment containing the components required to
    write to bytestream. Should be merged into main job configuration.

Definition at line 101 of file Event/ByteStreamCnvSvc/python/ByteStreamConfig.py.

def ByteStreamWriteCfg(flags, type_names=None):
    """Configure an output stream that writes RAW/bytestream files

    Adds the bytestream event storage output service, the bytestream
    conversion service and the output stream algorithm, then merges the
    IOVDbSvc and metadata service configuration. The output file name is
    taken from flags.Output.BSFileName and the run number from
    flags.Input.RunNumbers.

    Args:
        flags:      Job configuration flags
        type_names: (optional) Item list for the output stream to write

    Returns:
        A component accumulator fragment containing the components required to
        write to bytestream. Should be merged into main job configuration.

    Raises:
        AssertionError: if the input does not come from exactly one run.
    """
    run_numbers = set(flags.Input.RunNumbers)
    assert (
        len(run_numbers) == 1
    ), "Input is from multiple runs, do not know which one to use {}".format(
        run_numbers
    )

    out_sequence = CompFactory.AthSequencer('AthOutSeq', StopOverride=True)
    result = ComponentAccumulator(out_sequence)

    # FileTag is not set here: the release value depends on the way the
    # environment is configured.
    storage_svc = CompFactory.ByteStreamEventStorageOutputSvc(
        MaxFileMB=15000,
        MaxFileNE=15000000,  # event (beyond which it creates a new file)
        OutputDirectory="./",
        SimpleFileName=flags.Output.BSFileName,
        AppName="Athena",
        RunNumber=run_numbers.pop(),  # the single run verified above
    )
    result.addService(storage_svc)

    conversion_svc = CompFactory.ByteStreamCnvSvc(
        name="ByteStreamCnvSvc",
        ByteStreamOutputSvcList=[storage_svc.getName()],
    )
    result.addService(conversion_svc)

    # ByteStreamCnvSvc::connectOutput() requires xAOD::EventInfo
    required_inputs = [('xAOD::EventInfo','StoreGateSvc+EventInfo')]

    stream_alg = CompFactory.AthenaOutputStream(
        name="BSOutputStreamAlg",
        EvtConversionSvc=conversion_svc.name,
        OutputFile="ByteStreamEventStorageOutputSvc",
        ItemList=type_names or [],
        ExtraInputs=required_inputs,
    )
    result.addEventAlgo(stream_alg, primary=True)

    result.merge(IOVDbSvcCfg(flags))

    metadata_tools = ["IOVDbMetaDataTool", "ByteStreamMetadataTool"]
    result.merge(MetaDataSvcCfg(flags, metadata_tools))

    return result
161
STL class.

◆ TransientByteStreamCfg()

ByteStreamConfig.TransientByteStreamCfg ( flags,
item_list = None,
type_names = None,
extra_inputs = None )
Set up transient ByteStream output stream

Configure components responsible for writing bytestream format. Write the
specified objects to ByteStream into the cache of the ROBDataProviderSvc.
The data can then be read downstream as if they were coming from a BS file.

Args:
    flags:        Job configuration flags
    item_list:    (optional) List of objects to be written to transient ByteStream
    type_names:   (optional) List of types/names to register in BS conversion service
                  as available to be read from (transient) ByteStream
    extra_inputs: (optional) List of objects which need to be produced before transient
                  ByteStream streaming is scheduled - ensures correct scheduling

Returns:
    A component accumulator fragment containing the components required to
    write transient bytestream. Should be merged into main job configuration.

Definition at line 162 of file Event/ByteStreamCnvSvc/python/ByteStreamConfig.py.

def TransientByteStreamCfg(flags, item_list=None, type_names=None, extra_inputs=None):
    """Set up transient ByteStream output stream

    Configure components responsible for writing bytestream format. Write the
    specified objects to ByteStream into the cache of the ROBDataProviderSvc.
    The data can then be read downstream as if they were coming from a BS file.

    Args:
        flags:        Job configuration flags
        item_list:    (optional) List of objects to be written to transient ByteStream
        type_names:   (optional) List of types/names to register in BS conversion service
                      as available to be read from (transient) ByteStream
        extra_inputs: (optional) List of objects which need to be produced before transient
                      ByteStream streaming is scheduled - ensures correct scheduling.
                      The list passed in is not modified.

    Returns:
        A component accumulator fragment containing the components required to
        write transient bytestream. Should be merged into main job configuration.
    """

    result = ComponentAccumulator()

    result.addService(CompFactory.ROBDataProviderSvc())

    rdp_output = CompFactory.ByteStreamRDP_OutputSvc()
    result.addService(rdp_output)

    bytestream_conversion = CompFactory.ByteStreamCnvSvc(
        name="ByteStreamCnvSvc",
        FillTriggerBits=False, # ATR-25971, transient BS is produced before trigger bits in RDOtoRDOTrigger
        ByteStreamOutputSvcList=[rdp_output.getName()],
    )
    result.addService(bytestream_conversion)

    # Special fictitious extra output which can be used to ensure correct
    # scheduling of transient ByteStream clients
    extra_outputs = [("TransientBSOutType","StoreGateSvc+TransientBSOutKey")]

    # ByteStreamCnvSvc::connectOutput() requires xAOD::EventInfo
    event_info_input = ('xAOD::EventInfo','StoreGateSvc+EventInfo')
    # Work on a copy so that the caller's list is never modified in place.
    extra_inputs = list(extra_inputs) if extra_inputs else []
    if event_info_input not in extra_inputs:
        extra_inputs.append(event_info_input)

    output_stream = CompFactory.AthenaOutputStream(
        name="TransBSStreamAlg",
        EvtConversionSvc=bytestream_conversion.name,
        OutputFile="ByteStreamRDP_OutputSvc",
        ItemList=item_list if item_list else list(),
        ExtraInputs=extra_inputs,
        ExtraOutputs=extra_outputs
    )
    result.addEventAlgo(output_stream,
                        primary=True)

    address_provider = CompFactory.ByteStreamAddressProviderSvc(
        TypeNames=type_names if type_names else list(),
    )
    result.addService(address_provider)

    result.addService(CompFactory.ProxyProviderSvc(
        ProviderNames = [address_provider.name]))

    result.merge(SGInputLoaderCfg(flags, Load=address_provider.TypeNames))

    return result
229
230

Variable Documentation

◆ acc

ByteStreamConfig.acc = MainServicesCfg(flags)

◆ doWriteBS

ByteStreamConfig.doWriteBS

◆ Files

ByteStreamConfig.Files

◆ flags

ByteStreamConfig.flags = initConfigFlags()

◆ log

ByteStreamConfig.log = logging.getLogger('ByteStreamConfig')

◆ read

ByteStreamConfig.read = ByteStreamReadCfg(flags)

◆ write

ByteStreamConfig.write = ByteStreamWriteCfg(flags)