ATLAS Offline Software
Loading...
Searching...
No Matches
ByteStreamConfig Namespace Reference

Functions

 ByteStreamReadCfg (flags, type_names=None)
 ByteStreamWriteCfg (flags, type_names=None)
 TransientByteStreamCfg (flags, item_list=None, type_names=None, extra_inputs=None)
 ByteStreamCfg (flags, **kwargs)

Variables

 log = logging.getLogger('ByteStreamConfig')
 flags = initConfigFlags()
 Files
 doWriteBS
 read = ByteStreamReadCfg(flags)
 write = ByteStreamWriteCfg(flags)
 acc = MainServicesCfg(flags)

Detailed Description

Set up to read and/or write bytestream files.

This module configures the Athena components required to read from or write to
RAW/bytestream files. Use ByteStreamReadCfg to set up for reading
and ByteStreamWriteCfg to set up for writing. Merge the component accumulator
that these functions return into your job configuration as needed.

Executing this module will run a short test over 10 events. The events are read
from the default bytestream input on CVMFS and written to a local bytestream
file.

    Typical usage examples:

        component_accumulator.merge(ByteStreamReadCfg(flags))

Function Documentation

◆ ByteStreamCfg()

ByteStreamConfig.ByteStreamCfg ( flags,
** kwargs )

Definition at line 5 of file graphics/EventDisplaysOnline/python/ByteStreamConfig.py.

def ByteStreamCfg(flags, **kwargs):
    """Configure emon ByteStream input for the online event displays.

    Merges the EmonByteStreamCfg fragment and then overrides properties of
    the "ByteStreamInputSvc" service found in it.

    Args:
        flags: Job configuration flags. The OnlineEventDisplays.PartitionName,
            OnlineEventDisplays.TriggerStreams and
            OnlineEventDisplays.BeamSplashMode flags are read.
        **kwargs: Accepted but not used by this function.

    Returns:
        The component accumulator containing the configured input service.
    """
    acc = ComponentAccumulator()

    from ByteStreamEmonSvc.EmonByteStreamConfig import EmonByteStreamCfg
    acc.merge(EmonByteStreamCfg(flags))

    emon_input = acc.getService("ByteStreamInputSvc")

    # Baseline settings, applied in this order. StreamType is deliberately
    # not set here so that all streams are accepted (e.g. if calibration
    # streams are wanted as well).
    baseline = (
        ("Partition", flags.OnlineEventDisplays.PartitionName),
        ("GroupName", "EventDisplaysOnline"),
        ("PublishName", "EventDisplays"),
        ("Key", "dcm"),
        ("KeyCount", 1),
        ("Timeout", 600000),
        ("UpdatePeriod", 200),
        ("BufferSize", 10),
        ("ISServer", ''),  # Disable histogramming
        ("StreamNames", flags.OnlineEventDisplays.TriggerStreams),
        ("ProcessCorruptedEvents", True),
        ("StreamLogic", "Or"),
    )
    for prop_name, prop_value in baseline:
        setattr(emon_input, prop_name, prop_value)

    if flags.OnlineEventDisplays.BeamSplashMode:
        beam_splash = (
            # equal or greater than the number of DCMs for beam splashes
            ("KeyCount", 64),
            # at least three times of keycount for beam splashes
            ("BufferSize", 192),
            # (40 hrs)
            ("Timeout", 144000000),
            # if trigger fails it will go to debug_HltError
            ("StreamType", "physics"),
        )
        for prop_name, prop_value in beam_splash:
            setattr(emon_input, prop_name, prop_value)

    # Any partition other than 'ATLAS' uses the test key and a single key,
    # which also overrides the beam-splash KeyCount set above.
    if flags.OnlineEventDisplays.PartitionName != 'ATLAS':
        emon_input.KeyValue = [ 'Test_emon_push' ]
        emon_input.KeyCount = 1

    return acc

◆ ByteStreamReadCfg()

ByteStreamConfig.ByteStreamReadCfg ( flags,
type_names = None )
Set up to read from a bytestream file

The function adds the components required to read events and metadata from
bytestream input. May be used to read events from a secondary input as well
as from a primary input file.

Args:
    flags:      Job configuration flags
    type_names: (optional) specific type names for address provider to find

Returns:
    A component accumulator fragment containing the components required to
    read from bytestream. Should be merged into main job configuration.

Definition at line 25 of file Event/ByteStreamCnvSvc/python/ByteStreamConfig.py.

def ByteStreamReadCfg(flags, type_names=None):
    """Set up to read from a bytestream file

    The function adds the components required to read events and metadata from
    bytestream input. May be used to read events from a secondary input as well
    as from a primary input file.

    Args:
        flags:      Job configuration flags
        type_names: (optional) specific type names for address provider to find

    Returns:
        A component accumulator fragment containing the components required to
        read from bytestream. Should be merged into main job configuration.
    """
    result = ComponentAccumulator()

    bytestream_conversion = CompFactory.ByteStreamCnvSvc()
    result.addService(bytestream_conversion)

    eiName = "EventInfo"
    if flags.Common.isOnline and not any(flags.Input.Files) and not (flags.Trigger.doHLT or flags.Trigger.doLVL1):
        # Online, no input files and no trigger being run: use the emon
        # input service.
        bytestream_input = CompFactory.ByteStreamEmonInputSvc("ByteStreamInputSvc")
    else:
        # The EventInfo key gets the Overlay.BkgPrefix prefix when
        # Overlay.ByteStream is set.
        eiName = "{}EventInfo".format(flags.Overlay.BkgPrefix if flags.Overlay.ByteStream else "")
        bytestream_input = CompFactory.ByteStreamEventStorageInputSvc(
            name="ByteStreamInputSvc",
            EventInfoKey=eiName)
    result.addService(bytestream_input)

    if flags.Input.SecondaryFiles:
        # A non-negative Overlay.SkipSecondaryEvents sets the number of events
        # to skip on the secondary input; otherwise fall back to
        # Exec.SkipEvents. (Previously both branches of this conditional used
        # Exec.SkipEvents, so the overlay flag had no effect.)
        secondary_skip_events = flags.Overlay.SkipSecondaryEvents
        if secondary_skip_events < 0:
            secondary_skip_events = flags.Exec.SkipEvents
        event_selector = CompFactory.EventSelectorByteStream(
            name="SecondaryEventSelector",
            IsSecondary=True,
            Input=flags.Input.SecondaryFiles,
            SkipEvents=secondary_skip_events,
            ByteStreamInputSvc=bytestream_input.name,
        )
        result.addService(event_selector)
    else:
        event_selector = CompFactory.EventSelectorByteStream(
            name="EventSelector",
            Input=flags.Input.Files,
            SkipEvents=flags.Exec.SkipEvents,
            ByteStreamInputSvc=bytestream_input.name,
        )
        result.addService(event_selector)
        # Only the primary selector is registered as the application's EvtSel.
        result.setAppProperty("EvtSel", event_selector.name)

    event_persistency = CompFactory.EvtPersistencySvc(
        name="EventPersistencySvc", CnvServices=[bytestream_conversion.name]
    )
    result.addService(event_persistency)

    result.addService(CompFactory.ROBDataProviderSvc())

    address_provider = CompFactory.ByteStreamAddressProviderSvc(
        TypeNames=type_names if type_names else [],
    )
    result.addService(address_provider)

    result.merge(
        MetaDataSvcCfg(flags, ["IOVDbMetaDataTool", "ByteStreamMetadataTool"])
    )

    proxy = CompFactory.ProxyProviderSvc(ProviderNames = [address_provider.name])
    result.addService(proxy)

    # Declare the EventInfo objects (under the key chosen above) as extra
    # outputs of the input loader, alongside the requested type names.
    result.merge(SGInputLoaderCfg(flags, address_provider.TypeNames,
                                  ExtraOutputs=[("xAOD::EventInfo",f"StoreGateSvc+{eiName}"),
                                                ("xAOD::EventAuxInfo",f"StoreGateSvc+{eiName}Aux.")]))

    return result
98
99

◆ ByteStreamWriteCfg()

ByteStreamConfig.ByteStreamWriteCfg ( flags,
type_names = None )
Set up output stream in RAW/bytestream format

Configure components responsible for writing bytestream format. Write job
results to bytestream file. ATLAS file naming conventions are enforced as
determined from the given configuration flags.

Args:
    flags:      Job configuration flags
    type_names: (optional) Specify item list for output stream to write

Returns:
    A component accumulator fragment containing the components required to
    write to bytestream. Should be merged into main job configuration.

Definition at line 100 of file Event/ByteStreamCnvSvc/python/ByteStreamConfig.py.

def ByteStreamWriteCfg(flags, type_names=None):
    """Configure an output stream that writes RAW/bytestream format.

    Sets up the event storage output service, the ByteStream conversion
    service and the "BSOutputStreamAlg" output stream algorithm, and merges
    the IOVDbSvc and metadata service configuration.

    Args:
        flags:      Job configuration flags
        type_names: (optional) Specify item list for output stream to write

    Returns:
        A component accumulator fragment containing the components required to
        write to bytestream. Should be merged into main job configuration.

    Raises:
        AssertionError: if flags.Input.RunNumbers does not contain exactly one
            distinct run number.
    """
    run_numbers = set(flags.Input.RunNumbers)
    assert len(run_numbers) == 1, (
        "Input is from multiple runs, do not know which one to use {}".format(
            run_numbers
        )
    )
    run_number = run_numbers.pop()

    out_sequence = CompFactory.AthSequencer('AthOutSeq', StopOverride=True)
    result = ComponentAccumulator(out_sequence)

    # FileTag (the release) is not set here: the release variable depends on
    # the way the environment is configured.
    storage_svc = CompFactory.ByteStreamEventStorageOutputSvc(
        MaxFileMB=15000,
        MaxFileNE=15000000,  # event (beyond which it creates a new file)
        OutputDirectory="./",
        SimpleFileName=flags.Output.BSFileName,
        AppName="Athena",
        RunNumber=run_number,
    )
    result.addService(storage_svc)

    conversion_svc = CompFactory.ByteStreamCnvSvc(
        name="ByteStreamCnvSvc",
        ByteStreamOutputSvcList=[storage_svc.getName()],
    )
    result.addService(conversion_svc)

    # ByteStreamCnvSvc::connectOutput() requires xAOD::EventInfo
    required_inputs = [('xAOD::EventInfo','StoreGateSvc+EventInfo')]

    stream_alg = CompFactory.AthenaOutputStream(
        name="BSOutputStreamAlg",
        EvtConversionSvc=conversion_svc.name,
        OutputFile="ByteStreamEventStorageOutputSvc",
        ItemList=type_names or [],
        ExtraInputs=required_inputs,
    )
    result.addEventAlgo(stream_alg, primary=True)

    result.merge(IOVDbSvcCfg(flags))

    metadata_tools = ["IOVDbMetaDataTool", "ByteStreamMetadataTool"]
    result.merge(MetaDataSvcCfg(flags, metadata_tools))

    return result
160
STL class.

◆ TransientByteStreamCfg()

ByteStreamConfig.TransientByteStreamCfg ( flags,
item_list = None,
type_names = None,
extra_inputs = None )
Set up transient ByteStream output stream

Configure components responsible for writing bytestream format. Write the
specified objects to ByteStream into the cache of the ROBDataProviderSvc.
The data can then be read downstream as if they were coming from a BS file.

Args:
    flags:        Job configuration flags
    item_list:    (optional) List of objects to be written to transient ByteStream
    type_names:   (optional) List of types/names to register in BS conversion service
                  as available to be read from (transient) ByteStream
    extra_inputs: (optional) List of objects which need to be produced before transient
                  ByteStream streaming is scheduled - ensures correct scheduling

Returns:
    A component accumulator fragment containing the components required to
    write transient bytestream. Should be merged into main job configuration.

Definition at line 161 of file Event/ByteStreamCnvSvc/python/ByteStreamConfig.py.

def TransientByteStreamCfg(flags, item_list=None, type_names=None, extra_inputs=None):
    """Set up transient ByteStream output stream

    Configure components responsible for writing bytestream format. Write the
    specified objects to ByteStream into the cache of the ROBDataProviderSvc.
    The data can then be read downstream as if they were coming from a BS file.

    Args:
        flags:        Job configuration flags
        item_list:    (optional) List of objects to be written to transient ByteStream
        type_names:   (optional) List of types/names to register in BS conversion service
                      as available to be read from (transient) ByteStream
        extra_inputs: (optional) List of objects which need to be produced before transient
                      ByteStream streaming is scheduled - ensures correct scheduling.
                      The caller's list is not modified.

    Returns:
        A component accumulator fragment containing the components required to
        write transient bytestream. Should be merged into main job configuration.
    """

    result = ComponentAccumulator()

    result.addService(CompFactory.ROBDataProviderSvc())

    rdp_output = CompFactory.ByteStreamRDP_OutputSvc()
    result.addService(rdp_output)

    bytestream_conversion = CompFactory.ByteStreamCnvSvc(
        name="ByteStreamCnvSvc",
        FillTriggerBits=False, # ATR-25971, transient BS is produced before trigger bits in RDOtoRDOTrigger
        ByteStreamOutputSvcList=[rdp_output.getName()],
    )
    result.addService(bytestream_conversion)

    # Special fictitious extra output which can be used to ensure correct
    # scheduling of transient ByteStream clients
    extra_outputs = [("TransientBSOutType","StoreGateSvc+TransientBSOutKey")]

    # ByteStreamCnvSvc::connectOutput() requires xAOD::EventInfo
    event_info_input = ('xAOD::EventInfo','StoreGateSvc+EventInfo')
    # Work on a copy so that appending the EventInfo dependency never
    # modifies the list object passed in by the caller.
    extra_inputs = list(extra_inputs) if extra_inputs else []
    if event_info_input not in extra_inputs:
        extra_inputs.append(event_info_input)

    output_stream = CompFactory.AthenaOutputStream(
        name="TransBSStreamAlg",
        EvtConversionSvc=bytestream_conversion.name,
        OutputFile="ByteStreamRDP_OutputSvc",
        ItemList=item_list if item_list else [],
        ExtraInputs=extra_inputs,
        ExtraOutputs=extra_outputs
    )
    result.addEventAlgo(output_stream,
                        primary=True)

    address_provider = CompFactory.ByteStreamAddressProviderSvc(
        TypeNames=type_names if type_names else [],
    )
    result.addService(address_provider)

    result.addService(CompFactory.ProxyProviderSvc(
        ProviderNames = [address_provider.name]))

    result.merge(SGInputLoaderCfg(flags, Load=address_provider.TypeNames))

    return result
228
229

Variable Documentation

◆ acc

ByteStreamConfig.acc = MainServicesCfg(flags)

◆ doWriteBS

ByteStreamConfig.doWriteBS

◆ Files

ByteStreamConfig.Files

◆ flags

ByteStreamConfig.flags = initConfigFlags()

◆ log

ByteStreamConfig.log = logging.getLogger('ByteStreamConfig')

◆ read

ByteStreamConfig.read = ByteStreamReadCfg(flags)

◆ write

ByteStreamConfig.write = ByteStreamWriteCfg(flags)