ATLAS Offline Software
Event/ByteStreamCnvSvc/python/ByteStreamConfig.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration
3 """Set up to read and/or write bytestream files.
4 
5 This module configures the Athena components required to read from
6 RAW/bytestream input. Use either byteStreamReadCfg to set up for reading
7 and byteStreamWriteCfg to set up for writing. Merge the component accumulator
8 the functions return into your job configuration as needed.
9 
10 Executing this module will run a short test over 10 events. The events are read
11 from the default bytestream input on CVMFS and written to a local bytestream
12 file.
13 
14  Typical usage examples:
15 
16  component_accumulator.merge(byteStreamReadCfg(flags))
17 """
18 from AthenaConfiguration.ComponentFactory import CompFactory
19 from AthenaConfiguration.ComponentAccumulator import ComponentAccumulator
20 from AthenaServices.MetaDataSvcConfig import MetaDataSvcCfg
21 from IOVDbSvc.IOVDbSvcConfig import IOVDbSvcCfg
22 from SGComps.SGInputLoaderConfig import SGInputLoaderCfg
23 
24 
25 def ByteStreamReadCfg(flags, type_names=None):
26  """Set up to read from a bytestream file
27 
28  The function adds the components required to read events and metadata from
29  bytestream input. May be used to read events from a secondary input as well
30  primary input file.
31 
32  Args:
33  flags: Job configuration flags
34  type_names: (optional) specific type names for address provider to find
35 
36  Returns:
37  A component accumulator fragment containing the components required to
38  read from bytestream. Should be merged into main job configuration.
39  """
40  result = ComponentAccumulator()
41 
42  bytestream_conversion = CompFactory.ByteStreamCnvSvc()
43  result.addService(bytestream_conversion)
44 
45  if flags.Common.isOnline and not any(flags.Input.Files) and not (flags.Trigger.doHLT or flags.Trigger.doLVL1):
46  bytestream_input = CompFactory.ByteStreamEmonInputSvc("ByteStreamInputSvc")
47  else:
48  bytestream_input = CompFactory.ByteStreamEventStorageInputSvc(
49  name="ByteStreamInputSvc",
50  EventInfoKey="{}EventInfo".format(
51  flags.Overlay.BkgPrefix if flags.Overlay.ByteStream else ""
52  ),
53  )
54  result.addService(bytestream_input)
55 
56  if flags.Input.SecondaryFiles:
57  event_selector = CompFactory.EventSelectorByteStream(
58  name="SecondaryEventSelector",
59  IsSecondary=True,
60  Input=flags.Input.SecondaryFiles,
61  SkipEvents=flags.Exec.SkipEvents if flags.Overlay.SkipSecondaryEvents >= 0 else flags.Exec.SkipEvents,
62  ByteStreamInputSvc=bytestream_input.name,
63  )
64  result.addService(event_selector)
65  else:
66  event_selector = CompFactory.EventSelectorByteStream(
67  name="EventSelector",
68  Input=flags.Input.Files,
69  SkipEvents=flags.Exec.SkipEvents,
70  ByteStreamInputSvc=bytestream_input.name,
71  )
72  result.addService(event_selector)
73  result.setAppProperty("EvtSel", event_selector.name)
74 
75  event_persistency = CompFactory.EvtPersistencySvc(
76  name="EventPersistencySvc", CnvServices=[bytestream_conversion.name]
77  )
78  result.addService(event_persistency)
79 
80  result.addService(CompFactory.ROBDataProviderSvc())
81 
82  address_provider = CompFactory.ByteStreamAddressProviderSvc(
83  TypeNames=type_names if type_names else list(),
84  )
85  result.addService(address_provider)
86 
87  result.merge(
88  MetaDataSvcCfg(flags, ["IOVDbMetaDataTool", "ByteStreamMetadataTool"])
89  )
90 
91  proxy = CompFactory.ProxyProviderSvc(ProviderNames = [address_provider.name])
92  result.addService(proxy)
93 
94  result.merge(SGInputLoaderCfg(flags, address_provider.TypeNames))
95 
96  return result
97 
98 
99 def ByteStreamWriteCfg(flags, type_names=None):
100  """Set up output stream in RAW/bytestream format
101 
102  Configure components responsible for writing bytestream format. Write job
103  results to bytestream file. ATLAS file naming conventions are enforced as
104  determined from the given configuration flags.
105 
106  Args:
107  flags: Job configuration flags
108  type_names: (optional) Specify item list for output stream to write
109 
110  Returns:
111  A component accumulator fragment containing the components required to
112  write to bytestream. Should be merged into main job configuration.
113  """
114  all_runs = set(flags.Input.RunNumbers)
115  assert (
116  len(all_runs) == 1
117  ), "Input is from multiple runs, do not know which one to use {}".format(
118  all_runs
119  )
120  result = ComponentAccumulator(CompFactory.AthSequencer('AthOutSeq', StopOverride=True))
121 
122  event_storage_output = CompFactory.ByteStreamEventStorageOutputSvc(
123  MaxFileMB=15000,
124  MaxFileNE=15000000, # event (beyond which it creates a new file)
125  OutputDirectory="./",
126  AppName="Athena",
127  RunNumber=all_runs.pop(),
128  )
129  result.addService(event_storage_output)
130  # release variable depends the way the env is configured
131  # FileTag = release
132 
133  bytestream_conversion = CompFactory.ByteStreamCnvSvc(
134  name="ByteStreamCnvSvc",
135  ByteStreamOutputSvcList=[event_storage_output.getName()],
136  )
137  result.addService(bytestream_conversion)
138 
139  # ByteStreamCnvSvc::connectOutput() requires xAOD::EventInfo
140  event_info_input = ('xAOD::EventInfo','StoreGateSvc+EventInfo')
141 
142  output_stream = CompFactory.AthenaOutputStream(
143  name="BSOutputStreamAlg",
144  EvtConversionSvc=bytestream_conversion.name,
145  OutputFile="ByteStreamEventStorageOutputSvc",
146  ItemList=type_names if type_names else list(),
147  ExtraInputs=[event_info_input]
148  )
149  result.addEventAlgo(output_stream, primary=True)
150 
151  result.merge(IOVDbSvcCfg(flags))
152 
153  result.merge(
154  MetaDataSvcCfg(flags, ["IOVDbMetaDataTool", "ByteStreamMetadataTool"])
155  )
156 
157  return result
158 
159 def TransientByteStreamCfg(flags, item_list=None, type_names=None, extra_inputs=None):
160  """Set up transient ByteStream output stream
161 
162  Configure components responsible for writing bytestream format. Write the
163  specified objects to ByteStream into the cache of the ROBDataProviderSvc.
164  The data can then be read downstream as if they were coming from a BS file.
165 
166  Args:
167  flags: Job configuration flags
168  item_list: (optional) List of objects to be written to transient ByteStream
169  type_names: (optional) List of types/names to register in BS conversion service
170  as available to be read from (transient) ByteStream
171  extra_inputs: (optional) List of objects which need to be produced before transient
172  ByteStream streaming is scheduled - ensures correct scheduling
173 
174  Returns:
175  A component accumulator fragment containing the components required to
176  write transient bytestream. Should be merged into main job configuration.
177  """
178 
179  result = ComponentAccumulator()
180 
181  result.addService(CompFactory.ROBDataProviderSvc())
182 
183  rdp_output = CompFactory.ByteStreamRDP_OutputSvc()
184  result.addService(rdp_output)
185 
186  bytestream_conversion = CompFactory.ByteStreamCnvSvc(
187  name="ByteStreamCnvSvc",
188  FillTriggerBits=False, # ATR-25971, transient BS is produced before trigger bits in RDOtoRDOTrigger
189  ByteStreamOutputSvcList=[rdp_output.getName()],
190  )
191  result.addService(bytestream_conversion)
192 
193  # Special fictitious extra output which can be used to ensure correct
194  # scheduling of transient ByteStream clients
195  extra_outputs = [("TransientBSOutType","StoreGateSvc+TransientBSOutKey")]
196 
197  # ByteStreamCnvSvc::connectOutput() requires xAOD::EventInfo
198  event_info_input = ('xAOD::EventInfo','StoreGateSvc+EventInfo')
199  if not extra_inputs:
200  extra_inputs = [event_info_input]
201  elif event_info_input not in extra_inputs:
202  extra_inputs.append(event_info_input)
203 
204  output_stream = CompFactory.AthenaOutputStream(
205  name="TransBSStreamAlg",
206  EvtConversionSvc=bytestream_conversion.name,
207  OutputFile="ByteStreamRDP_OutputSvc",
208  ItemList=item_list if item_list else list(),
209  ExtraInputs=extra_inputs,
210  ExtraOutputs=extra_outputs
211  )
212  result.addEventAlgo(output_stream,
213  primary=True)
214 
215  address_provider = CompFactory.ByteStreamAddressProviderSvc(
216  TypeNames=type_names if type_names else list(),
217  )
218  result.addService(address_provider)
219 
220  result.addService(CompFactory.ProxyProviderSvc(
221  ProviderNames = [address_provider.name]))
222 
223  result.merge(SGInputLoaderCfg(flags, Load=address_provider.TypeNames))
224 
225  return result
226 
227 
228 if __name__ == "__main__":
229  """Run a functional test if module is executed"""
230 
231  from AthenaConfiguration.MainServicesConfig import MainServicesCfg
232  from AthenaConfiguration.TestDefaults import defaultTestFiles
233  from AthenaConfiguration.AllConfigFlags import initConfigFlags
234  from AthenaCommon.Logging import logging
235 
236  log = logging.getLogger('ByteStreamConfig')
237 
238  flags = initConfigFlags()
239  flags.Input.Files = defaultTestFiles.RAW_RUN2
240  flags.Output.doWriteBS = True
241  flags.lock()
242 
243  read = ByteStreamReadCfg(flags)
244  read.store(open("test.pkl", "wb"))
245  print("All OK")
246 
247  write = ByteStreamWriteCfg(flags)
248  write.printConfig()
249  log.info("Write setup OK")
250 
251  acc = MainServicesCfg(flags)
252  acc.merge(read)
253  acc.merge(write)
254  acc.printConfig()
255  log.info("Config OK")
256 
257  with open('ByteStreamConfig.pkl', 'wb') as pkl:
258  acc.store(pkl)
259 
260  import sys
261  sys.exit(acc.run(10).isFailure())
python.JetAnalysisCommon.ComponentAccumulator
ComponentAccumulator
Definition: JetAnalysisCommon.py:302
vtune_athena.format
format
Definition: vtune_athena.py:14
python.ByteStreamConfig.TransientByteStreamCfg
def TransientByteStreamCfg(flags, item_list=None, type_names=None, extra_inputs=None)
Definition: Event/ByteStreamCnvSvc/python/ByteStreamConfig.py:159
python.ByteStreamConfig.ByteStreamReadCfg
def ByteStreamReadCfg(flags, type_names=None)
Definition: Event/ByteStreamCnvSvc/python/ByteStreamConfig.py:25
SGInputLoaderConfig.SGInputLoaderCfg
def SGInputLoaderCfg(flags, Load=None, **kwargs)
Definition: SGInputLoaderConfig.py:7
python.ByteStreamConfig.ByteStreamWriteCfg
def ByteStreamWriteCfg(flags, type_names=None)
Definition: Event/ByteStreamCnvSvc/python/ByteStreamConfig.py:99
python.MainServicesConfig.MainServicesCfg
def MainServicesCfg(flags, LoopMgr='AthenaEventLoopMgr')
Definition: MainServicesConfig.py:260
histSizes.list
def list(name, path='/')
Definition: histSizes.py:38
python.IOVDbSvcConfig.IOVDbSvcCfg
def IOVDbSvcCfg(flags, **kwargs)
Definition: IOVDbSvcConfig.py:19
CxxUtils::set
constexpr std::enable_if_t< is_bitmask_v< E >, E & > set(E &lhs, E rhs)
Convenience function to set bits in a class enum bitmask.
Definition: bitmask.h:232
print
void print(char *figname, TCanvas *c1)
Definition: TRTCalib_StrawStatusPlots.cxx:25
Trk::open
@ open
Definition: BinningType.h:40
python.MetaDataSvcConfig.MetaDataSvcCfg
def MetaDataSvcCfg(flags, toolNames=[], tools=[])
Definition: MetaDataSvcConfig.py:6
python.AllConfigFlags.initConfigFlags
def initConfigFlags()
Definition: AllConfigFlags.py:19