ATLAS Offline Software
Event/ByteStreamCnvSvc/python/ByteStreamConfig.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration
3 """Set up to read and/or write bytestream files.
4 
5 This module configures the Athena components required to read from or
6 write to RAW/bytestream. Use either ByteStreamReadCfg to set up for reading
7 or ByteStreamWriteCfg to set up for writing. Merge the component accumulator
8 that the function returns into your job configuration as needed.
9 
10 Executing this module will run a short test over 10 events. The events are read
11 from the default bytestream input on CVMFS and written to a local bytestream
12 file.
13 
14  Typical usage examples:
15 
16  component_accumulator.merge(ByteStreamReadCfg(flags))
17 """
18 from AthenaConfiguration.ComponentFactory import CompFactory
19 from AthenaConfiguration.ComponentAccumulator import ComponentAccumulator
20 from AthenaServices.MetaDataSvcConfig import MetaDataSvcCfg
21 from IOVDbSvc.IOVDbSvcConfig import IOVDbSvcCfg
22 from SGComps.SGInputLoaderConfig import SGInputLoaderCfg
23 
24 
def ByteStreamReadCfg(flags, type_names=None):
    """Set up to read from a bytestream file

    The function adds the components required to read events and metadata from
    bytestream input. May be used to read events from a secondary input as well
    as from the primary input file.

    Args:
        flags: Job configuration flags
        type_names: (optional) specific type names for address provider to find

    Returns:
        A component accumulator fragment containing the components required to
        read from bytestream. Should be merged into main job configuration.
    """
    result = ComponentAccumulator()

    bytestream_conversion = CompFactory.ByteStreamCnvSvc()
    result.addService(bytestream_conversion)

    # Pick the input service: the Emon one only when running online with no
    # input files and with neither HLT nor LVL1 enabled, otherwise the
    # event-storage one, which is told under which key to record EventInfo.
    eiName = "EventInfo"
    if flags.Common.isOnline and not any(flags.Input.Files) and not (flags.Trigger.doHLT or flags.Trigger.doLVL1):
        bytestream_input = CompFactory.ByteStreamEmonInputSvc("ByteStreamInputSvc")
    else:
        eiName = "{}EventInfo".format(flags.Overlay.BkgPrefix if flags.Overlay.ByteStream else "")
        bytestream_input = CompFactory.ByteStreamEventStorageInputSvc(
            name="ByteStreamInputSvc",
            EventInfoKey=eiName)
    result.addService(bytestream_input)

    if flags.Input.SecondaryFiles:
        # A non-negative Overlay.SkipSecondaryEvents overrides the job-wide
        # skip count for the secondary input; a negative value means "not
        # set", so fall back to Exec.SkipEvents.
        if flags.Overlay.SkipSecondaryEvents >= 0:
            secondary_skip_events = flags.Overlay.SkipSecondaryEvents
        else:
            secondary_skip_events = flags.Exec.SkipEvents
        event_selector = CompFactory.EventSelectorByteStream(
            name="SecondaryEventSelector",
            IsSecondary=True,
            Input=flags.Input.SecondaryFiles,
            SkipEvents=secondary_skip_events,
            ByteStreamInputSvc=bytestream_input.name,
        )
        result.addService(event_selector)
    else:
        event_selector = CompFactory.EventSelectorByteStream(
            name="EventSelector",
            Input=flags.Input.Files,
            SkipEvents=flags.Exec.SkipEvents,
            ByteStreamInputSvc=bytestream_input.name,
        )
        result.addService(event_selector)
        # Only the primary selector is registered as the application's EvtSel
        result.setAppProperty("EvtSel", event_selector.name)

    event_persistency = CompFactory.EvtPersistencySvc(
        name="EventPersistencySvc", CnvServices=[bytestream_conversion.name]
    )
    result.addService(event_persistency)

    result.addService(CompFactory.ROBDataProviderSvc())

    address_provider = CompFactory.ByteStreamAddressProviderSvc(
        TypeNames=type_names if type_names else [],
    )
    result.addService(address_provider)

    result.merge(
        MetaDataSvcCfg(flags, ["IOVDbMetaDataTool", "ByteStreamMetadataTool"])
    )

    proxy = CompFactory.ProxyProviderSvc(ProviderNames=[address_provider.name])
    result.addService(proxy)

    # Declare the address provider's types as loadable and advertise the
    # EventInfo objects (under the key chosen above) as extra outputs.
    result.merge(SGInputLoaderCfg(flags, address_provider.TypeNames,
                                  ExtraOutputs=[("xAOD::EventInfo", f"StoreGateSvc+{eiName}"),
                                                ("xAOD::EventAuxInfo", f"StoreGateSvc+{eiName}Aux.")]))

    return result
98 
99 
def ByteStreamWriteCfg(flags, type_names=None):
    """Set up output stream in RAW/bytestream format

    Builds the services and the output-stream algorithm used to write job
    results to a bytestream file. The output file name is taken from
    flags.Output.BSFileName and the run number from flags.Input.RunNumbers,
    which must contain exactly one distinct run.

    Args:
        flags: Job configuration flags
        type_names: (optional) Specify item list for output stream to write

    Returns:
        A component accumulator fragment containing the components required to
        write to bytestream. Should be merged into main job configuration.

    Raises:
        AssertionError: if the input does not come from exactly one run.
    """
    all_runs = set(flags.Input.RunNumbers)
    assert len(all_runs) == 1, (
        f"Input is from multiple runs, do not know which one to use {all_runs}"
    )

    out_sequence = CompFactory.AthSequencer('AthOutSeq', StopOverride=True)
    result = ComponentAccumulator(out_sequence)

    # Service that writes the event storage files
    storage_svc = CompFactory.ByteStreamEventStorageOutputSvc(
        MaxFileMB=15000,
        MaxFileNE=15000000,  # event (beyond which it creates a new file)
        OutputDirectory="./",
        SimpleFileName=flags.Output.BSFileName,
        AppName="Athena",
        RunNumber=all_runs.pop(),
    )
    result.addService(storage_svc)
    # release variable depends the way the env is configured
    # FileTag = release

    # Conversion service forwarding to the storage service above
    cnv_svc = CompFactory.ByteStreamCnvSvc(
        name="ByteStreamCnvSvc",
        ByteStreamOutputSvcList=[storage_svc.getName()],
    )
    result.addService(cnv_svc)

    # ByteStreamCnvSvc::connectOutput() requires xAOD::EventInfo
    required_inputs = [('xAOD::EventInfo', 'StoreGateSvc+EventInfo')]

    stream_alg = CompFactory.AthenaOutputStream(
        name="BSOutputStreamAlg",
        EvtConversionSvc=cnv_svc.name,
        OutputFile="ByteStreamEventStorageOutputSvc",
        ItemList=type_names or [],
        ExtraInputs=required_inputs,
    )
    result.addEventAlgo(stream_alg, primary=True)

    result.merge(IOVDbSvcCfg(flags))

    metadata_tools = ["IOVDbMetaDataTool", "ByteStreamMetadataTool"]
    result.merge(MetaDataSvcCfg(flags, metadata_tools))

    return result
160 
def TransientByteStreamCfg(flags, item_list=None, type_names=None, extra_inputs=None):
    """Set up transient ByteStream output stream

    Configure components responsible for writing bytestream format. Write the
    specified objects to ByteStream into the cache of the ROBDataProviderSvc.
    The data can then be read downstream as if they were coming from a BS file.

    Args:
        flags: Job configuration flags
        item_list: (optional) List of objects to be written to transient ByteStream
        type_names: (optional) List of types/names to register in BS conversion service
                    as available to be read from (transient) ByteStream
        extra_inputs: (optional) List of objects which need to be produced before transient
                      ByteStream streaming is scheduled - ensures correct scheduling.
                      The passed collection is not modified; xAOD::EventInfo is
                      always added to the inputs actually configured.

    Returns:
        A component accumulator fragment containing the components required to
        write transient bytestream. Should be merged into main job configuration.
    """

    result = ComponentAccumulator()

    result.addService(CompFactory.ROBDataProviderSvc())

    rdp_output = CompFactory.ByteStreamRDP_OutputSvc()
    result.addService(rdp_output)

    bytestream_conversion = CompFactory.ByteStreamCnvSvc(
        name="ByteStreamCnvSvc",
        FillTriggerBits=False,  # ATR-25971, transient BS is produced before trigger bits in RDOtoRDOTrigger
        ByteStreamOutputSvcList=[rdp_output.getName()],
    )
    result.addService(bytestream_conversion)

    # Special fictitious extra output which can be used to ensure correct
    # scheduling of transient ByteStream clients
    extra_outputs = [("TransientBSOutType", "StoreGateSvc+TransientBSOutKey")]

    # ByteStreamCnvSvc::connectOutput() requires xAOD::EventInfo
    event_info_input = ('xAOD::EventInfo', 'StoreGateSvc+EventInfo')
    # Work on a private copy so the caller's collection is never mutated
    # (and so any iterable, not only a list, is accepted).
    extra_inputs = list(extra_inputs) if extra_inputs else []
    if event_info_input not in extra_inputs:
        extra_inputs.append(event_info_input)

    output_stream = CompFactory.AthenaOutputStream(
        name="TransBSStreamAlg",
        EvtConversionSvc=bytestream_conversion.name,
        OutputFile="ByteStreamRDP_OutputSvc",
        ItemList=item_list if item_list else [],
        ExtraInputs=extra_inputs,
        ExtraOutputs=extra_outputs,
    )
    result.addEventAlgo(output_stream, primary=True)

    address_provider = CompFactory.ByteStreamAddressProviderSvc(
        TypeNames=type_names if type_names else [],
    )
    result.addService(address_provider)

    result.addService(CompFactory.ProxyProviderSvc(
        ProviderNames=[address_provider.name]))

    result.merge(SGInputLoaderCfg(flags, Load=address_provider.TypeNames))

    return result
228 
229 
if __name__ == "__main__":
    # Run a functional test if module is executed: configure reading of the
    # default RAW test file and writing of bytestream, pickle the resulting
    # configurations and run over 10 events.
    import sys

    from AthenaConfiguration.MainServicesConfig import MainServicesCfg
    from AthenaConfiguration.TestDefaults import defaultTestFiles
    from AthenaConfiguration.AllConfigFlags import initConfigFlags
    from AthenaCommon.Logging import logging

    log = logging.getLogger('ByteStreamConfig')

    flags = initConfigFlags()
    flags.Input.Files = defaultTestFiles.RAW_RUN2
    flags.Output.doWriteBS = True
    flags.lock()

    read = ByteStreamReadCfg(flags)
    # Use a context manager so the pickle file is flushed and closed
    with open("test.pkl", "wb") as read_pkl:
        read.store(read_pkl)
    print("All OK")

    write = ByteStreamWriteCfg(flags)
    write.printConfig()
    log.info("Write setup OK")

    acc = MainServicesCfg(flags)
    acc.merge(read)
    acc.merge(write)
    acc.printConfig()
    log.info("Config OK")

    with open('ByteStreamConfig.pkl', 'wb') as pkl:
        acc.store(pkl)

    # Exit status is non-zero when the 10-event run reports failure
    sys.exit(acc.run(10).isFailure())
python.JetAnalysisCommon.ComponentAccumulator
ComponentAccumulator
Definition: JetAnalysisCommon.py:342
vtune_athena.format
format
Definition: vtune_athena.py:14
python.ByteStreamConfig.TransientByteStreamCfg
def TransientByteStreamCfg(flags, item_list=None, type_names=None, extra_inputs=None)
Definition: Event/ByteStreamCnvSvc/python/ByteStreamConfig.py:161
python.ByteStreamConfig.ByteStreamReadCfg
def ByteStreamReadCfg(flags, type_names=None)
Definition: Event/ByteStreamCnvSvc/python/ByteStreamConfig.py:25
SGInputLoaderConfig.SGInputLoaderCfg
def SGInputLoaderCfg(flags, Load=None, **kwargs)
Definition: SGInputLoaderConfig.py:7
python.ByteStreamConfig.ByteStreamWriteCfg
def ByteStreamWriteCfg(flags, type_names=None)
Definition: Event/ByteStreamCnvSvc/python/ByteStreamConfig.py:100
python.MainServicesConfig.MainServicesCfg
def MainServicesCfg(flags, LoopMgr='AthenaEventLoopMgr')
Definition: MainServicesConfig.py:310
histSizes.list
def list(name, path='/')
Definition: histSizes.py:38
python.IOVDbSvcConfig.IOVDbSvcCfg
def IOVDbSvcCfg(flags, **kwargs)
Definition: IOVDbSvcConfig.py:28
CxxUtils::set
constexpr std::enable_if_t< is_bitmask_v< E >, E & > set(E &lhs, E rhs)
Convenience function to set bits in a class enum bitmask.
Definition: bitmask.h:232
print
void print(char *figname, TCanvas *c1)
Definition: TRTCalib_StrawStatusPlots.cxx:26
Trk::open
@ open
Definition: BinningType.h:40
python.MetaDataSvcConfig.MetaDataSvcCfg
def MetaDataSvcCfg(flags, toolNames=[], tools=[])
Definition: MetaDataSvcConfig.py:6
python.AllConfigFlags.initConfigFlags
def initConfigFlags()
Definition: AllConfigFlags.py:19