16 MetadataItemList=None,
17 disableEventTag=False,
18 trigNavThinningSvc=None,
19 takeItemsFromInput=False,
20 extendProvenanceRecord=True,
21 keepProvenanceTagsRegEx=None,
25 """Configure an output stream for writing data to POOL files.
28 flags: Configuration flags object
29 streamName: Name of the output stream (e.g., 'ESD', 'AOD', 'DAOD_PHYS')
30 ItemList: List of data objects to write to the stream
31 MetadataItemList: List of metadata objects to write
32 disableEventTag: If True, disable event tagging
33 trigNavThinningSvc: Trigger navigation thinning service
34 takeItemsFromInput: If True, take items from input file
35 extendProvenanceRecord: If True, extend provenance record with processing tags
36 keepProvenanceTagsRegEx: RegEx string to match processing tags in the Event provenance.
37 Only matching tags will be copied to the new DataHeader.
38 Empty string rejects all tags. Direct provenance is not affected
39 (see extendProvenanceRecord).
40 AcceptAlgs: List of algorithms that must accept the event for it to be written
41 HelperTools: List of helper tools to attach to the stream
44 ComponentAccumulator: Configured output stream and associated services
49 if MetadataItemList
is None:
51 if AcceptAlgs
is None:
53 if HelperTools
is None:
56 eventInfoKey =
"EventInfo"
57 if flags.Common.ProductionStep
in [ProductionStep.PileUpPresampling, ProductionStep.PileUpPretracking, ProductionStep.MinbiasPreprocessing]:
58 eventInfoKey = f
"{flags.Overlay.BkgPrefix}EventInfo"
60 msg = logging.getLogger(
"OutputStreamCfg")
61 flagName = f
"Output.{streamName}FileName"
62 if flags.hasFlag(flagName):
63 fileName = flags._get(flagName)
65 fileName = f
"my{streamName}.pool.root"
66 msg.info(
"No file name predefined for stream %s. Using %s", streamName, fileName)
68 if fileName
in flags.Input.Files:
69 raise ConfigurationError(
"Same name for input and output file %s" % fileName)
71 result = ComponentAccumulator(sequence = CompFactory.AthSequencer(
"AthOutSeq", StopOverride=
True))
74 from AthenaPoolCnvSvc.PoolWriteConfig
import PoolWriteCfg
75 result.merge(PoolWriteCfg(flags))
78 writingTool = CompFactory.AthenaOutputStreamTool(
79 f
"{outputStreamName(streamName)}Tool",
81 TopLevelContainerName=
"",
82 SubLevelBranchName=
"<type>/<key>",
83 ConversionService=
"AthenaPoolSharedIOCnvSvc" if flags.MP.UseSharedReader
or flags.MP.UseSharedWriter
else "AthenaPoolCnvSvc",
87 parentStream = f
"Output.{streamName}ParentStream"
88 childStream = f
"Output.{streamName}ChildStream"
89 if flags.hasFlag(childStream):
90 writingTool.SaveDecisions =
True
91 elif flags.hasFlag(parentStream):
92 disableEventTag =
True
93 writingTool.OutputCollection = f
"POOLContainer_{streamName}"
94 writingTool.PoolContainerPrefix = f
"CollectionTree_{streamName}"
95 writingTool.MetaDataOutputCollection = f
"MetaDataHdr_{streamName}"
96 writingTool.MetaDataPoolContainerPrefix = f
"MetaData_{streamName}"
97 msg.info(
"Stream %s running in augmentation mode with %s as parent", streamName, flags._get(parentStream))
102 if any(name
in streamName
for name
in {
"DAOD_",
"D2AOD_"}):
103 finalItemList = ItemList
105 finalItemList = [f
"xAOD::EventInfo#{eventInfoKey}", f
"xAOD::EventAuxInfo#{eventInfoKey}Aux."] + ItemList
107 outputStream = CompFactory.AthenaOutputStream(
110 WritingTool=writingTool,
111 ItemList=finalItemList,
112 MetadataItemList=MetadataItemList,
114 HelperTools=HelperTools,
116 if takeItemsFromInput:
120 outputStream.TakeItemsFromInput =
True
121 if not extendProvenanceRecord:
124 outputStream.ExtendProvenanceRecord =
False
125 if keepProvenanceTagsRegEx
is not None:
127 outputStream.KeepProvenanceTagsRegEx = keepProvenanceTagsRegEx
128 outputStream.AcceptAlgs += AcceptAlgs
129 outputStream.ExtraOutputs.add((
"DataHeader", f
"StoreGateSvc+{outputStreamName(streamName)}"))
130 if flags.Scheduler.CheckOutputUsage
and flags.Concurrency.NumThreads > 0:
131 outputStream.ExtraInputs = {tuple(l.split(
'#'))
for l
in finalItemList
if '*' not in l
and 'Aux' not in l}
133 from AthenaConfiguration.MainServicesConfig
import OutputUsageIgnoreCfg
134 result.merge(OutputUsageIgnoreCfg(flags, outputStream.name))
136 result.addService(CompFactory.StoreGateSvc(
"MetaDataStore"))
137 outputStream.MetadataStore = result.getService(
"MetaDataStore")
140 thinningCacheTool = CompFactory.Athena.ThinningCacheTool(f
"ThinningCacheTool_Stream{streamName}",
142 if trigNavThinningSvc
is not None:
143 thinningCacheTool.TrigNavigationThinningSvc = trigNavThinningSvc
144 outputStream.HelperTools.append(thinningCacheTool)
147 if not disableEventTag:
149 outputStream.WritingTool.AttributeListKey=key
151 propagateInputAttributeList =
False
152 if "AthenaAttributeList#Input" in flags.Input.TypedCollections:
153 from SGComps.SGInputLoaderConfig
import SGInputLoaderCfg
154 result.merge(SGInputLoaderCfg(flags, [
"AthenaAttributeList#Input"]))
155 propagateInputAttributeList =
True
158 tagBuilder = CompFactory.EventInfoTagBuilder(AttributeList=key,
159 Tool=CompFactory.EventInfoAttListTool(),
160 EventInfoKey=eventInfoKey,
161 PropagateInput=propagateInputAttributeList)
162 result.addEventAlgo(tagBuilder)
165 if "AOD" in streamName:
166 outputStream.WritingTool.SubLevelBranchName =
"<key>"
168 result.addEventAlgo(outputStream, domain=
'IO')
def addToMetaData(flags, streamName, itemOrList, AcceptAlgs=None, HelperTools=None, **kwargs):
    """Add metadata items to the output stream named ``streamName``.

    Similar to addToESD/AOD, ``itemOrList`` can be either a list of items or
    just one item.

    Args:
        flags: Configuration flags object. The flag
            ``Output.doWrite{streamName}`` must be defined on it, otherwise
            nothing is configured.
        streamName: Name of the output stream the metadata items are added to.
        itemOrList: A single metadata item given as a string, or a list of
            such items.
        AcceptAlgs: Passed to the underlying stream. Needed when there are
            special kernels, e.g., simulation/derivation. ``None`` (default)
            is treated as an empty list.
        HelperTools: Passed to the underlying stream. Needed primarily for the
            propagation of the FileMetaData tool. ``None`` (default) is
            treated as an empty list.
        **kwargs: Forwarded unchanged to ``OutputStreamCfg``.

    Returns:
        CA to be merged. It is an empty ``ComponentAccumulator`` when the
        ``Output.doWrite{streamName}`` flag does not exist.
    """
    # Only the existence of the flag is checked here, not its value.
    flagName = f"Output.doWrite{streamName}"
    if not flags.hasFlag(flagName):
        return ComponentAccumulator()

    # Build fresh lists per call instead of using mutable default arguments:
    # a shared default list would be handed to the stream configuration on
    # every call, and anything appended to it downstream would leak into
    # later calls.
    if AcceptAlgs is None:
        AcceptAlgs = []
    if HelperTools is None:
        HelperTools = []

    # Normalise the single-item form to a list.
    items = [itemOrList] if isinstance(itemOrList, str) else itemOrList
    return OutputStreamCfg(flags, streamName, MetadataItemList=items,
                           AcceptAlgs=AcceptAlgs, HelperTools=HelperTools, **kwargs)
OutputStreamCfg(flags, streamName, ItemList=None, MetadataItemList=None, disableEventTag=False, trigNavThinningSvc=None, takeItemsFromInput=False, extendProvenanceRecord=True, keepProvenanceTagsRegEx=None, AcceptAlgs=None, HelperTools=None)