24):
25 """Configure an output stream for writing data to POOL files.
26
27 Args:
28 flags: Configuration flags object
29 streamName: Name of the output stream (e.g., 'ESD', 'AOD', 'DAOD_PHYS')
30 ItemList: List of data objects to write to the stream
31 MetadataItemList: List of metadata objects to write
32 disableEventTag: If True, disable event tagging
33 trigNavThinningSvc: Trigger navigation thinning service
34 takeItemsFromInput: If True, take items from input file
35 extendProvenanceRecord: If True, extend provenance record with processing tags
36 keepProvenanceTagsRegEx: RegEx string to match processing tags in the Event provenance.
37 Only matching tags will be copied to the new DataHeader.
38 Empty string rejects all tags. Direct provenance is not affected
39 (see extendProvenanceRecord).
40 AcceptAlgs: List of algorithms that must accept the event for it to be written
41 HelperTools: List of helper tools to attach to the stream
42
43 Returns:
44 ComponentAccumulator: Configured output stream and associated services
45 """
46
47 if ItemList is None:
48 ItemList = []
49 if MetadataItemList is None:
50 MetadataItemList = []
51 if AcceptAlgs is None:
52 AcceptAlgs = []
53 if HelperTools is None:
54 HelperTools = []
55
56 eventInfoKey = "EventInfo"
57 if flags.Common.ProductionStep in [ProductionStep.PileUpPresampling, ProductionStep.PileUpPretracking, ProductionStep.MinbiasPreprocessing]:
58 eventInfoKey = f"{flags.Overlay.BkgPrefix}EventInfo"
59
60 msg = logging.getLogger("OutputStreamCfg")
61 flagName = f"Output.{streamName}FileName"
62 if flags.hasFlag(flagName):
63 fileName = flags._get(flagName)
64 else:
65 fileName = f"my{streamName}.pool.root"
66 msg.info("No file name predefined for stream %s. Using %s", streamName, fileName)
67
68 if fileName in flags.Input.Files:
69 raise ConfigurationError("Same name for input and output file %s" % fileName)
70
71 result = ComponentAccumulator(sequence = CompFactory.AthSequencer("AthOutSeq", StopOverride=True))
72
73
74 from AthenaPoolCnvSvc.PoolWriteConfig import PoolWriteCfg
75 result.merge(PoolWriteCfg(flags))
76
77
78 writingTool = CompFactory.AthenaOutputStreamTool(
79 f"{outputStreamName(streamName)}Tool",
80 DataHeaderKey=outputStreamName(streamName),
81 TopLevelContainerName="",
82 SubLevelBranchName="<type>/<key>",
83 ConversionService="AthenaPoolSharedIOCnvSvc" if flags.MP.UseSharedReader or flags.MP.UseSharedWriter else "AthenaPoolCnvSvc",
84 )
85
86
87 parentStream = f"Output.{streamName}ParentStream"
88 childStream = f"Output.{streamName}ChildStream"
89 if flags.hasFlag(childStream):
90 writingTool.SaveDecisions = True
91 elif flags.hasFlag(parentStream):
92 disableEventTag = True
93 writingTool.OutputCollection = f"POOLContainer_{streamName}"
94 writingTool.PoolContainerPrefix = f"CollectionTree_{streamName}"
95 writingTool.MetaDataOutputCollection = f"MetaDataHdr_{streamName}"
96 writingTool.MetaDataPoolContainerPrefix = f"MetaData_{streamName}"
97 msg.info("Stream %s running in augmentation mode with %s as parent", streamName, flags._get(parentStream))
98
99
100
101 finalItemList = []
102 if any(name in streamName for name in {"DAOD_", "D2AOD_"}):
103 finalItemList = ItemList
104 else:
105 finalItemList = [f"xAOD::EventInfo#{eventInfoKey}", f"xAOD::EventAuxInfo#{eventInfoKey}Aux."] + ItemList
106
107 outputStream = CompFactory.AthenaOutputStream(
108 outputStreamName(streamName),
109 StreamName=outputStreamName(streamName),
110 WritingTool=writingTool,
111 ItemList=finalItemList,
112 MetadataItemList=MetadataItemList,
113 OutputFile=fileName,
114 HelperTools=HelperTools,
115 )
116 if takeItemsFromInput:
117
118
119
120 outputStream.TakeItemsFromInput = True
121 if not extendProvenanceRecord:
122
123
124 outputStream.ExtendProvenanceRecord = False
125 if keepProvenanceTagsRegEx is not None:
126
127 outputStream.KeepProvenanceTagsRegEx = keepProvenanceTagsRegEx
128 outputStream.AcceptAlgs += AcceptAlgs
129 outputStream.ExtraOutputs.add(("DataHeader", f"StoreGateSvc+{outputStreamName(streamName)}"))
130 if flags.Scheduler.CheckOutputUsage and flags.Concurrency.NumThreads > 0:
131 outputStream.ExtraInputs = {tuple(l.split('#')) for l in finalItemList if '*' not in l and 'Aux' not in l}
132
133 from AthenaConfiguration.MainServicesConfig import OutputUsageIgnoreCfg
134 result.merge(OutputUsageIgnoreCfg(flags, outputStream.name))
135
136 result.addService(CompFactory.StoreGateSvc("MetaDataStore"))
137 outputStream.MetadataStore = result.getService("MetaDataStore")
138
139
140 thinningCacheTool = CompFactory.Athena.ThinningCacheTool(f"ThinningCacheTool_Stream{streamName}",
141 StreamName=outputStreamName(streamName))
142 if trigNavThinningSvc is not None:
143 thinningCacheTool.TrigNavigationThinningSvc = trigNavThinningSvc
144 outputStream.HelperTools.append(thinningCacheTool)
145
146
147 if not disableEventTag:
148 key = "SimpleTag"
149 outputStream.WritingTool.AttributeListKey=key
150
151 propagateInputAttributeList = False
152 if "AthenaAttributeList#Input" in flags.Input.TypedCollections:
153 from SGComps.SGInputLoaderConfig import SGInputLoaderCfg
154 result.merge(SGInputLoaderCfg(flags, ["AthenaAttributeList#Input"]))
155 propagateInputAttributeList = True
156
157
158 tagBuilder = CompFactory.EventInfoTagBuilder(AttributeList=key,
159 Tool=CompFactory.EventInfoAttListTool(),
160 EventInfoKey=eventInfoKey,
161 PropagateInput=propagateInputAttributeList)
162 result.addEventAlgo(tagBuilder)
163
164
165 if "AOD" in streamName:
166 outputStream.WritingTool.SubLevelBranchName = "<key>"
167
168 result.addEventAlgo(outputStream, domain='IO')
169 return result
170
171