ATLAS Offline Software
Loading...
Searching...
No Matches
python.OutputStreamConfig Namespace Reference

Functions

 outputStreamName (streamName)
 OutputStreamCfg (flags, streamName, ItemList=None, MetadataItemList=None, disableEventTag=False, trigNavThinningSvc=None, takeItemsFromInput=False, extendProvenanceRecord=True, keepProvenanceTagsRegEx=None, AcceptAlgs=None, HelperTools=None)
 addToESD (flags, itemOrList, **kwargs)
 addToAOD (flags, itemOrList, **kwargs)
 addToMetaData (flags, streamName, itemOrList, AcceptAlgs=[], HelperTools=[], **kwargs)

Function Documentation

◆ addToAOD()

python.OutputStreamConfig.addToAOD ( flags,
itemOrList,
** kwargs )
Adds items to the AOD stream.

@see addToESD

Definition at line 202 of file OutputStreamConfig.py.

def addToAOD(flags, itemOrList, **kwargs):
    """
    Add one item, or a list of items, to the AOD output stream.

    A single string is wrapped into a one-element list; any other value is
    forwarded as given. Extra keyword arguments are handed on unchanged to
    OutputStreamCfg.

    Returns an empty ComponentAccumulator when flags.Output.doWriteAOD is
    false, otherwise the accumulator built by OutputStreamCfg for "AOD".

    @see addToESD
    """
    if flags.Output.doWriteAOD:
        if isinstance(itemOrList, str):
            aodItems = [itemOrList]
        else:
            aodItems = itemOrList
        return OutputStreamCfg(flags, "AOD", ItemList=aodItems, **kwargs)

    # AOD writing is switched off: nothing to configure
    return ComponentAccumulator()


◆ addToESD()

python.OutputStreamConfig.addToESD ( flags,
itemOrList,
** kwargs )
Adds items to the ESD stream.

The argument can be either a list of items or a single item.
If further customisation of the output stream is needed, it can be passed via kwargs.

Returns a CA to be merged, e.g.: result.merge(addToESD(flags, "xAOD::CoolObject"))

Definition at line 187 of file OutputStreamConfig.py.

def addToESD(flags, itemOrList, **kwargs):
    """
    Add one item, or a list of items, to the ESD output stream.

    itemOrList may be a single string (it is then wrapped into a list) or
    an already prepared list of items. Any further customisation of the
    output stream is passed through kwargs to OutputStreamCfg.

    Returns a CA to be merged, e.g.:
        result.merge(addToESD(flags, "xAOD::CoolObject"))
    An empty CA is returned when flags.Output.doWriteESD is false.
    """
    writeESD = flags.Output.doWriteESD
    if not writeESD:
        # ESD writing is switched off: nothing to configure
        return ComponentAccumulator()

    esdItems = itemOrList
    if isinstance(esdItems, str):
        esdItems = [esdItems]
    return OutputStreamCfg(flags, "ESD", ItemList=esdItems, **kwargs)



◆ addToMetaData()

python.OutputStreamConfig.addToMetaData ( flags,
streamName,
itemOrList,
AcceptAlgs = [],
HelperTools = [],
** kwargs )
Adds metadata items to the stream named streamName.

Similar to addToESD/addToAOD, itemOrList can be either a list of items or just one item.
The additional arguments, AcceptAlgs and HelperTools, are passed to the underlying stream.
The former is needed when there are special kernels, e.g., simulation/derivation.
The latter is needed primarily for the propagation of the FileMetaData tool.

Returns a CA to be merged.

Definition at line 213 of file OutputStreamConfig.py.

def addToMetaData(flags, streamName, itemOrList, AcceptAlgs=None, HelperTools=None, **kwargs):
    """
    Adds metadata items to the stream named streamName

    Similar to addToESD/addToAOD, itemOrList can be either a list of items or just one item
    The additional arguments, AcceptAlgs and HelperTools, are passed to the underlying stream
    The former is needed when there are special kernels, e.g., simulation/derivation
    The latter is needed primarily for the propagation of the FileMetaData tool

    AcceptAlgs and HelperTools default to None rather than to a list literal,
    so that no list object is shared between calls. None is forwarded as is;
    OutputStreamCfg replaces it with a fresh empty list.

    Returns CA to be merged (an empty one if the flag Output.doWrite<streamName>
    does not exist)
    """
    flagName = f"Output.doWrite{streamName}"
    # Only the existence of the flag is checked here, not its value
    if not flags.hasFlag(flagName):
        return ComponentAccumulator()
    items = [itemOrList] if isinstance(itemOrList, str) else itemOrList
    return OutputStreamCfg(flags, streamName, MetadataItemList=items,
                           AcceptAlgs=AcceptAlgs, HelperTools=HelperTools, **kwargs)

◆ OutputStreamCfg()

python.OutputStreamConfig.OutputStreamCfg ( flags,
streamName,
ItemList = None,
MetadataItemList = None,
disableEventTag = False,
trigNavThinningSvc = None,
takeItemsFromInput = False,
extendProvenanceRecord = True,
keepProvenanceTagsRegEx = None,
AcceptAlgs = None,
HelperTools = None )
Configure an output stream for writing data to POOL files.

Args:
    flags: Configuration flags object
    streamName: Name of the output stream (e.g., 'ESD', 'AOD', 'DAOD_PHYS')
    ItemList: List of data objects to write to the stream
    MetadataItemList: List of metadata objects to write
    disableEventTag: If True, disable event tagging
    trigNavThinningSvc: Trigger navigation thinning service
    takeItemsFromInput: If True, take items from input file
    extendProvenanceRecord: If True, extend provenance record with processing tags
    keepProvenanceTagsRegEx: RegEx string to match processing tags in the Event provenance.
                            Only matching tags will be copied to the new DataHeader.
                            Empty string rejects all tags. Direct provenance is not affected
                            (see extendProvenanceRecord).
    AcceptAlgs: List of algorithms that must accept the event for it to be written
    HelperTools: List of helper tools to attach to the stream

Returns:
    ComponentAccumulator: Configured output stream and associated services

Definition at line 13 of file OutputStreamConfig.py.

def OutputStreamCfg(
    flags,
    streamName,
    ItemList=None,
    MetadataItemList=None,
    disableEventTag=False,
    trigNavThinningSvc=None,
    takeItemsFromInput=False,
    extendProvenanceRecord=True,
    keepProvenanceTagsRegEx=None,
    AcceptAlgs=None,
    HelperTools=None,
):
    """Set up an output stream that writes data to a POOL file.

    Args:
        flags: Configuration flags object
        streamName: Name of the output stream (e.g., 'ESD', 'AOD', 'DAOD_PHYS')
        ItemList: Data objects to be written to the stream
        MetadataItemList: Metadata objects to be written
        disableEventTag: If True, no event tag is set up
        trigNavThinningSvc: Trigger navigation thinning service
        takeItemsFromInput: If True, take items from the input file
        extendProvenanceRecord: If True, extend the provenance record with processing tags
        keepProvenanceTagsRegEx: RegEx string matched against the processing tags in the
                                 Event provenance. Only matching tags are copied to the
                                 new DataHeader. An empty string rejects all tags. Direct
                                 provenance is not affected (see extendProvenanceRecord).
        AcceptAlgs: Algorithms that must accept the event for it to be written
        HelperTools: Helper tools to attach to the stream

    Returns:
        ComponentAccumulator: Configured output stream and associated services

    Raises:
        ConfigurationError: if the output file name is also one of flags.Input.Files
    """
    # None stands in for "empty list" so that no list is shared between calls
    ItemList = [] if ItemList is None else ItemList
    MetadataItemList = [] if MetadataItemList is None else MetadataItemList
    AcceptAlgs = [] if AcceptAlgs is None else AcceptAlgs
    HelperTools = [] if HelperTools is None else HelperTools

    # For these production steps the EventInfo key carries the background prefix
    prefixedSteps = [
        ProductionStep.PileUpPresampling,
        ProductionStep.PileUpPretracking,
        ProductionStep.MinbiasPreprocessing,
    ]
    if flags.Common.ProductionStep in prefixedSteps:
        eventInfoKey = f"{flags.Overlay.BkgPrefix}EventInfo"
    else:
        eventInfoKey = "EventInfo"

    # Output file name: taken from the flags if defined, otherwise a default one
    log = logging.getLogger("OutputStreamCfg")
    fileNameFlag = f"Output.{streamName}FileName"
    if not flags.hasFlag(fileNameFlag):
        fileName = f"my{streamName}.pool.root"
        log.info("No file name predefined for stream %s. Using %s", streamName, fileName)
    else:
        fileName = flags._get(fileNameFlag)

    if fileName in flags.Input.Files:
        raise ConfigurationError("Same name for input and output file %s" % fileName)

    outSequence = CompFactory.AthSequencer("AthOutSeq", StopOverride=True)
    result = ComponentAccumulator(sequence=outSequence)

    # AthenaPoolCnvSvc is set up through PoolWriteCfg
    from AthenaPoolCnvSvc.PoolWriteConfig import PoolWriteCfg
    result.merge(PoolWriteCfg(flags))

    # Metadata storage technology for this file, in order of preference:
    #   1. entry for the exact file name
    #   2. wildcard entry
    #   3. the EventData technology of this file
    metaDataTech = flags.Output.StorageTechnology.MetaData.get(fileName)
    if metaDataTech is None:
        metaDataTech = flags.Output.StorageTechnology.MetaData.get('*')
        if metaDataTech is None:
            wildcardEventTech = flags.Output.StorageTechnology.EventData.get(
                '*', flags.PoolSvc.DefaultContainerType
            )
            metaDataTech = flags.Output.StorageTechnology.EventData.get(fileName, wildcardEventTech)

    # Define the athena output stream tool
    streamAlgName = outputStreamName(streamName)
    useSharedIO = flags.MP.UseSharedReader or flags.MP.UseSharedWriter
    conversionService = "AthenaPoolSharedIOCnvSvc" if useSharedIO else "AthenaPoolCnvSvc"
    writingTool = CompFactory.AthenaOutputStreamTool(
        f"{streamAlgName}Tool",
        DataHeaderKey=streamAlgName,
        MetaDataPoolContainerPrefix=f"{metaDataTech}:MetaData",
        MetaDataOutputCollection=f"{metaDataTech}:MetaDataHdr",
        ConversionService=conversionService,
    )

    # Augmentation mode: adjust the writing tool depending on whether this
    # stream has a child or a parent stream
    parentStreamFlag = f"Output.{streamName}ParentStream"
    childStreamFlag = f"Output.{streamName}ChildStream"
    if flags.hasFlag(childStreamFlag):
        writingTool.SaveDecisions = True
    elif flags.hasFlag(parentStreamFlag):
        disableEventTag = True
        writingTool.OutputCollection = f"POOLContainer_{streamName}"
        writingTool.PoolContainerPrefix = f"CollectionTree_{streamName}"
        writingTool.MetaDataOutputCollection = f"MetaDataHdr_{streamName}"
        writingTool.MetaDataPoolContainerPrefix = f"MetaData_{streamName}"
        log.info("Stream %s running in augmentation mode with %s as parent", streamName, flags._get(parentStreamFlag))

    # In DAOD production the EventInfo is prepared specially by the SlimmingHelper
    # to ensure it is written in AuxDyn form, so for derivations the ItemList is
    # used as given, without the extra EventInfo items
    isDerivation = "DAOD_" in streamName or "D2AOD_" in streamName
    if isDerivation:
        finalItemList = ItemList
    else:
        eventInfoItems = [f"xAOD::EventInfo#{eventInfoKey}", f"xAOD::EventAuxInfo#{eventInfoKey}Aux."]
        finalItemList = eventInfoItems + ItemList

    outputStream = CompFactory.AthenaOutputStream(
        streamAlgName,
        StreamName=streamAlgName,
        WritingTool=writingTool,
        ItemList=finalItemList,
        MetadataItemList=MetadataItemList,
        OutputFile=fileName,
        HelperTools=HelperTools,
    )

    # The next three properties are only touched when they differ from the
    # C++ defaults; leaving them unset otherwise avoids CA merge conflicts.
    if takeItemsFromInput:
        # C++ default is False
        outputStream.TakeItemsFromInput = True
    if not extendProvenanceRecord:
        # C++ default is True
        outputStream.ExtendProvenanceRecord = False
    if keepProvenanceTagsRegEx is not None:
        # C++ default is '.*' (keep all); overwrite only on request
        outputStream.KeepProvenanceTagsRegEx = keepProvenanceTagsRegEx

    outputStream.AcceptAlgs += AcceptAlgs
    outputStream.ExtraOutputs.add(("DataHeader", f"StoreGateSvc+{streamAlgName}"))
    if flags.Scheduler.CheckOutputUsage and flags.Concurrency.NumThreads > 0:
        # Declare every non-wildcard, non-Aux item as an explicit input
        outputStream.ExtraInputs = {
            tuple(item.split('#'))
            for item in finalItemList
            if '*' not in item and 'Aux' not in item
        }
        # Ignore dependencies
        from AthenaConfiguration.MainServicesConfig import OutputUsageIgnoreCfg
        result.merge(OutputUsageIgnoreCfg(flags, outputStream.name))

    result.addService(CompFactory.StoreGateSvc("MetaDataStore"))
    outputStream.MetadataStore = result.getService("MetaDataStore")

    # Support for MT thinning
    thinningCacheTool = CompFactory.Athena.ThinningCacheTool(
        f"ThinningCacheTool_Stream{streamName}",
        StreamName=streamAlgName,
    )
    if trigNavThinningSvc is not None:
        thinningCacheTool.TrigNavigationThinningSvc = trigNavThinningSvc
    outputStream.HelperTools.append(thinningCacheTool)

    # Event tag
    if not disableEventTag:
        attrListKey = "SimpleTag"
        outputStream.WritingTool.AttributeListKey = attrListKey

        # Propagate the input attribute list only if the input provides one
        propagateInputAttributeList = "AthenaAttributeList#Input" in flags.Input.TypedCollections
        if propagateInputAttributeList:
            from SGComps.SGInputLoaderConfig import SGInputLoaderCfg
            result.merge(SGInputLoaderCfg(flags, ["AthenaAttributeList#Input"]))

        # Build the EventInfo attribute list
        tagBuilder = CompFactory.EventInfoTagBuilder(
            AttributeList=attrListKey,
            Tool=CompFactory.EventInfoAttListTool(),
            EventInfoKey=eventInfoKey,
            PropagateInput=propagateInputAttributeList,
        )
        result.addEventAlgo(tagBuilder)

    # For xAOD output
    if "AOD" in streamName:
        outputStream.WritingTool.SubLevelBranchName = "<key>"

    result.addEventAlgo(outputStream, domain='IO')
    return result



◆ outputStreamName()

python.OutputStreamConfig.outputStreamName ( streamName)

Definition at line 9 of file OutputStreamConfig.py.

def outputStreamName(streamName):
    """Return the output stream name for streamName: "Stream" followed by streamName."""
    return "Stream{}".format(streamName)

