ATLAS Offline Software
Loading...
Searching...
No Matches
OutputStreamConfig.py
Go to the documentation of this file.
1# Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
2
3from AthenaConfiguration.ComponentAccumulator import ComponentAccumulator, ConfigurationError
4from AthenaConfiguration.ComponentFactory import CompFactory
5from AthenaConfiguration.Enums import ProductionStep
6from AthenaCommon.Logging import logging
7
8def outputStreamName(streamName):
9 return f"Stream{streamName}"
10
11
13 flags,
14 streamName,
15 ItemList=None,
16 MetadataItemList=None,
17 disableEventTag=False,
18 trigNavThinningSvc=None,
19 takeItemsFromInput=False,
20 extendProvenanceRecord=True,
21 keepProvenanceTagsRegEx=None,
22 AcceptAlgs=None,
23 HelperTools=None,
24):
25 """Configure an output stream for writing data to POOL files.
26
27 Args:
28 flags: Configuration flags object
29 streamName: Name of the output stream (e.g., 'ESD', 'AOD', 'DAOD_PHYS')
30 ItemList: List of data objects to write to the stream
31 MetadataItemList: List of metadata objects to write
32 disableEventTag: If True, disable event tagging
33 trigNavThinningSvc: Trigger navigation thinning service
34 takeItemsFromInput: If True, take items from input file
35 extendProvenanceRecord: If True, extend provenance record with processing tags
36 keepProvenanceTagsRegEx: RegEx string to match processing tags in the Event provenance.
37 Only matching tags will be copied to the new DataHeader.
38 Empty string rejects all tags. Direct provenance is not affected
39 (see extendProvenanceRecord).
40 AcceptAlgs: List of algorithms that must accept the event for it to be written
41 HelperTools: List of helper tools to attach to the stream
42
43 Returns:
44 ComponentAccumulator: Configured output stream and associated services
45 """
46 # Handle mutable default arguments
47 if ItemList is None:
48 ItemList = []
49 if MetadataItemList is None:
50 MetadataItemList = []
51 if AcceptAlgs is None:
52 AcceptAlgs = []
53 if HelperTools is None:
54 HelperTools = []
55
56 eventInfoKey = "EventInfo"
57 if flags.Common.ProductionStep in [ProductionStep.PileUpPresampling, ProductionStep.PileUpPretracking, ProductionStep.MinbiasPreprocessing]:
58 eventInfoKey = f"{flags.Overlay.BkgPrefix}EventInfo"
59
60 msg = logging.getLogger("OutputStreamCfg")
61 flagName = f"Output.{streamName}FileName"
62 if flags.hasFlag(flagName):
63 fileName = flags._get(flagName)
64 else:
65 fileName = f"my{streamName}.pool.root"
66 msg.info("No file name predefined for stream %s. Using %s", streamName, fileName)
67
68 if fileName in flags.Input.Files:
69 raise ConfigurationError("Same name for input and output file %s" % fileName)
70
71 result = ComponentAccumulator(sequence = CompFactory.AthSequencer("AthOutSeq", StopOverride=True))
72
73 # Set up AthenaPoolCnvSvc through PoolWriteCfg
74 from AthenaPoolCnvSvc.PoolWriteConfig import PoolWriteCfg
75 result.merge(PoolWriteCfg(flags))
76
77 # define athena output stream
78 writingTool = CompFactory.AthenaOutputStreamTool(
79 f"{outputStreamName(streamName)}Tool",
80 DataHeaderKey=outputStreamName(streamName),
81 TopLevelContainerName="",
82 SubLevelBranchName="<type>/<key>",
83 ConversionService="AthenaPoolSharedIOCnvSvc" if flags.MP.UseSharedReader or flags.MP.UseSharedWriter else "AthenaPoolCnvSvc",
84 )
85
86 # If we're running in augmentation mode, configure the writing tool accordingly
87 parentStream = f"Output.{streamName}ParentStream"
88 childStream = f"Output.{streamName}ChildStream"
89 if flags.hasFlag(childStream):
90 writingTool.SaveDecisions = True
91 elif flags.hasFlag(parentStream):
92 disableEventTag = True
93 writingTool.OutputCollection = f"POOLContainer_{streamName}"
94 writingTool.PoolContainerPrefix = f"CollectionTree_{streamName}"
95 writingTool.MetaDataOutputCollection = f"MetaDataHdr_{streamName}"
96 writingTool.MetaDataPoolContainerPrefix = f"MetaData_{streamName}"
97 msg.info("Stream %s running in augmentation mode with %s as parent", streamName, flags._get(parentStream))
98
99 # In DAOD production the EventInfo is prepared specially by the SlimmingHelper to ensure it is written in AuxDyn form
100 # So for derivations the ItemList from the SlimmingHelper alone is used without the extra EventInfo items
101 finalItemList = []
102 if any(name in streamName for name in {"DAOD_", "D2AOD_"}):
103 finalItemList = ItemList
104 else:
105 finalItemList = [f"xAOD::EventInfo#{eventInfoKey}", f"xAOD::EventAuxInfo#{eventInfoKey}Aux."] + ItemList
106
107 outputStream = CompFactory.AthenaOutputStream(
108 outputStreamName(streamName),
109 StreamName=outputStreamName(streamName),
110 WritingTool=writingTool,
111 ItemList=finalItemList,
112 MetadataItemList=MetadataItemList,
113 OutputFile=fileName,
114 HelperTools=HelperTools,
115 )
116 if takeItemsFromInput:
117 # Avoid explicitly setting the property unless we want to set it
118 # to True (default in C++ is False). This avoids CA merge
119 # conflicts.
120 outputStream.TakeItemsFromInput = True
121 if not extendProvenanceRecord:
122 # Treat this similar to takeItemsFromInput
123 # (C++ default in this case is True)
124 outputStream.ExtendProvenanceRecord = False
125 if keepProvenanceTagsRegEx is not None:
126 # C++ defaults to '.*' which means all. Overwrite only on request.
127 outputStream.KeepProvenanceTagsRegEx = keepProvenanceTagsRegEx
128 outputStream.AcceptAlgs += AcceptAlgs
129 outputStream.ExtraOutputs.add(("DataHeader", f"StoreGateSvc+{outputStreamName(streamName)}"))
130 if flags.Scheduler.CheckOutputUsage and flags.Concurrency.NumThreads > 0:
131 outputStream.ExtraInputs = {tuple(l.split('#')) for l in finalItemList if '*' not in l and 'Aux' not in l}
132 # Ignore dependencies
133 from AthenaConfiguration.MainServicesConfig import OutputUsageIgnoreCfg
134 result.merge(OutputUsageIgnoreCfg(flags, outputStream.name))
135
136 result.addService(CompFactory.StoreGateSvc("MetaDataStore"))
137 outputStream.MetadataStore = result.getService("MetaDataStore")
138
139 # Support for MT thinning.
140 thinningCacheTool = CompFactory.Athena.ThinningCacheTool(f"ThinningCacheTool_Stream{streamName}",
141 StreamName=outputStreamName(streamName))
142 if trigNavThinningSvc is not None:
143 thinningCacheTool.TrigNavigationThinningSvc = trigNavThinningSvc
144 outputStream.HelperTools.append(thinningCacheTool)
145
146 # Event Tag
147 if not disableEventTag:
148 key = "SimpleTag"
149 outputStream.WritingTool.AttributeListKey=key
150
151 propagateInputAttributeList = False
152 if "AthenaAttributeList#Input" in flags.Input.TypedCollections:
153 from SGComps.SGInputLoaderConfig import SGInputLoaderCfg
154 result.merge(SGInputLoaderCfg(flags, ["AthenaAttributeList#Input"]))
155 propagateInputAttributeList = True
156
157 # build eventinfo attribute list
158 tagBuilder = CompFactory.EventInfoTagBuilder(AttributeList=key,
159 Tool=CompFactory.EventInfoAttListTool(),
160 EventInfoKey=eventInfoKey,
161 PropagateInput=propagateInputAttributeList)
162 result.addEventAlgo(tagBuilder)
163
164 # For xAOD output
165 if "AOD" in streamName:
166 outputStream.WritingTool.SubLevelBranchName = "<key>"
167
168 result.addEventAlgo(outputStream, domain='IO')
169 return result
170
171
172def addToESD(flags, itemOrList, **kwargs):
173 """
174 Adds items to ESD stream
175
176 The argument can be either list of items or just one item
177 if further customisations are needed for output stream they can be passed via kwargs
178
179 returns CA to be merged i.e.: result.merge(addToESD(flags, "xAOD::CoolObject"))
180 """
181 if not flags.Output.doWriteESD:
182 return ComponentAccumulator()
183 items = [itemOrList] if isinstance(itemOrList, str) else itemOrList
184 return OutputStreamCfg(flags, "ESD", ItemList=items, **kwargs)
185
186
187def addToAOD(flags, itemOrList, **kwargs):
188 """
189 Adds items to AOD stream
190
191 @see add addToESD
192 """
193 if not flags.Output.doWriteAOD:
194 return ComponentAccumulator()
195 items = [itemOrList] if isinstance(itemOrList, str) else itemOrList
196 return OutputStreamCfg(flags, "AOD", ItemList=items, **kwargs)
197
198def addToMetaData(flags, streamName, itemOrList, AcceptAlgs=[], HelperTools=[], **kwargs):
199 """
200 Adds Metadata items to the stream named streamName
201
202 Similar to addToESD/AOD, itemOrList can be either a list of items or just one time
203 The additional arguments, AcceptAlgs and HelperTools, are passed to the underlying stream
204 The former is needed when there are special kernels, e.g., simulation/derivation
205 The latter is needed primarily for the propagation of the FileMetaData tool
206
207 Returns CA to be merged
208 """
209 flagName = f"Output.doWrite{streamName}"
210 if not flags.hasFlag(flagName):
211 return ComponentAccumulator()
212 items = [itemOrList] if isinstance(itemOrList, str) else itemOrList
213 return OutputStreamCfg(flags, streamName, MetadataItemList=items,
214 AcceptAlgs=AcceptAlgs, HelperTools=HelperTools, **kwargs)
OutputStreamCfg(flags, streamName, ItemList=None, MetadataItemList=None, disableEventTag=False, trigNavThinningSvc=None, takeItemsFromInput=False, extendProvenanceRecord=True, keepProvenanceTagsRegEx=None, AcceptAlgs=None, HelperTools=None)
addToMetaData(flags, streamName, itemOrList, AcceptAlgs=[], HelperTools=[], **kwargs)
addToESD(flags, itemOrList, **kwargs)
addToAOD(flags, itemOrList, **kwargs)