3 from AthenaConfiguration.ComponentAccumulator
import ComponentAccumulator, ConfigurationError
4 from AthenaConfiguration.ComponentFactory
import CompFactory
5 from AthenaConfiguration.Enums
import ProductionStep
6 from AthenaCommon.Logging
import logging
10 return f
"Stream{streamName}"
13 disableEventTag=False, trigNavThinningSvc=None, takeItemsFromInput=False,
14 extendProvenanceRecord=True, AcceptAlgs=[], HelperTools=[]):
15 eventInfoKey =
"EventInfo"
16 if flags.Common.ProductionStep
in [ProductionStep.PileUpPresampling, ProductionStep.PileUpPretracking]:
17 eventInfoKey = f
"{flags.Overlay.BkgPrefix}EventInfo"
19 msg = logging.getLogger(
"OutputStreamCfg")
20 flagName = f
"Output.{streamName}FileName"
21 if flags.hasFlag(flagName):
22 fileName = flags._get(flagName)
24 fileName = f
"my{streamName}.pool.root"
25 msg.info(
"No file name predefined for stream %s. Using %s", streamName, fileName)
27 if fileName
in flags.Input.Files:
28 raise ConfigurationError(
"Same name for input and output file %s" % fileName)
30 result =
ComponentAccumulator(sequence = CompFactory.AthSequencer(
"AthOutSeq", StopOverride=
True))
33 from AthenaPoolCnvSvc.PoolWriteConfig
import PoolWriteCfg
37 writingTool = CompFactory.AthenaOutputStreamTool(
38 f
"{outputStreamName(streamName)}Tool",
40 MetaDataPoolContainerPrefix=f
"{flags.Output.StorageTechnology.MetaData}:MetaData",
41 MetaDataOutputCollection=f
"{flags.Output.StorageTechnology.MetaData}:MetaDataHdr",
45 parentStream = f
"Output.{streamName}ParentStream"
46 childStream = f
"Output.{streamName}ChildStream"
47 if flags.hasFlag(childStream):
48 writingTool.SaveDecisions =
True
49 elif flags.hasFlag(parentStream):
50 disableEventTag =
True
51 writingTool.OutputCollection = f
"POOLContainer_{streamName}"
52 writingTool.PoolContainerPrefix = f
"CollectionTree_{streamName}"
53 writingTool.MetaDataOutputCollection = f
"MetaDataHdr_{streamName}"
54 writingTool.MetaDataPoolContainerPrefix = f
"MetaData_{streamName}"
55 msg.info(
"Stream %s running in augmentation mode with %s as parent", streamName, flags._get(parentStream))
60 if any(name
in streamName
for name
in {
"DAOD_",
"D2AOD_"}):
61 finalItemList = ItemList
63 finalItemList = [f
"xAOD::EventInfo#{eventInfoKey}", f
"xAOD::EventAuxInfo#{eventInfoKey}Aux."] + ItemList
65 outputStream = CompFactory.AthenaOutputStream(
68 WritingTool=writingTool,
69 ItemList=finalItemList,
70 MetadataItemList=MetadataItemList,
72 HelperTools=HelperTools,
74 if takeItemsFromInput:
78 outputStream.TakeItemsFromInput =
True
79 if not extendProvenanceRecord:
82 outputStream.ExtendProvenanceRecord =
False
83 outputStream.AcceptAlgs += AcceptAlgs
84 outputStream.ExtraOutputs.add((
"DataHeader", f
"StoreGateSvc+{outputStreamName(streamName)}"))
85 if flags.Scheduler.CheckOutputUsage
and flags.Concurrency.NumThreads > 0:
86 outputStream.ExtraInputs = {tuple(l.split(
'#'))
for l
in finalItemList
if '*' not in l
and 'Aux' not in l}
88 from AthenaConfiguration.MainServicesConfig
import OutputUsageIgnoreCfg
91 result.addService(CompFactory.StoreGateSvc(
"MetaDataStore"))
92 outputStream.MetadataStore = result.getService(
"MetaDataStore")
95 thinningCacheTool = CompFactory.Athena.ThinningCacheTool(f
"ThinningCacheTool_Stream{streamName}",
97 if trigNavThinningSvc
is not None:
98 thinningCacheTool.TrigNavigationThinningSvc = trigNavThinningSvc
99 outputStream.HelperTools.append(thinningCacheTool)
102 if not disableEventTag:
104 outputStream.WritingTool.AttributeListKey=key
106 propagateInputAttributeList =
False
107 if "AthenaAttributeList#Input" in flags.Input.TypedCollections:
108 from SGComps.SGInputLoaderConfig
import SGInputLoaderCfg
110 propagateInputAttributeList =
True
113 tagBuilder = CompFactory.EventInfoTagBuilder(AttributeList=key,
114 Tool=CompFactory.EventInfoAttListTool(),
115 EventInfoKey=eventInfoKey,
116 PropagateInput=propagateInputAttributeList)
117 result.addEventAlgo(tagBuilder)
120 if "AOD" in streamName:
121 outputStream.WritingTool.SubLevelBranchName =
"<key>"
123 result.addEventAlgo(outputStream, domain=
'IO')
129 Adds items to ESD stream
131 The argument can be either list of items or just one item
132 if further customisations are needed for output stream they can be passed via kwargs
134 returns CA to be merged i.e.: result.merge(addToESD(flags, "xAOD::CoolObject"))
136 if not flags.Output.doWriteESD:
138 items = [itemOrList]
if isinstance(itemOrList, str)
else itemOrList
144 Adds items to AOD stream
148 if not flags.Output.doWriteAOD:
150 items = [itemOrList]
if isinstance(itemOrList, str)
else itemOrList
def addToMetaData(flags, streamName, itemOrList, AcceptAlgs=[], HelperTools=[], **kwargs):
    """
    Adds Metadata items to the stream named streamName

    Similar to addToESD/AOD, itemOrList can be either a list of items or just one item
    The additional arguments, AcceptAlgs and HelperTools, are passed to the underlying stream
    The former is needed when there are special kernels, e.g., simulation/derivation
    The latter is needed primarily for the propagation of the FileMetaData tool

    Returns CA to be merged
    """
    # The list defaults are part of the public signature and are only
    # forwarded here, never mutated in this function, so they are kept as-is.
    flagName = f"Output.doWrite{streamName}"
    if not flags.hasFlag(flagName):
        # NOTE(review): this statement was missing from the listing; an empty
        # accumulator is inferred from the "Returns CA to be merged" contract
        # -- confirm against the repository version.
        return ComponentAccumulator()

    # Accept a single item given as a bare string as well as a list of items.
    items = [itemOrList] if isinstance(itemOrList, str) else itemOrList

    # NOTE(review): the first line of this call was missing from the listing;
    # the callee and the MetadataItemList keyword are inferred from the
    # docstring and the trailing keyword arguments -- confirm against the
    # repository version.
    return OutputStreamCfg(flags, streamName, MetadataItemList=items,
                           AcceptAlgs=AcceptAlgs, HelperTools=HelperTools, **kwargs)