"""Configuration for POOL file writing

Copyright (C) 2002-2023 CERN for the benefit of the ATLAS collaboration
"""
from AthenaConfiguration.AccumulatorCache import AccumulatorCache
def _overrideTreeAutoFlush(logger, flags, stream, value):
    """Helper function to override TreeAutoFlush from flags.

    Args:
        logger: logger used to report an applied override.
        flags: configuration flags; ``flags.Output.TreeAutoFlush`` is expected
            to be a ``dict`` mapping stream name -> flush value (or empty/None).
        stream: name of the output stream being configured.
        value: the default TreeAutoFlush value for this stream.

    Returns:
        The override from ``flags.Output.TreeAutoFlush[stream]`` when one is
        set and not None, otherwise the unchanged ``value``.
    """
    # No override container configured (empty, None, or not a dict): keep default.
    if not flags.Output.TreeAutoFlush or not isinstance(flags.Output.TreeAutoFlush, dict):
        return value

    # No override requested for this particular stream: keep default.
    if stream not in flags.Output.TreeAutoFlush:
        return value

    override = flags.Output.TreeAutoFlush[stream]
    if override is not None:
        logger.info('Overriding TreeAutoFlush value for stream "%s" from %d to %d',
                    stream, value, override)
        # NOTE(review): returns inferred from surrounding fragment — confirm
        # against the original file.
        return override

    return value
def _getStreamsFromFlags(flags):
    """
    Helper to get all the streams from configuration flags
    For each stream that's configured to be written out
    we have two flags w/ the following convention:
        + Output.{STREAM}FileName
        + Output.doWrite{STREAM}
    """
    result = []
    for key, value in flags._flagdict.items():
        # A stream is "configured" when its FileName flag exists and is non-empty.
        if key.startswith("Output.") and key.endswith("FileName") and value.get():
            stream = key.removeprefix("Output.").removesuffix("FileName")
            # HIST streams are handled by the monitoring configuration, not here.
            if stream not in ["HIST"]:
                # NOTE(review): accumulation/return inferred from the visible
                # fragment (the append/return lines fall in a numbering gap).
                result.append(stream)
    return result
@AccumulatorCache
def PoolWriteCfg(flags):
    """Return ComponentAccumulator configured to Write POOL files"""

    from AthenaCommon.Logging import logging
    logger = logging.getLogger('PoolWriteCfg')

    PoolAttributes = []
    # Switch off splitting by setting default SplitLevel to 0
    PoolAttributes += ["DEFAULT_SPLITLEVEL ='0'"]

    # Set member-wise streaming as the default (ROOT default)
    PoolAttributes += ["STREAM_MEMBER_WISE = '1'"]

    # Increase default BasketSize to 32K (ROOT default)
    PoolAttributes += ["DEFAULT_BUFFERSIZE = '32000'"]

    # Set POOLContainerForm(DataHeaderForm) split level to 0
    PoolAttributes += ["ContainerName = 'TTree=POOLContainerForm(DataHeaderForm)'; CONTAINER_SPLITLEVEL = '0'"]
    PoolAttributes += ["TREE_BRANCH_OFFSETTAB_LEN ='100'"]

    from AthenaPoolCnvSvc import PoolAttributeHelper as pah

    # Per-stream ROOT settings:
    # Stream : [compression algorithm, compression level, auto flush,
    #           split level, dynamic split level]
    # NOTE(review): dict opener and any entries before "EVNT_TR" fall in a
    # numbering gap of the fragment — confirm against the original file.
    defaults = {
        "EVNT_TR"        : [2, 1, 1, 0, 0],
        "HITS"           : [2, 1, 10, 0, 0],
        "RDO"            : [2, 1, 10, 0, 0],
        "ESD"            : [2, 1, 10, 0, 0],
        "AOD"            : [2, 1, 100, 0, 0],
        "DAOD_PHYSVAL"   : [5, 5, 100, 0, 1],
        "DAOD_PHYS"      : [5, 5, 500, 0, 1],
        "DAOD_PHYSLITE"  : [5, 5, 500, 1, 1],
        "DAOD_TRUTH3"    : [5, 5, 500, 1, 1],
        "D2AOD_PHYSLITE" : [5, 5, 500, 1, 1],
    }

    # Metadata containers needed for augmentation streams
    OutputMetadataContainers = []

    # Loop over all configured output streams and set the attributes
    maxAutoFlush = -1
    for stream in _getStreamsFromFlags(flags):

        # Get the file name — guaranteed to exist at this point
        fileName = getattr(flags.Output, f"{stream}FileName")

        # Defaults when the stream is not in the table above
        compAlg, compLvl, autoFlush, splitLvl, dynSplitLvl = 2, 1, 10, 0, 0
        if stream in defaults:
            compAlg, compLvl, autoFlush, splitLvl, dynSplitLvl = defaults[stream]
        elif "DAOD" in stream:
            compAlg, compLvl, autoFlush, splitLvl, dynSplitLvl = 5, 5, 100, 0, 1
        elif "D2AOD" in stream:
            compAlg, compLvl, autoFlush, splitLvl, dynSplitLvl = 5, 5, 500, 1, 1

        # Temporary/intermediate outputs always use ZLIB (fast) compression
        compAlg = 1 if fileName.endswith('_000') or fileName.startswith('tmp.') else compAlg

        # Apply a user-requested TreeAutoFlush override, if any
        # NOTE(review): this call is inferred — the line falls in a numbering
        # gap, but the helper and its arguments match this usage.
        autoFlush = _overrideTreeAutoFlush(logger, flags, stream, autoFlush)

        logger.debug(f"{fileName=} {stream=} {compAlg=} {compLvl=} {autoFlush=} {splitLvl=} {dynSplitLvl=}")

        outputCollection = "POOLContainer"
        poolContainerPrefix = "CollectionTree"

        # Augmentation streams share the parent stream's file: do not set
        # file-level attributes for them, use suffixed container names instead.
        isAugmentation = flags.hasFlag(f"Output.{stream}ParentStream")
        if not isAugmentation:
            PoolAttributes += [pah.setFileCompAlg(fileName, compAlg)]
            PoolAttributes += [pah.setFileCompLvl(fileName, compLvl)]
            PoolAttributes += [pah.setMaxBufferSize(fileName, "131072")]
            PoolAttributes += [pah.setMinBufferEntries(fileName, "10")]
            # Index the master collection of this file
            PoolAttributes += [f"DatabaseName = '{fileName}'; INDEX_MASTER = 'POOLContainer(DataHeader)'"]
        else:
            # NOTE(review): the `else:` header falls in a numbering gap; the
            # suffixing of the container names below is the visible body.
            outputCollection += f"_{stream}"
            poolContainerPrefix += f"_{stream}"
            OutputMetadataContainers += [f"MetaData_{stream}"]

        # AutoFlush for the event-data and DataHeader containers
        PoolAttributes += [pah.setTreeAutoFlush(fileName, poolContainerPrefix, autoFlush)]
        PoolAttributes += [pah.setTreeAutoFlush(fileName, outputCollection, autoFlush)]
        PoolAttributes += [pah.setTreeAutoFlush(fileName, "POOLContainerForm", autoFlush)]

        # Split levels for the main tree and the (dynamic) Aux stores
        PoolAttributes += [pah.setContainerSplitLevel(fileName, poolContainerPrefix, splitLvl)]
        PoolAttributes += [pah.setContainerSplitLevel(fileName, "Aux.", splitLvl)]
        PoolAttributes += [pah.setContainerSplitLevel(fileName, "Dyn.", dynSplitLvl)]

        # Track the largest AutoFlush across all outputs for the check below
        maxAutoFlush = max(maxAutoFlush, autoFlush)

    # Parallel compression only makes sense with the SharedWriter and enough
    # events per worker to fill at least one compression buffer each.
    useParallelCompression = flags.MP.UseSharedWriter and flags.MP.UseParallelCompression
    if useParallelCompression:
        requestedEvents = flags.Exec.MaxEvents
        availableEvents = flags.Input.FileNentries - flags.Exec.SkipEvents
        totalEntries = availableEvents if requestedEvents == -1 else min(availableEvents, requestedEvents)
        if (totalEntries > 0) and (maxAutoFlush > 0) and (maxAutoFlush * flags.Concurrency.NumProcs > totalEntries):
            logger.info("Not enough events to process, disabling parallel compression for SharedWriter!")
            logger.info(f"Processing {totalEntries} events in {flags.Concurrency.NumProcs} workers "
                        f"and a maximum (across all outputs) AutoFlush of {maxAutoFlush}")
            useParallelCompression = False

    from AthenaPoolCnvSvc.PoolCommonConfig import AthenaPoolCnvSvcCfg
    # NOTE(review): the `return AthenaPoolCnvSvcCfg(flags, ...)` call line
    # falls in a numbering gap; the keyword arguments below are the visible
    # fragment — confirm against the original file.
    return AthenaPoolCnvSvcCfg(flags,
                               PoolAttributes=PoolAttributes,
                               ParallelCompression=useParallelCompression,
                               StorageTechnology=flags.Output.StorageTechnology.EventData,
                               OutputMetadataContainers=OutputMetadataContainers)