ATLAS Offline Software
OutputStreamConfig.py
Go to the documentation of this file.
1 # Copyright (C) 2002-2023 CERN for the benefit of the ATLAS collaboration
2 
3 from AthenaConfiguration.ComponentAccumulator import ComponentAccumulator, ConfigurationError
4 from AthenaConfiguration.ComponentFactory import CompFactory
5 from AthenaConfiguration.Enums import ProductionStep
6 from AthenaCommon.Logging import logging
7 
8 
def outputStreamName(streamName):
    """Return the output stream name for *streamName*, i.e. "Stream" followed by it."""
    return "Stream{}".format(streamName)
11 
def OutputStreamCfg(flags, streamName, ItemList=None, MetadataItemList=None,
                    disableEventTag=False, trigNavThinningSvc=None, takeItemsFromInput=False,
                    extendProvenanceRecord=True, AcceptAlgs=None, HelperTools=None):
    """Configure the AthenaOutputStream algorithm for the stream *streamName*.

    The stream (named via outputStreamName) is added to the "AthOutSeq"
    sequence of the returned accumulator, together with its writing tool,
    the "MetaDataStore" service, a thinning cache tool and, unless disabled,
    an EventInfoTagBuilder.

    Args:
        flags: configuration flags. The output file name is read from
            ``Output.<streamName>FileName`` when that flag exists, otherwise
            ``my<streamName>.pool.root`` is used.
        streamName: short stream name, e.g. "AOD" or "ESD".
        ItemList: event items to write. Unless the stream name contains
            "DAOD_" or "D2AOD_", the xAOD::EventInfo items are prepended.
            ``None`` (the default) means no items.
        MetadataItemList: metadata items to write; ``None`` means none.
        disableEventTag: when True, no event tag builder is scheduled.
            Forced to True when ``Output.<streamName>ParentStream`` exists.
        trigNavThinningSvc: if not None, set as TrigNavigationThinningSvc
            on the thinning cache tool.
        takeItemsFromInput: when True, TakeItemsFromInput is set on the stream.
        extendProvenanceRecord: when False, ExtendProvenanceRecord is set to
            False on the stream.
        AcceptAlgs: added to the stream's AcceptAlgs; ``None`` means none.
        HelperTools: helper tools for the stream; the thinning cache tool is
            appended to them. ``None`` means none.

    Returns:
        ComponentAccumulator holding the configured output stream.

    Raises:
        ConfigurationError: if the output file name is also an input file.
    """
    # None stands in for "empty": a literal [] default is a single list object
    # shared by every call, so nothing done to these lists may leak between calls.
    ItemList = [] if ItemList is None else ItemList
    MetadataItemList = [] if MetadataItemList is None else MetadataItemList
    AcceptAlgs = [] if AcceptAlgs is None else AcceptAlgs
    HelperTools = [] if HelperTools is None else HelperTools

    eventInfoKey = "EventInfo"
    if flags.Common.ProductionStep in [ProductionStep.PileUpPresampling, ProductionStep.PileUpPretracking]:
        eventInfoKey = f"{flags.Overlay.BkgPrefix}EventInfo"

    msg = logging.getLogger("OutputStreamCfg")
    flagName = f"Output.{streamName}FileName"
    if flags.hasFlag(flagName):
        fileName = flags._get(flagName)
    else:
        fileName = f"my{streamName}.pool.root"
        msg.info("No file name predefined for stream %s. Using %s", streamName, fileName)

    if fileName in flags.Input.Files:
        raise ConfigurationError("Same name for input and output file %s" % fileName)

    result = ComponentAccumulator(sequence = CompFactory.AthSequencer("AthOutSeq", StopOverride=True))

    # Set up AthenaPoolCnvSvc through PoolWriteCfg
    from AthenaPoolCnvSvc.PoolWriteConfig import PoolWriteCfg
    result.merge(PoolWriteCfg(flags))

    # define athena output stream
    writingTool = CompFactory.AthenaOutputStreamTool(
        f"{outputStreamName(streamName)}Tool",
        DataHeaderKey=outputStreamName(streamName),
        MetaDataPoolContainerPrefix=f"{flags.Output.StorageTechnology.MetaData}:MetaData",
        MetaDataOutputCollection=f"{flags.Output.StorageTechnology.MetaData}:MetaDataHdr",
    )

    # If we're running in augmentation mode, configure the writing tool accordingly
    parentStream = f"Output.{streamName}ParentStream"
    childStream = f"Output.{streamName}ChildStream"
    if flags.hasFlag(childStream):
        writingTool.SaveDecisions = True
    elif flags.hasFlag(parentStream):
        disableEventTag = True
        writingTool.OutputCollection = f"POOLContainer_{streamName}"
        writingTool.PoolContainerPrefix = f"CollectionTree_{streamName}"
        writingTool.MetaDataOutputCollection = f"MetaDataHdr_{streamName}"
        writingTool.MetaDataPoolContainerPrefix = f"MetaData_{streamName}"
        msg.info("Stream %s running in augmentation mode with %s as parent", streamName, flags._get(parentStream))

    # In DAOD production the EventInfo is prepared specially by the SlimmingHelper to ensure it is written in AuxDyn form
    # So for derivations the ItemList from the SlimmingHelper alone is used without the extra EventInfo items
    if any(name in streamName for name in {"DAOD_", "D2AOD_"}):
        finalItemList = ItemList
    else:
        finalItemList = [f"xAOD::EventInfo#{eventInfoKey}", f"xAOD::EventAuxInfo#{eventInfoKey}Aux."] + ItemList

    outputStream = CompFactory.AthenaOutputStream(
        outputStreamName(streamName),
        StreamName=outputStreamName(streamName),
        WritingTool=writingTool,
        ItemList=finalItemList,
        MetadataItemList=MetadataItemList,
        OutputFile=fileName,
        HelperTools=HelperTools,
    )
    if takeItemsFromInput:
        # Avoid explicitly setting the property unless we want to set it
        # to True (default in C++ is False). This avoids CA merge
        # conflicts.
        outputStream.TakeItemsFromInput = True
    if not extendProvenanceRecord:
        # Treat this similar to takeItemsFromInput
        # (C++ default in this case is True)
        outputStream.ExtendProvenanceRecord = False
    outputStream.AcceptAlgs += AcceptAlgs
    outputStream.ExtraOutputs.add(("DataHeader", f"StoreGateSvc+{outputStreamName(streamName)}"))
    if flags.Scheduler.CheckOutputUsage and flags.Concurrency.NumThreads > 0:
        # Declare every plain "type#key" item as an input; wildcard and Aux entries are skipped
        outputStream.ExtraInputs = {tuple(item.split('#')) for item in finalItemList
                                    if '*' not in item and 'Aux' not in item}
        # Ignore dependencies
        from AthenaConfiguration.MainServicesConfig import OutputUsageIgnoreCfg
        result.merge(OutputUsageIgnoreCfg(flags, outputStream.name))

    result.addService(CompFactory.StoreGateSvc("MetaDataStore"))
    outputStream.MetadataStore = result.getService("MetaDataStore")

    # Support for MT thinning.
    thinningCacheTool = CompFactory.Athena.ThinningCacheTool(f"ThinningCacheTool_Stream{streamName}",
                                                             StreamName=outputStreamName(streamName))
    if trigNavThinningSvc is not None:
        thinningCacheTool.TrigNavigationThinningSvc = trigNavThinningSvc
    outputStream.HelperTools.append(thinningCacheTool)

    # Event Tag
    if not disableEventTag:
        key = "SimpleTag"
        outputStream.WritingTool.AttributeListKey = key

        propagateInputAttributeList = False
        if "AthenaAttributeList#Input" in flags.Input.TypedCollections:
            from SGComps.SGInputLoaderConfig import SGInputLoaderCfg
            result.merge(SGInputLoaderCfg(flags, ["AthenaAttributeList#Input"]))
            propagateInputAttributeList = True

        # build eventinfo attribute list
        tagBuilder = CompFactory.EventInfoTagBuilder(AttributeList=key,
                                                     Tool=CompFactory.EventInfoAttListTool(),
                                                     EventInfoKey=eventInfoKey,
                                                     PropagateInput=propagateInputAttributeList)
        result.addEventAlgo(tagBuilder)

    # For xAOD output
    if "AOD" in streamName:
        outputStream.WritingTool.SubLevelBranchName = "<key>"

    result.addEventAlgo(outputStream, domain='IO')
    return result
125 
126 
def addToESD(flags, itemOrList, **kwargs):
    """
    Schedule extra items for writing to the ESD stream.

    itemOrList is either a single item given as a string or a list of items;
    a single string is wrapped into a one-element list.
    Any further keyword arguments are forwarded unchanged to OutputStreamCfg.

    Returns a CA to be merged, e.g.: result.merge(addToESD(flags, "xAOD::CoolObject")).
    If flags.Output.doWriteESD is false an empty CA is returned.
    """
    if flags.Output.doWriteESD:
        esdItems = itemOrList
        if isinstance(itemOrList, str):
            esdItems = [itemOrList]
        return OutputStreamCfg(flags, "ESD", ItemList=esdItems, **kwargs)
    return ComponentAccumulator()
140 
141 
def addToAOD(flags, itemOrList, **kwargs):
    """
    Schedule extra items for writing to the AOD stream.

    itemOrList is either a single item given as a string or a list of items;
    a single string is wrapped into a one-element list.
    Any further keyword arguments are forwarded unchanged to OutputStreamCfg.

    Returns a CA to be merged. If flags.Output.doWriteAOD is false an
    empty CA is returned.
    """
    if flags.Output.doWriteAOD:
        aodItems = itemOrList
        if isinstance(itemOrList, str):
            aodItems = [itemOrList]
        return OutputStreamCfg(flags, "AOD", ItemList=aodItems, **kwargs)
    return ComponentAccumulator()
152 
def addToMetaData(flags, streamName, itemOrList, AcceptAlgs=None, HelperTools=None, **kwargs):
    """
    Adds Metadata items to the stream named streamName

    Similar to addToESD/AOD, itemOrList can be either a list of items or just one item
    The additional arguments, AcceptAlgs and HelperTools, are passed to the underlying stream
    (None, the default, is treated as an empty list)
    The former is needed when there are special kernels, e.g., simulation/derivation
    The latter is needed primarily for the propagation of the FileMetaData tool
    Any further keyword arguments are forwarded unchanged to OutputStreamCfg

    Returns CA to be merged; an empty CA if the flag Output.doWrite<streamName> does not exist
    """
    flagName = f"Output.doWrite{streamName}"
    # NOTE(review): this only tests that the flag exists, not its value
    # (addToESD/addToAOD test the value) - confirm this is intended.
    if not flags.hasFlag(flagName):
        return ComponentAccumulator()

    # Fresh lists per call: a literal [] default would be one object shared by
    # every call and handed on to the stream configuration.
    AcceptAlgs = [] if AcceptAlgs is None else AcceptAlgs
    HelperTools = [] if HelperTools is None else HelperTools

    items = [itemOrList] if isinstance(itemOrList, str) else itemOrList
    return OutputStreamCfg(flags, streamName, MetadataItemList=items,
                           AcceptAlgs=AcceptAlgs, HelperTools=HelperTools, **kwargs)
python.OutputStreamConfig.addToMetaData
def addToMetaData(flags, streamName, itemOrList, AcceptAlgs=[], HelperTools=[], **kwargs)
Definition: OutputStreamConfig.py:153
python.PoolWriteConfig.PoolWriteCfg
def PoolWriteCfg(flags)
Definition: PoolWriteConfig.py:41
python.JetAnalysisCommon.ComponentAccumulator
ComponentAccumulator
Definition: JetAnalysisCommon.py:302
python.OutputStreamConfig.outputStreamName
def outputStreamName(streamName)
Definition: OutputStreamConfig.py:9
python.OutputStreamConfig.OutputStreamCfg
def OutputStreamCfg(flags, streamName, ItemList=[], MetadataItemList=[], disableEventTag=False, trigNavThinningSvc=None, takeItemsFromInput=False, extendProvenanceRecord=True, AcceptAlgs=[], HelperTools=[])
Definition: OutputStreamConfig.py:12
python.MainServicesConfig.OutputUsageIgnoreCfg
def OutputUsageIgnoreCfg(flags, algorithm)
Definition: MainServicesConfig.py:53
SGInputLoaderConfig.SGInputLoaderCfg
def SGInputLoaderCfg(flags, Load=None, **kwargs)
Definition: SGInputLoaderConfig.py:7
python.OutputStreamConfig.addToESD
def addToESD(flags, itemOrList, **kwargs)
Definition: OutputStreamConfig.py:127
python.OutputStreamConfig.addToAOD
def addToAOD(flags, itemOrList, **kwargs)
Definition: OutputStreamConfig.py:142