3 from AthenaConfiguration.ComponentAccumulator
import ComponentAccumulator, ConfigurationError
4 from AthenaConfiguration.ComponentFactory
import CompFactory
5 from AthenaConfiguration.Enums
import ProductionStep
6 from AthenaCommon.Logging
import logging
10 return f
"Stream{streamName}"
17 disableEventTag=False, trigNavThinningSvc=None, takeItemsFromInput=False,
18 extendProvenanceRecord=True, keepProvenanceTagsRegEx=None, AcceptAlgs=[], HelperTools=[]):
19 eventInfoKey =
"EventInfo"
20 if flags.Common.ProductionStep
in [ProductionStep.PileUpPresampling, ProductionStep.PileUpPretracking, ProductionStep.MinbiasPreprocessing]:
21 eventInfoKey = f
"{flags.Overlay.BkgPrefix}EventInfo"
23 msg = logging.getLogger(
"OutputStreamCfg")
24 flagName = f
"Output.{streamName}FileName"
25 if flags.hasFlag(flagName):
26 fileName = flags._get(flagName)
28 fileName = f
"my{streamName}.pool.root"
29 msg.info(
"No file name predefined for stream %s. Using %s", streamName, fileName)
31 if fileName
in flags.Input.Files:
32 raise ConfigurationError(
"Same name for input and output file %s" % fileName)
34 result =
ComponentAccumulator(sequence = CompFactory.AthSequencer(
"AthOutSeq", StopOverride=
True))
37 from AthenaPoolCnvSvc.PoolWriteConfig
import PoolWriteCfg
41 writingTool = CompFactory.AthenaOutputStreamTool(
42 f
"{outputStreamName(streamName)}Tool",
44 MetaDataPoolContainerPrefix=f
"{flags.Output.StorageTechnology.MetaData}:MetaData",
45 MetaDataOutputCollection=f
"{flags.Output.StorageTechnology.MetaData}:MetaDataHdr",
46 ConversionService=
"AthenaPoolSharedIOCnvSvc" if flags.MP.UseSharedReader
or flags.MP.UseSharedWriter
else "AthenaPoolCnvSvc",
50 parentStream = f
"Output.{streamName}ParentStream"
51 childStream = f
"Output.{streamName}ChildStream"
52 if flags.hasFlag(childStream):
53 writingTool.SaveDecisions =
True
54 elif flags.hasFlag(parentStream):
55 disableEventTag =
True
56 writingTool.OutputCollection = f
"POOLContainer_{streamName}"
57 writingTool.PoolContainerPrefix = f
"CollectionTree_{streamName}"
58 writingTool.MetaDataOutputCollection = f
"MetaDataHdr_{streamName}"
59 writingTool.MetaDataPoolContainerPrefix = f
"MetaData_{streamName}"
60 msg.info(
"Stream %s running in augmentation mode with %s as parent", streamName, flags._get(parentStream))
65 if any(name
in streamName
for name
in {
"DAOD_",
"D2AOD_"}):
66 finalItemList = ItemList
68 finalItemList = [f
"xAOD::EventInfo#{eventInfoKey}", f
"xAOD::EventAuxInfo#{eventInfoKey}Aux."] + ItemList
70 outputStream = CompFactory.AthenaOutputStream(
73 WritingTool=writingTool,
74 ItemList=finalItemList,
75 MetadataItemList=MetadataItemList,
77 HelperTools=HelperTools,
79 if takeItemsFromInput:
83 outputStream.TakeItemsFromInput =
True
84 if not extendProvenanceRecord:
87 outputStream.ExtendProvenanceRecord =
False
88 if keepProvenanceTagsRegEx
is not None:
90 outputStream.KeepProvenanceTagsRegEx = keepProvenanceTagsRegEx
91 outputStream.AcceptAlgs += AcceptAlgs
92 outputStream.ExtraOutputs.add((
"DataHeader", f
"StoreGateSvc+{outputStreamName(streamName)}"))
93 if flags.Scheduler.CheckOutputUsage
and flags.Concurrency.NumThreads > 0:
94 outputStream.ExtraInputs = {tuple(l.split(
'#'))
for l
in finalItemList
if '*' not in l
and 'Aux' not in l}
96 from AthenaConfiguration.MainServicesConfig
import OutputUsageIgnoreCfg
99 result.addService(CompFactory.StoreGateSvc(
"MetaDataStore"))
100 outputStream.MetadataStore = result.getService(
"MetaDataStore")
103 thinningCacheTool = CompFactory.Athena.ThinningCacheTool(f
"ThinningCacheTool_Stream{streamName}",
105 if trigNavThinningSvc
is not None:
106 thinningCacheTool.TrigNavigationThinningSvc = trigNavThinningSvc
107 outputStream.HelperTools.append(thinningCacheTool)
110 if not disableEventTag:
112 outputStream.WritingTool.AttributeListKey=key
114 propagateInputAttributeList =
False
115 if "AthenaAttributeList#Input" in flags.Input.TypedCollections:
116 from SGComps.SGInputLoaderConfig
import SGInputLoaderCfg
118 propagateInputAttributeList =
True
121 tagBuilder = CompFactory.EventInfoTagBuilder(AttributeList=key,
122 Tool=CompFactory.EventInfoAttListTool(),
123 EventInfoKey=eventInfoKey,
124 PropagateInput=propagateInputAttributeList)
125 result.addEventAlgo(tagBuilder)
128 if "AOD" in streamName:
129 outputStream.WritingTool.SubLevelBranchName =
"<key>"
131 result.addEventAlgo(outputStream, domain=
'IO')
137 Adds items to ESD stream
139 The argument can be either list of items or just one item
140 if further customisations are needed for output stream they can be passed via kwargs
142 returns CA to be merged i.e.: result.merge(addToESD(flags, "xAOD::CoolObject"))
144 if not flags.Output.doWriteESD:
146 items = [itemOrList]
if isinstance(itemOrList, str)
else itemOrList
152 Adds items to AOD stream
156 if not flags.Output.doWriteAOD:
158 items = [itemOrList]
if isinstance(itemOrList, str)
else itemOrList
161 def addToMetaData(flags, streamName, itemOrList, AcceptAlgs=[], HelperTools=[], **kwargs):
163 Adds Metadata items to the stream named streamName
165 Similar to addToESD/AOD, itemOrList can be either a list of items or just one time
166 The additional arguments, AcceptAlgs and HelperTools, are passed to the underlying stream
167 The former is needed when there are special kernels, e.g., simulation/derivation
168 The latter is needed primarily for the propagation of the FileMetaData tool
170 Returns CA to be merged
172 flagName = f
"Output.doWrite{streamName}"
173 if not flags.hasFlag(flagName):
175 items = [itemOrList]
if isinstance(itemOrList, str)
else itemOrList
177 AcceptAlgs=AcceptAlgs, HelperTools=HelperTools, **kwargs)