25):
26 """Configure an output stream for writing data to POOL files.
27
28 Args:
29 flags: Configuration flags object
30 streamName: Name of the output stream (e.g., 'ESD', 'AOD', 'DAOD_PHYS')
31 ItemList: List of data objects to write to the stream
32 MetadataItemList: List of metadata objects to write
33 disableEventTag: If True, disable event tagging
34 trigNavThinningSvc: Trigger navigation thinning service
35 takeItemsFromInput: If True, take items from input file
36 extendProvenanceRecord: If True, extend provenance record with processing tags
37 keepProvenanceTagsRegEx: RegEx string to match processing tags in the Event provenance.
38 Only matching tags will be copied to the new DataHeader.
39 Empty string rejects all tags. Direct provenance is not affected
40 (see extendProvenanceRecord).
41 AcceptAlgs: List of algorithms that must accept the event for it to be written
42 HelperTools: List of helper tools to attach to the stream
43
44 Returns:
45 ComponentAccumulator: Configured output stream and associated services
46 """
47
48 if ItemList is None:
49 ItemList = []
50 if MetadataItemList is None:
51 MetadataItemList = []
52 if AcceptAlgs is None:
53 AcceptAlgs = []
54 if HelperTools is None:
55 HelperTools = []
56
57 eventInfoKey = "EventInfo"
58 if flags.Common.ProductionStep in [ProductionStep.PileUpPresampling, ProductionStep.PileUpPretracking, ProductionStep.MinbiasPreprocessing]:
59 eventInfoKey = f"{flags.Overlay.BkgPrefix}EventInfo"
60
61 msg = logging.getLogger("OutputStreamCfg")
62 flagName = f"Output.{streamName}FileName"
63 if flags.hasFlag(flagName):
64 fileName = flags._get(flagName)
65 else:
66 fileName = f"my{streamName}.pool.root"
67 msg.info("No file name predefined for stream %s. Using %s", streamName, fileName)
68
69 if fileName in flags.Input.Files:
70 raise ConfigurationError("Same name for input and output file %s" % fileName)
71
72 result = ComponentAccumulator(sequence = CompFactory.AthSequencer("AthOutSeq", StopOverride=True))
73
74
75 from AthenaPoolCnvSvc.PoolWriteConfig import PoolWriteCfg
76 result.merge(PoolWriteCfg(flags))
77
78
79
80
81 metaDataTech = flags.Output.StorageTechnology.MetaData.get(fileName)
82 if metaDataTech is None:
83 metaDataTech = flags.Output.StorageTechnology.MetaData.get('*')
84 if metaDataTech is None:
85
86 eventDataTech = flags.Output.StorageTechnology.EventData.get(
87 fileName,
88 flags.Output.StorageTechnology.EventData.get('*', flags.PoolSvc.DefaultContainerType)
89 )
90 metaDataTech = eventDataTech
91
92
93 writingTool = CompFactory.AthenaOutputStreamTool(
94 f"{outputStreamName(streamName)}Tool",
95 DataHeaderKey=outputStreamName(streamName),
96 MetaDataPoolContainerPrefix=f"{metaDataTech}:MetaData",
97 MetaDataOutputCollection=f"{metaDataTech}:MetaDataHdr",
98 ConversionService="AthenaPoolSharedIOCnvSvc" if flags.MP.UseSharedReader or flags.MP.UseSharedWriter else "AthenaPoolCnvSvc",
99 )
100
101
102 parentStream = f"Output.{streamName}ParentStream"
103 childStream = f"Output.{streamName}ChildStream"
104 if flags.hasFlag(childStream):
105 writingTool.SaveDecisions = True
106 elif flags.hasFlag(parentStream):
107 disableEventTag = True
108 writingTool.OutputCollection = f"POOLContainer_{streamName}"
109 writingTool.PoolContainerPrefix = f"CollectionTree_{streamName}"
110 writingTool.MetaDataOutputCollection = f"MetaDataHdr_{streamName}"
111 writingTool.MetaDataPoolContainerPrefix = f"MetaData_{streamName}"
112 msg.info("Stream %s running in augmentation mode with %s as parent", streamName, flags._get(parentStream))
113
114
115
116 finalItemList = []
117 if any(name in streamName for name in {"DAOD_", "D2AOD_"}):
118 finalItemList = ItemList
119 else:
120 finalItemList = [f"xAOD::EventInfo#{eventInfoKey}", f"xAOD::EventAuxInfo#{eventInfoKey}Aux."] + ItemList
121
122 outputStream = CompFactory.AthenaOutputStream(
123 outputStreamName(streamName),
124 StreamName=outputStreamName(streamName),
125 WritingTool=writingTool,
126 ItemList=finalItemList,
127 MetadataItemList=MetadataItemList,
128 OutputFile=fileName,
129 HelperTools=HelperTools,
130 )
131 if takeItemsFromInput:
132
133
134
135 outputStream.TakeItemsFromInput = True
136 if not extendProvenanceRecord:
137
138
139 outputStream.ExtendProvenanceRecord = False
140 if keepProvenanceTagsRegEx is not None:
141
142 outputStream.KeepProvenanceTagsRegEx = keepProvenanceTagsRegEx
143 outputStream.AcceptAlgs += AcceptAlgs
144 outputStream.ExtraOutputs.add(("DataHeader", f"StoreGateSvc+{outputStreamName(streamName)}"))
145 if flags.Scheduler.CheckOutputUsage and flags.Concurrency.NumThreads > 0:
146 outputStream.ExtraInputs = {tuple(l.split('#')) for l in finalItemList if '*' not in l and 'Aux' not in l}
147
148 from AthenaConfiguration.MainServicesConfig import OutputUsageIgnoreCfg
149 result.merge(OutputUsageIgnoreCfg(flags, outputStream.name))
150
151 result.addService(CompFactory.StoreGateSvc("MetaDataStore"))
152 outputStream.MetadataStore = result.getService("MetaDataStore")
153
154
155 thinningCacheTool = CompFactory.Athena.ThinningCacheTool(f"ThinningCacheTool_Stream{streamName}",
156 StreamName=outputStreamName(streamName))
157 if trigNavThinningSvc is not None:
158 thinningCacheTool.TrigNavigationThinningSvc = trigNavThinningSvc
159 outputStream.HelperTools.append(thinningCacheTool)
160
161
162 if not disableEventTag:
163 key = "SimpleTag"
164 outputStream.WritingTool.AttributeListKey=key
165
166 propagateInputAttributeList = False
167 if "AthenaAttributeList#Input" in flags.Input.TypedCollections:
168 from SGComps.SGInputLoaderConfig import SGInputLoaderCfg
169 result.merge(SGInputLoaderCfg(flags, ["AthenaAttributeList#Input"]))
170 propagateInputAttributeList = True
171
172
173 tagBuilder = CompFactory.EventInfoTagBuilder(AttributeList=key,
174 Tool=CompFactory.EventInfoAttListTool(),
175 EventInfoKey=eventInfoKey,
176 PropagateInput=propagateInputAttributeList)
177 result.addEventAlgo(tagBuilder)
178
179
180 if "AOD" in streamName:
181 outputStream.WritingTool.SubLevelBranchName = "<key>"
182
183 result.addEventAlgo(outputStream, domain='IO')
184 return result
185
186