ATLAS Offline Software
Event/ByteStreamCnvSvc/python/ByteStreamConfig.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # Copyright (C) 2002-2023 CERN for the benefit of the ATLAS collaboration
3 """Set up to read and/or write bytestream files.
4 
5 This module configures the Athena components required to read from and/or
6 write to RAW/bytestream files. Use ByteStreamReadCfg to set up for reading
7 and ByteStreamWriteCfg to set up for writing. Merge the component accumulator
8 the functions return into your job configuration as needed.
9 
10 Executing this module will run a short test over 10 events. The events are read
11 from the default bytestream input on CVMFS and written to a local bytestream
12 file.
13 
14  Typical usage examples:
15 
16  component_accumulator.merge(ByteStreamReadCfg(flags))
17 """
18 from AthenaConfiguration.ComponentFactory import CompFactory
19 from AthenaConfiguration.ComponentAccumulator import ComponentAccumulator
20 from AthenaServices.MetaDataSvcConfig import MetaDataSvcCfg
21 from IOVDbSvc.IOVDbSvcConfig import IOVDbSvcCfg
22 from SGComps.SGInputLoaderConfig import SGInputLoaderCfg
23 
24 
def ByteStreamReadCfg(flags, type_names=None):
    """Set up to read from a bytestream file.

    The function adds the components required to read events and metadata
    from bytestream input. May be used to read events from a secondary input
    as well as from the primary input file.

    Args:
        flags: Job configuration flags
        type_names: (optional) specific type names for address provider to find

    Returns:
        A component accumulator fragment containing the components required to
        read from bytestream. Should be merged into main job configuration.
    """
    result = ComponentAccumulator()

    bytestream_conversion = CompFactory.ByteStreamCnvSvc()
    result.addService(bytestream_conversion)

    # The Emon input service is only selected for online jobs that have no
    # input files and run neither the HLT nor the LVL1 trigger.
    read_from_emon = (
        flags.Common.isOnline
        and not any(flags.Input.Files)
        and not (flags.Trigger.doHLT or flags.Trigger.doLVL1)
    )
    if read_from_emon:
        bytestream_input = CompFactory.ByteStreamEmonInputSvc("ByteStreamInputSvc")
    else:
        # For data overlay the EventInfo key carries the background prefix.
        bytestream_input = CompFactory.ByteStreamEventStorageInputSvc(
            name="ByteStreamInputSvc",
            EventInfoKey="{}EventInfo".format(
                flags.Overlay.BkgPrefix if flags.Overlay.DataOverlay else ""
            ),
        )
    result.addService(bytestream_input)

    if flags.Input.SecondaryFiles:
        # A non-negative Overlay.SkipSecondaryEvents overrides the general
        # skip count for the secondary input; otherwise fall back to
        # Exec.SkipEvents. (Previously both branches of this conditional
        # used Exec.SkipEvents, so the dedicated flag was never honoured.)
        skip_secondary = flags.Overlay.SkipSecondaryEvents
        event_selector = CompFactory.EventSelectorByteStream(
            name="SecondaryEventSelector",
            IsSecondary=True,
            Input=flags.Input.SecondaryFiles,
            SkipEvents=skip_secondary if skip_secondary >= 0 else flags.Exec.SkipEvents,
            ByteStreamInputSvc=bytestream_input.name,
        )
        result.addService(event_selector)
    else:
        event_selector = CompFactory.EventSelectorByteStream(
            name="EventSelector",
            Input=flags.Input.Files,
            SkipEvents=flags.Exec.SkipEvents,
            ByteStreamInputSvc=bytestream_input.name,
            HelperTools=[CompFactory.xAODMaker.EventInfoSelectorTool()],
        )
        result.addService(event_selector)
        # Only the primary selector is registered as the application's EvtSel.
        result.setAppProperty("EvtSel", event_selector.name)

    event_persistency = CompFactory.EvtPersistencySvc(
        name="EventPersistencySvc", CnvServices=[bytestream_conversion.name]
    )
    result.addService(event_persistency)

    result.addService(CompFactory.ROBDataProviderSvc())

    address_provider = CompFactory.ByteStreamAddressProviderSvc(
        TypeNames=type_names if type_names else [],
    )
    result.addService(address_provider)

    result.merge(
        MetaDataSvcCfg(flags, ["IOVDbMetaDataTool", "ByteStreamMetadataTool"])
    )

    proxy = CompFactory.ProxyProviderSvc(ProviderNames=[address_provider.name])
    result.addService(proxy)

    # Ask the input loader to load the types the address provider exposes.
    result.merge(SGInputLoaderCfg(flags, address_provider.TypeNames))

    return result
98 
99 
def ByteStreamWriteCfg(flags, type_names=None):
    """Configure an output stream that writes RAW/bytestream files.

    Sets up the components that write job results in bytestream format.
    ATLAS file naming conventions are enforced as determined from the given
    configuration flags.

    Args:
        flags: Job configuration flags
        type_names: (optional) Specify item list for output stream to write

    Returns:
        A component accumulator fragment containing the components required to
        write to bytestream. Should be merged into main job configuration.
    """
    # The output service takes a single run number, so the input must come
    # from exactly one run.
    run_numbers = set(flags.Input.RunNumbers)
    assert len(run_numbers) == 1, (
        "Input is from multiple runs, do not know which one to use {}".format(
            run_numbers
        )
    )

    out_sequence = CompFactory.AthSequencer('AthOutSeq', StopOverride=True)
    acc = ComponentAccumulator(out_sequence)

    # FileTag is not set here: the release value depends on how the
    # environment is configured.
    storage_svc = CompFactory.ByteStreamEventStorageOutputSvc(
        MaxFileMB=15000,
        MaxFileNE=15000000,  # events per file; beyond this a new file is created
        OutputDirectory="./",
        AppName="Athena",
        RunNumber=run_numbers.pop(),
    )
    acc.addService(storage_svc)

    conversion_svc = CompFactory.ByteStreamCnvSvc(
        name="ByteStreamCnvSvc",
        ByteStreamOutputSvcList=[storage_svc.getName()],
    )
    acc.addService(conversion_svc)

    if type_names:
        items_to_write = type_names
    else:
        items_to_write = list()

    stream_alg = CompFactory.AthenaOutputStream(
        name="BSOutputStreamAlg",
        EvtConversionSvc=conversion_svc.name,
        OutputFile="ByteStreamEventStorageOutputSvc",
        ItemList=items_to_write,
        # ByteStreamCnvSvc::connectOutput() requires xAOD::EventInfo
        ExtraInputs=[('xAOD::EventInfo', 'StoreGateSvc+EventInfo')],
    )
    acc.addEventAlgo(stream_alg, primary=True)

    acc.merge(IOVDbSvcCfg(flags))

    metadata_tools = ["IOVDbMetaDataTool", "ByteStreamMetadataTool"]
    acc.merge(MetaDataSvcCfg(flags, metadata_tools))

    return acc
159 
def TransientByteStreamCfg(flags, item_list=None, type_names=None, extra_inputs=None):
    """Set up transient ByteStream output stream.

    Configure components responsible for writing bytestream format. Write the
    specified objects to ByteStream into the cache of the ROBDataProviderSvc.
    The data can then be read downstream as if they were coming from a BS file.

    Args:
        flags: Job configuration flags
        item_list: (optional) List of objects to be written to transient ByteStream
        type_names: (optional) List of types/names to register in BS conversion service
            as available to be read from (transient) ByteStream
        extra_inputs: (optional) List of objects which need to be produced before
            transient ByteStream streaming is scheduled - ensures correct
            scheduling. The passed collection is never modified.

    Returns:
        A component accumulator fragment containing the components required to
        write transient bytestream. Should be merged into main job configuration.
    """
    result = ComponentAccumulator()

    result.addService(CompFactory.ROBDataProviderSvc())

    rdp_output = CompFactory.ByteStreamRDP_OutputSvc()
    result.addService(rdp_output)

    bytestream_conversion = CompFactory.ByteStreamCnvSvc(
        name="ByteStreamCnvSvc",
        FillTriggerBits=False,  # ATR-25971, transient BS is produced before trigger bits in RDOtoRDOTrigger
        ByteStreamOutputSvcList=[rdp_output.getName()],
    )
    result.addService(bytestream_conversion)

    # Special fictitious extra output which can be used to ensure correct
    # scheduling of transient ByteStream clients
    extra_outputs = [("TransientBSOutType", "StoreGateSvc+TransientBSOutKey")]

    # ByteStreamCnvSvc::connectOutput() requires xAOD::EventInfo
    event_info_input = ('xAOD::EventInfo', 'StoreGateSvc+EventInfo')
    if not extra_inputs:
        extra_inputs = [event_info_input]
    elif event_info_input not in extra_inputs:
        # Build a new list instead of appending in place: the caller's
        # collection must not be mutated, and this also works when a tuple
        # or set was passed.
        extra_inputs = [*extra_inputs, event_info_input]

    output_stream = CompFactory.AthenaOutputStream(
        name="TransBSStreamAlg",
        EvtConversionSvc=bytestream_conversion.name,
        OutputFile="ByteStreamRDP_OutputSvc",
        ItemList=item_list if item_list else [],
        ExtraInputs=extra_inputs,
        ExtraOutputs=extra_outputs,
    )
    result.addEventAlgo(output_stream, primary=True)

    address_provider = CompFactory.ByteStreamAddressProviderSvc(
        TypeNames=type_names if type_names else [],
    )
    result.addService(address_provider)

    result.addService(CompFactory.ProxyProviderSvc(
        ProviderNames=[address_provider.name]))

    # Ask the input loader to load the types the address provider exposes.
    result.merge(SGInputLoaderCfg(flags, Load=address_provider.TypeNames))

    return result
227 
228 
if __name__ == "__main__":
    # Run a functional test if module is executed: configure reading of the
    # default RAW test input and writing of bytestream, then run 10 events.
    import sys

    from AthenaConfiguration.MainServicesConfig import MainServicesCfg
    from AthenaConfiguration.TestDefaults import defaultTestFiles
    from AthenaConfiguration.AllConfigFlags import initConfigFlags
    from AthenaCommon.Logging import logging

    log = logging.getLogger('ByteStreamConfig')

    flags = initConfigFlags()
    flags.Input.Files = defaultTestFiles.RAW_RUN2
    flags.Output.doWriteBS = True
    flags.lock()

    read = ByteStreamReadCfg(flags)
    # Use a context manager so the pickle file is flushed and closed even if
    # store() raises (the handle was previously never closed).
    with open("test.pkl", "wb") as read_pkl:
        read.store(read_pkl)
    print("All OK")

    write = ByteStreamWriteCfg(flags)
    write.printConfig()
    log.info("Write setup OK")

    acc = MainServicesCfg(flags)
    acc.merge(read)
    acc.merge(write)
    acc.printConfig()
    log.info("Config OK")

    with open('ByteStreamConfig.pkl', 'wb') as pkl:
        acc.store(pkl)

    # The exit status is non-zero when the 10-event run reports failure.
    sys.exit(acc.run(10).isFailure())
python.JetAnalysisCommon.ComponentAccumulator
ComponentAccumulator
Definition: JetAnalysisCommon.py:302
vtune_athena.format
format
Definition: vtune_athena.py:14
python.ByteStreamConfig.TransientByteStreamCfg
def TransientByteStreamCfg(flags, item_list=None, type_names=None, extra_inputs=None)
Definition: Event/ByteStreamCnvSvc/python/ByteStreamConfig.py:160
python.ByteStreamConfig.ByteStreamReadCfg
def ByteStreamReadCfg(flags, type_names=None)
Definition: Event/ByteStreamCnvSvc/python/ByteStreamConfig.py:25
SGInputLoaderConfig.SGInputLoaderCfg
def SGInputLoaderCfg(flags, Load=None, **kwargs)
Definition: SGInputLoaderConfig.py:7
python.ByteStreamConfig.ByteStreamWriteCfg
def ByteStreamWriteCfg(flags, type_names=None)
Definition: Event/ByteStreamCnvSvc/python/ByteStreamConfig.py:100
python.MainServicesConfig.MainServicesCfg
def MainServicesCfg(flags, LoopMgr='AthenaEventLoopMgr')
Definition: MainServicesConfig.py:260
histSizes.list
def list(name, path='/')
Definition: histSizes.py:38
python.IOVDbSvcConfig.IOVDbSvcCfg
def IOVDbSvcCfg(flags, **kwargs)
Definition: IOVDbSvcConfig.py:19
CxxUtils::set
constexpr std::enable_if_t< is_bitmask_v< E >, E & > set(E &lhs, E rhs)
Convenience function to set bits in a class enum bitmask.
Definition: bitmask.h:232
Trk::open
@ open
Definition: BinningType.h:40
python.MetaDataSvcConfig.MetaDataSvcCfg
def MetaDataSvcCfg(flags, toolNames=[], tools=[])
Definition: MetaDataSvcConfig.py:6
python.AllConfigFlags.initConfigFlags
def initConfigFlags()
Definition: AllConfigFlags.py:19
dbg::print
void print(std::FILE *stream, std::format_string< Args... > fmt, Args &&... args)
Definition: SGImplSvc.cxx:70