ATLAS Offline Software
OutputStreamConfig.py
Go to the documentation of this file.
1 # Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
2 
3 from AthenaConfiguration.ComponentAccumulator import ComponentAccumulator, ConfigurationError
4 from AthenaConfiguration.ComponentFactory import CompFactory
5 from AthenaConfiguration.Enums import ProductionStep
6 from AthenaCommon.Logging import logging
7 
8 
9 def outputStreamName(streamName):
10  return f"Stream{streamName}"
11 
12 
13 # keepProvenanceTagsRegEx - RexEx string to match processing tags in the Event provenance. Only matching tags will be copied
14 # to the new DataHeader. Empty string rejects all tags. Direct provenance is not affected
15 # (see extendProvenanceRecord)
16 def OutputStreamCfg(flags, streamName, ItemList=[], MetadataItemList=[],
17  disableEventTag=False, trigNavThinningSvc=None, takeItemsFromInput=False,
18  extendProvenanceRecord=True, keepProvenanceTagsRegEx=None, AcceptAlgs=[], HelperTools=[]):
19  eventInfoKey = "EventInfo"
20  if flags.Common.ProductionStep in [ProductionStep.PileUpPresampling, ProductionStep.PileUpPretracking, ProductionStep.MinbiasPreprocessing]:
21  eventInfoKey = f"{flags.Overlay.BkgPrefix}EventInfo"
22 
23  msg = logging.getLogger("OutputStreamCfg")
24  flagName = f"Output.{streamName}FileName"
25  if flags.hasFlag(flagName):
26  fileName = flags._get(flagName)
27  else:
28  fileName = f"my{streamName}.pool.root"
29  msg.info("No file name predefined for stream %s. Using %s", streamName, fileName)
30 
31  if fileName in flags.Input.Files:
32  raise ConfigurationError("Same name for input and output file %s" % fileName)
33 
34  result = ComponentAccumulator(sequence = CompFactory.AthSequencer("AthOutSeq", StopOverride=True))
35 
36  # Set up AthenaPoolCnvSvc through PoolWriteCfg
37  from AthenaPoolCnvSvc.PoolWriteConfig import PoolWriteCfg
38  result.merge(PoolWriteCfg(flags))
39 
40  # define athena output stream
41  writingTool = CompFactory.AthenaOutputStreamTool(
42  f"{outputStreamName(streamName)}Tool",
43  DataHeaderKey=outputStreamName(streamName),
44  MetaDataPoolContainerPrefix=f"{flags.Output.StorageTechnology.MetaData}:MetaData",
45  MetaDataOutputCollection=f"{flags.Output.StorageTechnology.MetaData}:MetaDataHdr",
46  )
47 
48  # If we're running in augmentation mode, configure the writing tool accordingly
49  parentStream = f"Output.{streamName}ParentStream"
50  childStream = f"Output.{streamName}ChildStream"
51  if flags.hasFlag(childStream):
52  writingTool.SaveDecisions = True
53  elif flags.hasFlag(parentStream):
54  disableEventTag = True
55  writingTool.OutputCollection = f"POOLContainer_{streamName}"
56  writingTool.PoolContainerPrefix = f"CollectionTree_{streamName}"
57  writingTool.MetaDataOutputCollection = f"MetaDataHdr_{streamName}"
58  writingTool.MetaDataPoolContainerPrefix = f"MetaData_{streamName}"
59  msg.info("Stream %s running in augmentation mode with %s as parent", streamName, flags._get(parentStream))
60 
61  # In DAOD production the EventInfo is prepared specially by the SlimmingHelper to ensure it is written in AuxDyn form
62  # So for derivations the ItemList from the SlimmingHelper alone is used without the extra EventInfo items
63  finalItemList = []
64  if any(name in streamName for name in {"DAOD_", "D2AOD_"}):
65  finalItemList = ItemList
66  else:
67  finalItemList = [f"xAOD::EventInfo#{eventInfoKey}", f"xAOD::EventAuxInfo#{eventInfoKey}Aux."] + ItemList
68 
69  outputStream = CompFactory.AthenaOutputStream(
70  outputStreamName(streamName),
71  StreamName=outputStreamName(streamName),
72  WritingTool=writingTool,
73  ItemList=finalItemList,
74  MetadataItemList=MetadataItemList,
75  OutputFile=fileName,
76  HelperTools=HelperTools,
77  )
78  if takeItemsFromInput:
79  # Avoid explicitly setting the property unless we want to set it
80  # to True (default in C++ is False). This avoids CA merge
81  # conflicts.
82  outputStream.TakeItemsFromInput = True
83  if not extendProvenanceRecord:
84  # Treat this similar to takeItemsFromInput
85  # (C++ default in this case is True)
86  outputStream.ExtendProvenanceRecord = False
87  if keepProvenanceTagsRegEx is not None:
88  # C++ defaults to '.*' which means all. Overwrite only on request.
89  outputStream.KeepProvenanceTagsRegEx = keepProvenanceTagsRegEx
90  outputStream.AcceptAlgs += AcceptAlgs
91  outputStream.ExtraOutputs.add(("DataHeader", f"StoreGateSvc+{outputStreamName(streamName)}"))
92  if flags.Scheduler.CheckOutputUsage and flags.Concurrency.NumThreads > 0:
93  outputStream.ExtraInputs = {tuple(l.split('#')) for l in finalItemList if '*' not in l and 'Aux' not in l}
94  # Ignore dependencies
95  from AthenaConfiguration.MainServicesConfig import OutputUsageIgnoreCfg
96  result.merge(OutputUsageIgnoreCfg(flags, outputStream.name))
97 
98  result.addService(CompFactory.StoreGateSvc("MetaDataStore"))
99  outputStream.MetadataStore = result.getService("MetaDataStore")
100 
101  # Support for MT thinning.
102  thinningCacheTool = CompFactory.Athena.ThinningCacheTool(f"ThinningCacheTool_Stream{streamName}",
103  StreamName=outputStreamName(streamName))
104  if trigNavThinningSvc is not None:
105  thinningCacheTool.TrigNavigationThinningSvc = trigNavThinningSvc
106  outputStream.HelperTools.append(thinningCacheTool)
107 
108  # Event Tag
109  if not disableEventTag:
110  key = "SimpleTag"
111  outputStream.WritingTool.AttributeListKey=key
112 
113  propagateInputAttributeList = False
114  if "AthenaAttributeList#Input" in flags.Input.TypedCollections:
115  from SGComps.SGInputLoaderConfig import SGInputLoaderCfg
116  result.merge(SGInputLoaderCfg(flags, ["AthenaAttributeList#Input"]))
117  propagateInputAttributeList = True
118 
119  # build eventinfo attribute list
120  tagBuilder = CompFactory.EventInfoTagBuilder(AttributeList=key,
121  Tool=CompFactory.EventInfoAttListTool(),
122  EventInfoKey=eventInfoKey,
123  PropagateInput=propagateInputAttributeList)
124  result.addEventAlgo(tagBuilder)
125 
126  # For xAOD output
127  if "AOD" in streamName:
128  outputStream.WritingTool.SubLevelBranchName = "<key>"
129 
130  result.addEventAlgo(outputStream, domain='IO')
131  return result
132 
133 
134 def addToESD(flags, itemOrList, **kwargs):
135  """
136  Adds items to ESD stream
137 
138  The argument can be either list of items or just one item
139  if further customisations are needed for output stream they can be passed via kwargs
140 
141  returns CA to be merged i.e.: result.merge(addToESD(flags, "xAOD::CoolObject"))
142  """
143  if not flags.Output.doWriteESD:
144  return ComponentAccumulator()
145  items = [itemOrList] if isinstance(itemOrList, str) else itemOrList
146  return OutputStreamCfg(flags, "ESD", ItemList=items, **kwargs)
147 
148 
149 def addToAOD(flags, itemOrList, **kwargs):
150  """
151  Adds items to AOD stream
152 
153  @see add addToESD
154  """
155  if not flags.Output.doWriteAOD:
156  return ComponentAccumulator()
157  items = [itemOrList] if isinstance(itemOrList, str) else itemOrList
158  return OutputStreamCfg(flags, "AOD", ItemList=items, **kwargs)
159 
160 def addToMetaData(flags, streamName, itemOrList, AcceptAlgs=[], HelperTools=[], **kwargs):
161  """
162  Adds Metadata items to the stream named streamName
163 
164  Similar to addToESD/AOD, itemOrList can be either a list of items or just one time
165  The additional arguments, AcceptAlgs and HelperTools, are passed to the underlying stream
166  The former is needed when there are special kernels, e.g., simulation/derivation
167  The latter is needed primarily for the propagation of the FileMetaData tool
168 
169  Returns CA to be merged
170  """
171  flagName = f"Output.doWrite{streamName}"
172  if not flags.hasFlag(flagName):
173  return ComponentAccumulator()
174  items = [itemOrList] if isinstance(itemOrList, str) else itemOrList
175  return OutputStreamCfg(flags, streamName, MetadataItemList=items,
176  AcceptAlgs=AcceptAlgs, HelperTools=HelperTools, **kwargs)
python.OutputStreamConfig.addToMetaData
def addToMetaData(flags, streamName, itemOrList, AcceptAlgs=[], HelperTools=[], **kwargs)
Definition: OutputStreamConfig.py:160
python.PoolWriteConfig.PoolWriteCfg
def PoolWriteCfg(flags)
Definition: PoolWriteConfig.py:41
python.JetAnalysisCommon.ComponentAccumulator
ComponentAccumulator
Definition: JetAnalysisCommon.py:302
python.OutputStreamConfig.outputStreamName
def outputStreamName(streamName)
Definition: OutputStreamConfig.py:9
python.MainServicesConfig.OutputUsageIgnoreCfg
def OutputUsageIgnoreCfg(flags, algorithm)
Definition: MainServicesConfig.py:57
python.OutputStreamConfig.OutputStreamCfg
def OutputStreamCfg(flags, streamName, ItemList=[], MetadataItemList=[], disableEventTag=False, trigNavThinningSvc=None, takeItemsFromInput=False, extendProvenanceRecord=True, keepProvenanceTagsRegEx=None, AcceptAlgs=[], HelperTools=[])
Definition: OutputStreamConfig.py:16
SGInputLoaderConfig.SGInputLoaderCfg
def SGInputLoaderCfg(flags, Load=None, **kwargs)
Definition: SGInputLoaderConfig.py:7
python.OutputStreamConfig.addToESD
def addToESD(flags, itemOrList, **kwargs)
Definition: OutputStreamConfig.py:134
python.OutputStreamConfig.addToAOD
def addToAOD(flags, itemOrList, **kwargs)
Definition: OutputStreamConfig.py:149