17 MetadataItemList=None,
18 disableEventTag=False,
19 trigNavThinningSvc=None,
20 takeItemsFromInput=False,
21 extendProvenanceRecord=True,
22 keepProvenanceTagsRegEx=None,
26 """Configure an output stream for writing data to POOL files.
29 flags: Configuration flags object
30 streamName: Name of the output stream (e.g., 'ESD', 'AOD', 'DAOD_PHYS')
31 ItemList: List of data objects to write to the stream
32 MetadataItemList: List of metadata objects to write
33 disableEventTag: If True, disable event tagging
34 trigNavThinningSvc: Trigger navigation thinning service
35 takeItemsFromInput: If True, take items from input file
36 extendProvenanceRecord: If True, extend provenance record with processing tags
37 keepProvenanceTagsRegEx: RegEx string to match processing tags in the Event provenance.
38 Only matching tags will be copied to the new DataHeader.
39 Empty string rejects all tags. Direct provenance is not affected
40 (see extendProvenanceRecord).
41 AcceptAlgs: List of algorithms that must accept the event for it to be written
42 HelperTools: List of helper tools to attach to the stream
45 ComponentAccumulator: Configured output stream and associated services
50 if MetadataItemList
is None:
52 if AcceptAlgs
is None:
54 if HelperTools
is None:
57 eventInfoKey =
"EventInfo"
58 if flags.Common.ProductionStep
in [ProductionStep.PileUpPresampling, ProductionStep.PileUpPretracking, ProductionStep.MinbiasPreprocessing]:
59 eventInfoKey = f
"{flags.Overlay.BkgPrefix}EventInfo"
61 msg = logging.getLogger(
"OutputStreamCfg")
62 flagName = f
"Output.{streamName}FileName"
63 if flags.hasFlag(flagName):
64 fileName = flags._get(flagName)
66 fileName = f
"my{streamName}.pool.root"
67 msg.info(
"No file name predefined for stream %s. Using %s", streamName, fileName)
69 if fileName
in flags.Input.Files:
70 raise ConfigurationError(
"Same name for input and output file %s" % fileName)
72 result =
ComponentAccumulator(sequence = CompFactory.AthSequencer(
"AthOutSeq", StopOverride=
True))
75 from AthenaPoolCnvSvc.PoolWriteConfig
import PoolWriteCfg
81 metaDataTech = flags.Output.StorageTechnology.MetaData.get(fileName)
82 if metaDataTech
is None:
83 metaDataTech = flags.Output.StorageTechnology.MetaData.get(
'*')
84 if metaDataTech
is None:
86 eventDataTech = flags.Output.StorageTechnology.EventData.get(
88 flags.Output.StorageTechnology.EventData.get(
'*',
'ROOTTREEINDEX')
90 metaDataTech = eventDataTech
93 writingTool = CompFactory.AthenaOutputStreamTool(
94 f
"{outputStreamName(streamName)}Tool",
96 MetaDataPoolContainerPrefix=f
"{metaDataTech}:MetaData",
97 MetaDataOutputCollection=f
"{metaDataTech}:MetaDataHdr",
98 ConversionService=
"AthenaPoolSharedIOCnvSvc" if flags.MP.UseSharedReader
or flags.MP.UseSharedWriter
else "AthenaPoolCnvSvc",
102 parentStream = f
"Output.{streamName}ParentStream"
103 childStream = f
"Output.{streamName}ChildStream"
104 if flags.hasFlag(childStream):
105 writingTool.SaveDecisions =
True
106 elif flags.hasFlag(parentStream):
107 disableEventTag =
True
108 writingTool.OutputCollection = f
"POOLContainer_{streamName}"
109 writingTool.PoolContainerPrefix = f
"CollectionTree_{streamName}"
110 writingTool.MetaDataOutputCollection = f
"MetaDataHdr_{streamName}"
111 writingTool.MetaDataPoolContainerPrefix = f
"MetaData_{streamName}"
112 msg.info(
"Stream %s running in augmentation mode with %s as parent", streamName, flags._get(parentStream))
117 if any(name
in streamName
for name
in {
"DAOD_",
"D2AOD_"}):
118 finalItemList = ItemList
120 finalItemList = [f
"xAOD::EventInfo#{eventInfoKey}", f
"xAOD::EventAuxInfo#{eventInfoKey}Aux."] + ItemList
122 outputStream = CompFactory.AthenaOutputStream(
125 WritingTool=writingTool,
126 ItemList=finalItemList,
127 MetadataItemList=MetadataItemList,
129 HelperTools=HelperTools,
131 if takeItemsFromInput:
135 outputStream.TakeItemsFromInput =
True
136 if not extendProvenanceRecord:
139 outputStream.ExtendProvenanceRecord =
False
140 if keepProvenanceTagsRegEx
is not None:
142 outputStream.KeepProvenanceTagsRegEx = keepProvenanceTagsRegEx
143 outputStream.AcceptAlgs += AcceptAlgs
144 outputStream.ExtraOutputs.add((
"DataHeader", f
"StoreGateSvc+{outputStreamName(streamName)}"))
145 if flags.Scheduler.CheckOutputUsage
and flags.Concurrency.NumThreads > 0:
146 outputStream.ExtraInputs = {tuple(l.split(
'#'))
for l
in finalItemList
if '*' not in l
and 'Aux' not in l}
148 from AthenaConfiguration.MainServicesConfig
import OutputUsageIgnoreCfg
151 result.addService(CompFactory.StoreGateSvc(
"MetaDataStore"))
152 outputStream.MetadataStore = result.getService(
"MetaDataStore")
155 thinningCacheTool = CompFactory.Athena.ThinningCacheTool(f
"ThinningCacheTool_Stream{streamName}",
157 if trigNavThinningSvc
is not None:
158 thinningCacheTool.TrigNavigationThinningSvc = trigNavThinningSvc
159 outputStream.HelperTools.append(thinningCacheTool)
162 if not disableEventTag:
164 outputStream.WritingTool.AttributeListKey=key
166 propagateInputAttributeList =
False
167 if "AthenaAttributeList#Input" in flags.Input.TypedCollections:
168 from SGComps.SGInputLoaderConfig
import SGInputLoaderCfg
170 propagateInputAttributeList =
True
173 tagBuilder = CompFactory.EventInfoTagBuilder(AttributeList=key,
174 Tool=CompFactory.EventInfoAttListTool(),
175 EventInfoKey=eventInfoKey,
176 PropagateInput=propagateInputAttributeList)
177 result.addEventAlgo(tagBuilder)
180 if "AOD" in streamName:
181 outputStream.WritingTool.SubLevelBranchName =
"<key>"
183 result.addEventAlgo(outputStream, domain=
'IO')