3 from AthenaConfiguration.ComponentAccumulator
import ComponentAccumulator, ConfigurationError
4 from AthenaConfiguration.ComponentFactory
import CompFactory
5 from AthenaConfiguration.Enums
import ProductionStep
6 from AthenaCommon.Logging
import logging
10 return f
"Stream{streamName}"
17 MetadataItemList=None,
18 disableEventTag=False,
19 trigNavThinningSvc=None,
20 takeItemsFromInput=False,
21 extendProvenanceRecord=True,
22 keepProvenanceTagsRegEx=None,
26 """Configure an output stream for writing data to POOL files.
29 flags: Configuration flags object
30 streamName: Name of the output stream (e.g., 'ESD', 'AOD', 'DAOD_PHYS')
31 ItemList: List of data objects to write to the stream
32 MetadataItemList: List of metadata objects to write
33 disableEventTag: If True, disable event tagging
34 trigNavThinningSvc: Trigger navigation thinning service
35 takeItemsFromInput: If True, take items from input file
36 extendProvenanceRecord: If True, extend provenance record with processing tags
37 keepProvenanceTagsRegEx: RegEx string to match processing tags in the Event provenance.
38 Only matching tags will be copied to the new DataHeader.
39 Empty string rejects all tags. Direct provenance is not affected
40 (see extendProvenanceRecord).
41 AcceptAlgs: List of algorithms that must accept the event for it to be written
42 HelperTools: List of helper tools to attach to the stream
45 ComponentAccumulator: Configured output stream and associated services
50 if MetadataItemList
is None:
52 if AcceptAlgs
is None:
54 if HelperTools
is None:
57 eventInfoKey =
"EventInfo"
58 if flags.Common.ProductionStep
in [ProductionStep.PileUpPresampling, ProductionStep.PileUpPretracking, ProductionStep.MinbiasPreprocessing]:
59 eventInfoKey = f
"{flags.Overlay.BkgPrefix}EventInfo"
61 msg = logging.getLogger(
"OutputStreamCfg")
62 flagName = f
"Output.{streamName}FileName"
63 if flags.hasFlag(flagName):
64 fileName = flags._get(flagName)
66 fileName = f
"my{streamName}.pool.root"
67 msg.info(
"No file name predefined for stream %s. Using %s", streamName, fileName)
69 if fileName
in flags.Input.Files:
70 raise ConfigurationError(
"Same name for input and output file %s" % fileName)
72 result =
ComponentAccumulator(sequence = CompFactory.AthSequencer(
"AthOutSeq", StopOverride=
True))
75 from AthenaPoolCnvSvc.PoolWriteConfig
import PoolWriteCfg
81 metaDataTech = flags.Output.StorageTechnology.MetaData.get(fileName)
82 if metaDataTech
is None:
83 metaDataTech = flags.Output.StorageTechnology.MetaData.get(
'*')
84 if metaDataTech
is None:
86 eventDataTech = flags.Output.StorageTechnology.EventData.get(
88 flags.Output.StorageTechnology.EventData.get(
'*',
'ROOTTREEINDEX')
90 metaDataTech = eventDataTech
93 writingTool = CompFactory.AthenaOutputStreamTool(
94 f
"{outputStreamName(streamName)}Tool",
96 MetaDataPoolContainerPrefix=f
"{metaDataTech}:MetaData",
97 MetaDataOutputCollection=f
"{metaDataTech}:MetaDataHdr",
98 ConversionService=
"AthenaPoolSharedIOCnvSvc" if flags.MP.UseSharedReader
or flags.MP.UseSharedWriter
else "AthenaPoolCnvSvc",
102 parentStream = f
"Output.{streamName}ParentStream"
103 childStream = f
"Output.{streamName}ChildStream"
104 if flags.hasFlag(childStream):
105 writingTool.SaveDecisions =
True
106 elif flags.hasFlag(parentStream):
107 disableEventTag =
True
108 writingTool.OutputCollection = f
"POOLContainer_{streamName}"
109 writingTool.PoolContainerPrefix = f
"CollectionTree_{streamName}"
110 writingTool.MetaDataOutputCollection = f
"MetaDataHdr_{streamName}"
111 writingTool.MetaDataPoolContainerPrefix = f
"MetaData_{streamName}"
112 msg.info(
"Stream %s running in augmentation mode with %s as parent", streamName, flags._get(parentStream))
117 if any(name
in streamName
for name
in {
"DAOD_",
"D2AOD_"}):
118 finalItemList = ItemList
120 finalItemList = [f
"xAOD::EventInfo#{eventInfoKey}", f
"xAOD::EventAuxInfo#{eventInfoKey}Aux."] + ItemList
122 outputStream = CompFactory.AthenaOutputStream(
125 WritingTool=writingTool,
126 ItemList=finalItemList,
127 MetadataItemList=MetadataItemList,
129 HelperTools=HelperTools,
131 if takeItemsFromInput:
135 outputStream.TakeItemsFromInput =
True
136 if not extendProvenanceRecord:
139 outputStream.ExtendProvenanceRecord =
False
140 if keepProvenanceTagsRegEx
is not None:
142 outputStream.KeepProvenanceTagsRegEx = keepProvenanceTagsRegEx
143 outputStream.AcceptAlgs += AcceptAlgs
144 outputStream.ExtraOutputs.add((
"DataHeader", f
"StoreGateSvc+{outputStreamName(streamName)}"))
145 if flags.Scheduler.CheckOutputUsage
and flags.Concurrency.NumThreads > 0:
146 outputStream.ExtraInputs = {tuple(l.split(
'#'))
for l
in finalItemList
if '*' not in l
and 'Aux' not in l}
148 from AthenaConfiguration.MainServicesConfig
import OutputUsageIgnoreCfg
151 result.addService(CompFactory.StoreGateSvc(
"MetaDataStore"))
152 outputStream.MetadataStore = result.getService(
"MetaDataStore")
155 thinningCacheTool = CompFactory.Athena.ThinningCacheTool(f
"ThinningCacheTool_Stream{streamName}",
157 if trigNavThinningSvc
is not None:
158 thinningCacheTool.TrigNavigationThinningSvc = trigNavThinningSvc
159 outputStream.HelperTools.append(thinningCacheTool)
162 if not disableEventTag:
164 outputStream.WritingTool.AttributeListKey=key
166 propagateInputAttributeList =
False
167 if "AthenaAttributeList#Input" in flags.Input.TypedCollections:
168 from SGComps.SGInputLoaderConfig
import SGInputLoaderCfg
170 propagateInputAttributeList =
True
173 tagBuilder = CompFactory.EventInfoTagBuilder(AttributeList=key,
174 Tool=CompFactory.EventInfoAttListTool(),
175 EventInfoKey=eventInfoKey,
176 PropagateInput=propagateInputAttributeList)
177 result.addEventAlgo(tagBuilder)
180 if "AOD" in streamName:
181 outputStream.WritingTool.SubLevelBranchName =
"<key>"
183 result.addEventAlgo(outputStream, domain=
'IO')
189 Adds items to ESD stream
191 The argument can be either list of items or just one item
192 if further customisations are needed for output stream they can be passed via kwargs
194 returns CA to be merged i.e.: result.merge(addToESD(flags, "xAOD::CoolObject"))
196 if not flags.Output.doWriteESD:
198 items = [itemOrList]
if isinstance(itemOrList, str)
else itemOrList
204 Adds items to AOD stream
208 if not flags.Output.doWriteAOD:
210 items = [itemOrList]
if isinstance(itemOrList, str)
else itemOrList
213 def addToMetaData(flags, streamName, itemOrList, AcceptAlgs=[], HelperTools=[], **kwargs):
215 Adds Metadata items to the stream named streamName
217 Similar to addToESD/AOD, itemOrList can be either a list of items or just one time
218 The additional arguments, AcceptAlgs and HelperTools, are passed to the underlying stream
219 The former is needed when there are special kernels, e.g., simulation/derivation
220 The latter is needed primarily for the propagation of the FileMetaData tool
222 Returns CA to be merged
224 flagName = f
"Output.doWrite{streamName}"
225 if not flags.hasFlag(flagName):
227 items = [itemOrList]
if isinstance(itemOrList, str)
else itemOrList
229 AcceptAlgs=AcceptAlgs, HelperTools=HelperTools, **kwargs)