ATLAS Offline Software
OutputStreamConfig.py
Go to the documentation of this file.
1 # Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
2 
3 from AthenaConfiguration.ComponentAccumulator import ComponentAccumulator, ConfigurationError
4 from AthenaConfiguration.ComponentFactory import CompFactory
5 from AthenaConfiguration.Enums import ProductionStep
6 from AthenaCommon.Logging import logging
7 
8 
def outputStreamName(streamName):
    """Return the full output stream name, i.e. streamName prefixed with "Stream"."""
    return "Stream{}".format(streamName)
11 
12 
def OutputStreamCfg(flags, streamName, ItemList=None, MetadataItemList=None,
                    disableEventTag=False, trigNavThinningSvc=None, takeItemsFromInput=False,
                    extendProvenanceRecord=True, keepProvenanceTagsRegEx=None, AcceptAlgs=None, HelperTools=None):
    """
    Configures the AthenaOutputStream named Stream<streamName> together with its writing tool

    flags                   - configuration flags
    streamName              - short stream name (e.g. "AOD"); the algorithm is named via outputStreamName()
    ItemList                - items to be written. Unless the stream name contains "DAOD_" or "D2AOD_",
                              the xAOD::EventInfo and xAOD::EventAuxInfo items are prepended to it
    MetadataItemList        - forwarded to the MetadataItemList property of the stream
    disableEventTag         - if True the EventInfoTagBuilder is not scheduled and no AttributeListKey is set.
                              Forced to True when the flag Output.<streamName>ParentStream exists
    trigNavThinningSvc      - if not None, set as TrigNavigationThinningSvc of the ThinningCacheTool
    takeItemsFromInput      - sets TakeItemsFromInput on the stream, only when True
    extendProvenanceRecord  - sets ExtendProvenanceRecord on the stream, only when False
    keepProvenanceTagsRegEx - RegEx string to match processing tags in the Event provenance. Only matching
                              tags will be copied to the new DataHeader. Empty string rejects all tags.
                              Direct provenance is not affected (see extendProvenanceRecord).
                              None leaves the property untouched
    AcceptAlgs              - appended to the AcceptAlgs property of the stream
    HelperTools             - helper tools of the stream; a ThinningCacheTool is always appended

    None for any of the list arguments is equivalent to an empty list.

    Raises ConfigurationError if the output file name is also listed in flags.Input.Files

    returns CA to be merged, with the stream scheduled in the "AthOutSeq" sequence
    """
    # None sentinels instead of mutable default arguments: every call gets its own
    # fresh lists, so nothing can leak between calls through a shared default object.
    ItemList = [] if ItemList is None else ItemList
    MetadataItemList = [] if MetadataItemList is None else MetadataItemList
    AcceptAlgs = [] if AcceptAlgs is None else AcceptAlgs
    HelperTools = [] if HelperTools is None else HelperTools

    eventInfoKey = "EventInfo"
    if flags.Common.ProductionStep in [ProductionStep.PileUpPresampling, ProductionStep.PileUpPretracking, ProductionStep.MinbiasPreprocessing]:
        eventInfoKey = f"{flags.Overlay.BkgPrefix}EventInfo"

    msg = logging.getLogger("OutputStreamCfg")
    flagName = f"Output.{streamName}FileName"
    if flags.hasFlag(flagName):
        fileName = flags._get(flagName)
    else:
        fileName = f"my{streamName}.pool.root"
        msg.info("No file name predefined for stream %s. Using %s", streamName, fileName)

    if fileName in flags.Input.Files:
        raise ConfigurationError("Same name for input and output file %s" % fileName)

    result = ComponentAccumulator(sequence = CompFactory.AthSequencer("AthOutSeq", StopOverride=True))

    # Set up AthenaPoolCnvSvc through PoolWriteCfg
    from AthenaPoolCnvSvc.PoolWriteConfig import PoolWriteCfg
    result.merge(PoolWriteCfg(flags))

    # The full stream name is used for the algorithm, its tool and the DataHeader key
    fullStreamName = outputStreamName(streamName)

    # define athena output stream
    writingTool = CompFactory.AthenaOutputStreamTool(
        f"{fullStreamName}Tool",
        DataHeaderKey=fullStreamName,
        MetaDataPoolContainerPrefix=f"{flags.Output.StorageTechnology.MetaData}:MetaData",
        MetaDataOutputCollection=f"{flags.Output.StorageTechnology.MetaData}:MetaDataHdr",
        ConversionService="AthenaPoolSharedIOCnvSvc" if flags.MP.UseSharedReader or flags.MP.UseSharedWriter else "AthenaPoolCnvSvc",
    )

    # If we're running in augmentation mode, configure the writing tool accordingly
    parentStream = f"Output.{streamName}ParentStream"
    childStream = f"Output.{streamName}ChildStream"
    if flags.hasFlag(childStream):
        writingTool.SaveDecisions = True
    elif flags.hasFlag(parentStream):
        disableEventTag = True
        writingTool.OutputCollection = f"POOLContainer_{streamName}"
        writingTool.PoolContainerPrefix = f"CollectionTree_{streamName}"
        writingTool.MetaDataOutputCollection = f"MetaDataHdr_{streamName}"
        writingTool.MetaDataPoolContainerPrefix = f"MetaData_{streamName}"
        msg.info("Stream %s running in augmentation mode with %s as parent", streamName, flags._get(parentStream))

    # In DAOD production the EventInfo is prepared specially by the SlimmingHelper to ensure it is written in AuxDyn form
    # So for derivations the ItemList from the SlimmingHelper alone is used without the extra EventInfo items
    if any(name in streamName for name in ("DAOD_", "D2AOD_")):
        finalItemList = ItemList
    else:
        finalItemList = [f"xAOD::EventInfo#{eventInfoKey}", f"xAOD::EventAuxInfo#{eventInfoKey}Aux."] + ItemList

    outputStream = CompFactory.AthenaOutputStream(
        fullStreamName,
        StreamName=fullStreamName,
        WritingTool=writingTool,
        ItemList=finalItemList,
        MetadataItemList=MetadataItemList,
        OutputFile=fileName,
        HelperTools=HelperTools,
    )
    if takeItemsFromInput:
        # Avoid explicitly setting the property unless we want to set it
        # to True (default in C++ is False). This avoids CA merge
        # conflicts.
        outputStream.TakeItemsFromInput = True
    if not extendProvenanceRecord:
        # Treat this similar to takeItemsFromInput
        # (C++ default in this case is True)
        outputStream.ExtendProvenanceRecord = False
    if keepProvenanceTagsRegEx is not None:
        # C++ defaults to '.*' which means all. Overwrite only on request.
        outputStream.KeepProvenanceTagsRegEx = keepProvenanceTagsRegEx
    outputStream.AcceptAlgs += AcceptAlgs
    outputStream.ExtraOutputs.add(("DataHeader", f"StoreGateSvc+{fullStreamName}"))
    if flags.Scheduler.CheckOutputUsage and flags.Concurrency.NumThreads > 0:
        # Declare the explicitly listed items as inputs: wildcard entries and
        # anything containing 'Aux' are skipped
        outputStream.ExtraInputs = {tuple(item.split('#')) for item in finalItemList if '*' not in item and 'Aux' not in item}
        # Ignore dependencies
        from AthenaConfiguration.MainServicesConfig import OutputUsageIgnoreCfg
        result.merge(OutputUsageIgnoreCfg(flags, outputStream.name))

    result.addService(CompFactory.StoreGateSvc("MetaDataStore"))
    outputStream.MetadataStore = result.getService("MetaDataStore")

    # Support for MT thinning.
    thinningCacheTool = CompFactory.Athena.ThinningCacheTool(f"ThinningCacheTool_Stream{streamName}",
                                                             StreamName=fullStreamName)
    if trigNavThinningSvc is not None:
        thinningCacheTool.TrigNavigationThinningSvc = trigNavThinningSvc
    outputStream.HelperTools.append(thinningCacheTool)

    # Event Tag
    if not disableEventTag:
        key = "SimpleTag"
        outputStream.WritingTool.AttributeListKey=key

        propagateInputAttributeList = False
        if "AthenaAttributeList#Input" in flags.Input.TypedCollections:
            from SGComps.SGInputLoaderConfig import SGInputLoaderCfg
            result.merge(SGInputLoaderCfg(flags, ["AthenaAttributeList#Input"]))
            propagateInputAttributeList = True

        # build eventinfo attribute list
        tagBuilder = CompFactory.EventInfoTagBuilder(AttributeList=key,
                                                     Tool=CompFactory.EventInfoAttListTool(),
                                                     EventInfoKey=eventInfoKey,
                                                     PropagateInput=propagateInputAttributeList)
        result.addEventAlgo(tagBuilder)

    # For xAOD output
    if "AOD" in streamName:
        outputStream.WritingTool.SubLevelBranchName = "<key>"

    result.addEventAlgo(outputStream, domain='IO')
    return result
133 
134 
def addToESD(flags, itemOrList, **kwargs):
    """
    Adds items to the ESD output stream

    itemOrList is either a single item given as a string or a list of items;
    any further customisation of the output stream is forwarded through kwargs.
    When flags.Output.doWriteESD is not set an empty CA is returned.

    returns CA to be merged i.e.: result.merge(addToESD(flags, "xAOD::CoolObject"))
    """
    if flags.Output.doWriteESD:
        # A lone string is wrapped so the stream always receives a list
        if isinstance(itemOrList, str):
            esdItems = [itemOrList]
        else:
            esdItems = itemOrList
        return OutputStreamCfg(flags, "ESD", ItemList=esdItems, **kwargs)

    # ESD writing is switched off: nothing to configure
    return ComponentAccumulator()
148 
149 
def addToAOD(flags, itemOrList, **kwargs):
    """
    Adds items to the AOD output stream

    itemOrList is either a single item given as a string or a list of items;
    kwargs are forwarded to OutputStreamCfg.
    When flags.Output.doWriteAOD is not set an empty CA is returned.

    @see addToESD
    """
    if flags.Output.doWriteAOD:
        # A lone string is wrapped so the stream always receives a list
        if isinstance(itemOrList, str):
            aodItems = [itemOrList]
        else:
            aodItems = itemOrList
        return OutputStreamCfg(flags, "AOD", ItemList=aodItems, **kwargs)

    # AOD writing is switched off: nothing to configure
    return ComponentAccumulator()
160 
def addToMetaData(flags, streamName, itemOrList, AcceptAlgs=None, HelperTools=None, **kwargs):
    """
    Adds Metadata items to the stream named streamName

    Similar to addToESD/AOD, itemOrList can be either a list of items or just one item
    The additional arguments, AcceptAlgs and HelperTools, are passed to the underlying stream
    The former is needed when there are special kernels, e.g., simulation/derivation
    The latter is needed primarily for the propagation of the FileMetaData tool
    Leaving either of them as None is equivalent to passing an empty list

    An empty CA is returned when the flag Output.doWrite<streamName> does not exist
    (only the presence of the flag is checked here, not its value)

    Returns CA to be merged
    """
    flagName = f"Output.doWrite{streamName}"
    if not flags.hasFlag(flagName):
        return ComponentAccumulator()
    items = [itemOrList] if isinstance(itemOrList, str) else itemOrList
    # None sentinels replace mutable default arguments; normalise them here so the
    # stream configuration always receives a list that is private to this call.
    return OutputStreamCfg(flags, streamName, MetadataItemList=items,
                           AcceptAlgs=[] if AcceptAlgs is None else AcceptAlgs,
                           HelperTools=[] if HelperTools is None else HelperTools,
                           **kwargs)
python.OutputStreamConfig.addToMetaData
def addToMetaData(flags, streamName, itemOrList, AcceptAlgs=[], HelperTools=[], **kwargs)
Definition: OutputStreamConfig.py:161
python.PoolWriteConfig.PoolWriteCfg
def PoolWriteCfg(flags)
Definition: PoolWriteConfig.py:41
python.JetAnalysisCommon.ComponentAccumulator
ComponentAccumulator
Definition: JetAnalysisCommon.py:302
python.OutputStreamConfig.outputStreamName
def outputStreamName(streamName)
Definition: OutputStreamConfig.py:9
python.MainServicesConfig.OutputUsageIgnoreCfg
def OutputUsageIgnoreCfg(flags, algorithm)
Definition: MainServicesConfig.py:59
python.OutputStreamConfig.OutputStreamCfg
def OutputStreamCfg(flags, streamName, ItemList=[], MetadataItemList=[], disableEventTag=False, trigNavThinningSvc=None, takeItemsFromInput=False, extendProvenanceRecord=True, keepProvenanceTagsRegEx=None, AcceptAlgs=[], HelperTools=[])
Definition: OutputStreamConfig.py:16
SGInputLoaderConfig.SGInputLoaderCfg
def SGInputLoaderCfg(flags, Load=None, **kwargs)
Definition: SGInputLoaderConfig.py:7
python.OutputStreamConfig.addToESD
def addToESD(flags, itemOrList, **kwargs)
Definition: OutputStreamConfig.py:135
python.OutputStreamConfig.addToAOD
def addToAOD(flags, itemOrList, **kwargs)
Definition: OutputStreamConfig.py:150