3 from AthenaConfiguration.ComponentAccumulator
import ComponentAccumulator, ConfigurationError
4 from AthenaConfiguration.ComponentFactory
import CompFactory
5 from AthenaConfiguration.Enums
import ProductionStep
6 from AthenaCommon.Logging
import logging
10 return f
"Stream{streamName}"
17 disableEventTag=False, trigNavThinningSvc=None, takeItemsFromInput=False,
18 extendProvenanceRecord=True, keepProvenanceTagsRegEx=None, AcceptAlgs=[], HelperTools=[]):
19 eventInfoKey =
"EventInfo"
20 if flags.Common.ProductionStep
in [ProductionStep.PileUpPresampling, ProductionStep.PileUpPretracking, ProductionStep.MinbiasPreprocessing]:
21 eventInfoKey = f
"{flags.Overlay.BkgPrefix}EventInfo"
23 msg = logging.getLogger(
"OutputStreamCfg")
24 flagName = f
"Output.{streamName}FileName"
25 if flags.hasFlag(flagName):
26 fileName = flags._get(flagName)
28 fileName = f
"my{streamName}.pool.root"
29 msg.info(
"No file name predefined for stream %s. Using %s", streamName, fileName)
31 if fileName
in flags.Input.Files:
32 raise ConfigurationError(
"Same name for input and output file %s" % fileName)
34 result =
ComponentAccumulator(sequence = CompFactory.AthSequencer(
"AthOutSeq", StopOverride=
True))
37 from AthenaPoolCnvSvc.PoolWriteConfig
import PoolWriteCfg
41 writingTool = CompFactory.AthenaOutputStreamTool(
42 f
"{outputStreamName(streamName)}Tool",
44 MetaDataPoolContainerPrefix=f
"{flags.Output.StorageTechnology.MetaData}:MetaData",
45 MetaDataOutputCollection=f
"{flags.Output.StorageTechnology.MetaData}:MetaDataHdr",
49 parentStream = f
"Output.{streamName}ParentStream"
50 childStream = f
"Output.{streamName}ChildStream"
51 if flags.hasFlag(childStream):
52 writingTool.SaveDecisions =
True
53 elif flags.hasFlag(parentStream):
54 disableEventTag =
True
55 writingTool.OutputCollection = f
"POOLContainer_{streamName}"
56 writingTool.PoolContainerPrefix = f
"CollectionTree_{streamName}"
57 writingTool.MetaDataOutputCollection = f
"MetaDataHdr_{streamName}"
58 writingTool.MetaDataPoolContainerPrefix = f
"MetaData_{streamName}"
59 msg.info(
"Stream %s running in augmentation mode with %s as parent", streamName, flags._get(parentStream))
64 if any(name
in streamName
for name
in {
"DAOD_",
"D2AOD_"}):
65 finalItemList = ItemList
67 finalItemList = [f
"xAOD::EventInfo#{eventInfoKey}", f
"xAOD::EventAuxInfo#{eventInfoKey}Aux."] + ItemList
69 outputStream = CompFactory.AthenaOutputStream(
72 WritingTool=writingTool,
73 ItemList=finalItemList,
74 MetadataItemList=MetadataItemList,
76 HelperTools=HelperTools,
78 if takeItemsFromInput:
82 outputStream.TakeItemsFromInput =
True
83 if not extendProvenanceRecord:
86 outputStream.ExtendProvenanceRecord =
False
87 if keepProvenanceTagsRegEx
is not None:
89 outputStream.KeepProvenanceTagsRegEx = keepProvenanceTagsRegEx
90 outputStream.AcceptAlgs += AcceptAlgs
91 outputStream.ExtraOutputs.add((
"DataHeader", f
"StoreGateSvc+{outputStreamName(streamName)}"))
92 if flags.Scheduler.CheckOutputUsage
and flags.Concurrency.NumThreads > 0:
93 outputStream.ExtraInputs = {tuple(l.split(
'#'))
for l
in finalItemList
if '*' not in l
and 'Aux' not in l}
95 from AthenaConfiguration.MainServicesConfig
import OutputUsageIgnoreCfg
98 result.addService(CompFactory.StoreGateSvc(
"MetaDataStore"))
99 outputStream.MetadataStore = result.getService(
"MetaDataStore")
102 thinningCacheTool = CompFactory.Athena.ThinningCacheTool(f
"ThinningCacheTool_Stream{streamName}",
104 if trigNavThinningSvc
is not None:
105 thinningCacheTool.TrigNavigationThinningSvc = trigNavThinningSvc
106 outputStream.HelperTools.append(thinningCacheTool)
109 if not disableEventTag:
111 outputStream.WritingTool.AttributeListKey=key
113 propagateInputAttributeList =
False
114 if "AthenaAttributeList#Input" in flags.Input.TypedCollections:
115 from SGComps.SGInputLoaderConfig
import SGInputLoaderCfg
117 propagateInputAttributeList =
True
120 tagBuilder = CompFactory.EventInfoTagBuilder(AttributeList=key,
121 Tool=CompFactory.EventInfoAttListTool(),
122 EventInfoKey=eventInfoKey,
123 PropagateInput=propagateInputAttributeList)
124 result.addEventAlgo(tagBuilder)
127 if "AOD" in streamName:
128 outputStream.WritingTool.SubLevelBranchName =
"<key>"
130 result.addEventAlgo(outputStream, domain=
'IO')
136 Adds items to ESD stream
138 The argument can be either a list of items or just one item.
139 If further customisations are needed for the output stream, they can be passed via kwargs.
141 Returns a CA to be merged, e.g.: result.merge(addToESD(flags, "xAOD::CoolObject"))
143 if not flags.Output.doWriteESD:
145 items = [itemOrList]
if isinstance(itemOrList, str)
else itemOrList
151 Adds items to AOD stream
155 if not flags.Output.doWriteAOD:
157 items = [itemOrList]
if isinstance(itemOrList, str)
else itemOrList
160 def addToMetaData(flags, streamName, itemOrList, AcceptAlgs=[], HelperTools=[], **kwargs):
162 Adds Metadata items to the stream named streamName
164 Similar to addToESD/AOD, itemOrList can be either a list of items or just one item
165 The additional arguments, AcceptAlgs and HelperTools, are passed to the underlying stream
166 The former is needed when there are special kernels, e.g., simulation/derivation
167 The latter is needed primarily for the propagation of the FileMetaData tool
169 Returns CA to be merged
171 flagName = f
"Output.doWrite{streamName}"
172 if not flags.hasFlag(flagName):
174 items = [itemOrList]
if isinstance(itemOrList, str)
else itemOrList
176 AcceptAlgs=AcceptAlgs, HelperTools=HelperTools, **kwargs)