# NOTE(review): this chunk is a whitespace-mangled extraction of the body of a
# PoolWriteCfg(flags)-style configuration function.  The enclosing `def` line
# is outside this view, statements are hard-wrapped mid-expression, the stray
# leading integers are the original file's own line numbers leaked into the
# paste, and several interior source lines are elided (e.g. the
# `PoolAttributes = []` initialisation, the `defaults = {` dict opener, the
# per-stream `for ...:` loop header, and the `return ...CnvSvcCfg(flags,`
# call openers).  Code bytes below are untouched; only comments are added at
# statement boundaries.
42 """Return ComponentAccumulator configured to Write POOL files"""
# Function-scoped import of the Athena logging wrapper; named logger for
# this configuration function.
45 from AthenaCommon.Logging
import logging
46 logger = logging.getLogger(
'PoolWriteCfg' )
# Global POOL/ROOT attributes applied to every output file.
# Split level 0: no branch splitting by default.
50 PoolAttributes += [
"DEFAULT_SPLITLEVEL ='0'"]
# Member-wise streaming (the ROOT default).
53 PoolAttributes += [
"STREAM_MEMBER_WISE = '1'"]
# Default basket buffer size of 32000 bytes.
56 PoolAttributes += [
"DEFAULT_BUFFERSIZE = '32000'"]
# Keep the POOLContainerForm(DataHeaderForm) container unsplit.
59 PoolAttributes += [
"ContainerName = 'TTree=POOLContainerForm(DataHeaderForm)'; CONTAINER_SPLITLEVEL = '0'"]
60 PoolAttributes += [
"TREE_BRANCH_OFFSETTAB_LEN ='100'"]
# Whether a single, shared DataHeaderForm is written; forwarded to the
# CnvSvc configuration at the bottom of this function.
62 oneDHForm = flags.Output.OneDataHeaderForm
# Helper used below to format the per-file attribute strings.
65 from AthenaPoolCnvSvc
import PoolAttributeHelper
as pah
# Entries of the per-format `defaults` dict (its opener is elided from this
# view).  Each value is unpacked below (original line 97) as:
#   [compression algorithm, compression level, AutoFlush,
#    split level, dynamic-branch split level]
70 "EVNT" : [2, 1, 500, 0, 0],
71 "EVNT_TR" : [2, 1, 1, 0, 0],
72 "HITS" : [2, 1, 10, 0, 0],
73 "RDO" : [2, 1, 10, 0, 0],
74 "ESD" : [2, 1, 10, 0, 0],
75 "AOD" : [2, 1, 100, 0, 0],
76 "DAOD_PHYSVAL" : [5, 5, 100, 0, 1],
77 "DAOD_PHYS" : [5, 5, 500, 0, 1],
78 "DAOD_PHYSLITE" : [5, 5, 500, 1, 1],
79 "DAOD_TRUTH3" : [5, 5, 500, 1, 1],
80 "D2AOD_PHYSLITE" : [5, 5, 500, 1, 1],
# Names of extra metadata containers for augmentation streams
# (filled at original line 152 below).
84 OutputMetadataContainers = []
# Map of file name -> storage technology; when the flag is empty/falsy,
# fall back to a wildcard entry using the PoolSvc default container type.
88 storageTechnologyMap = flags.Output.StorageTechnology.EventData
or {
'*': flags.PoolSvc.DefaultContainerType}
# --- per-stream loop body starts here (the `for stream in ...:` header is
# elided from this view) ---
# Output file name for this stream.
92 fileName = getattr(flags.Output, f
"{stream}FileName")
# Choose the ROOT settings: hard-coded fallback first, then the defaults
# table, then pattern-based defaults for DAOD / D2AOD derivations.
95 compAlg, compLvl, autoFlush, splitLvl, dynSplitLvl = 2, 1, 10, 0, 0
96 if stream
in defaults:
97 compAlg, compLvl, autoFlush, splitLvl, dynSplitLvl = defaults[stream]
98 elif "DAOD" in stream:
99 compAlg, compLvl, autoFlush, splitLvl, dynSplitLvl = 5, 5, 100, 0, 1
100 elif "D2AOD" in stream:
101 compAlg, compLvl, autoFlush, splitLvl, dynSplitLvl = 5, 5, 500, 1, 1
# Temporary outputs (detected by name convention or the explicit
# TemporaryStreams flag) get their compression overridden.  The pairs are
# (algorithm, level); presumably ROOT compression algorithm codes
# (1 = ZLIB, 2 = LZMA, 5 = ZSTD) -- confirm against ROOT's compression enum.
110 isTemporaryStream = fileName.endswith(
'_000')
or fileName.startswith(
'tmp.')
or stream
in flags.Output.TemporaryStreams
111 tempFileCompressionSetting = (5,1)
112 if isTemporaryStream:
# Temporary RDOs from pre-Run-3 geometry use algorithm 1 instead.
# NOTE(review): the rationale is not visible in this chunk -- presumably
# compatibility of downstream readers; verify against project docs.
116 from AthenaConfiguration.Enums
import LHCPeriod
117 if "RDO" in stream
and hasattr(flags,
"GeoModel")
and flags.GeoModel.Run < LHCPeriod.Run3:
118 tempFileCompressionSetting = (1,1)
119 logger.info(f
"Stream {stream} is marked as temporary, overwriting the compression settings to {tempFileCompressionSetting}")
# Apply the temporary-file override (no-op for regular streams).
120 compAlg, compLvl = tempFileCompressionSetting
if isTemporaryStream
else (compAlg, compLvl)
# Debug dump of the finally-chosen per-stream settings.
126 logger.debug(f
"{fileName=} {stream=} {compAlg=} {compLvl=} {autoFlush=} {splitLvl=} {dynSplitLvl=}")
# Base container names; augmentation streams get a per-stream suffix below.
129 outputCollection =
"POOLContainer"
130 poolContainerPrefix =
"CollectionTree"
# A stream with an `Output.<stream>ParentStream` flag is an augmentation;
# file-level settings are only applied for regular (non-augmentation)
# streams.
134 isAugmentation = flags.hasFlag(f
"Output.{stream}ParentStream")
135 if not isAugmentation:
# File-level compression algorithm and level.
137 PoolAttributes += [ pah.setFileCompAlg( fileName, compAlg ) ]
138 PoolAttributes += [ pah.setFileCompLvl( fileName, compLvl ) ]
# Basket buffer limits: max 131072 (128k) bytes, min 10 entries.
# NOTE(review): elided lines around original 140-141 may condition these
# (e.g. on the stream type) -- confirm against the full file.
142 PoolAttributes += [ pah.setMaxBufferSize( fileName,
"131072" ) ]
143 PoolAttributes += [ pah.setMinBufferEntries( fileName,
"10" ) ]
# Make the DataHeader container the index master for this file.
147 PoolAttributes += [ f
"DatabaseName = '{fileName}'; INDEX_MASTER = 'POOLContainer(DataHeader)'" ]
# Presumably the `else:` branch of the check above (its line is elided):
# suffix the container names and register the augmentation stream's
# metadata container.
150 outputCollection += f
"_{stream}"
151 poolContainerPrefix += f
"_{stream}"
152 OutputMetadataContainers += [f
"MetaData_{stream}"]
# AutoFlush for the event-data tree, the DataHeader tree, and the
# DataHeaderForm tree.
155 PoolAttributes += [ pah.setTreeAutoFlush( fileName, poolContainerPrefix, autoFlush ) ]
156 PoolAttributes += [ pah.setTreeAutoFlush( fileName, outputCollection, autoFlush ) ]
157 PoolAttributes += [ pah.setTreeAutoFlush( fileName,
"POOLContainerForm", autoFlush ) ]
# Split levels: the event-data containers, the "Aux." stores, and the
# "Dyn." (dynamic attribute) branches with their own level.
160 PoolAttributes += [ pah.setContainerSplitLevel( fileName, poolContainerPrefix, splitLvl ) ]
161 PoolAttributes += [ pah.setContainerSplitLevel( fileName,
"Aux.", splitLvl ) ]
162 PoolAttributes += [ pah.setContainerSplitLevel( fileName,
"Dyn.", dynSplitLvl ) ]
# EVNT and RDO files additionally request file forward compatibility.
167 if "EVNT" in stream
or "RDO" in stream:
168 PoolAttributes += [ f
"DatabaseName = '{fileName}'; FILEFORWARD_COMPATIBILITY = '1'" ]
# Track the largest AutoFlush across all streams; used below for the
# parallel-compression decision.  (`maxAutoFlush` is initialised on a line
# elided from this view.)
173 maxAutoFlush =
max(maxAutoFlush, autoFlush)
# Ensure every file has an explicit storage technology when no wildcard
# entry exists.
177 if fileName
not in storageTechnologyMap
and '*' not in storageTechnologyMap:
178 storageTechnologyMap[fileName] = flags.PoolSvc.DefaultContainerType
# --- end of per-stream loop ---
# Parallel compression only pays off when each SharedWriter worker sees at
# least one full AutoFlush buffer of events; otherwise switch it off.
182 useParallelCompression = flags.MP.UseSharedWriter
and flags.MP.UseParallelCompression
183 if useParallelCompression:
# Total number of events to process: Exec.MaxEvents == -1 means "all
# available" (input entries minus skipped events).
185 requestedEvents = flags.Exec.MaxEvents
186 availableEvents = flags.Input.FileNentries - flags.Exec.SkipEvents
187 totalEntries = availableEvents
if requestedEvents == -1
else min( availableEvents, requestedEvents )
188 if ( totalEntries > 0 )
and ( maxAutoFlush > 0 )
and ( maxAutoFlush * flags.Concurrency.NumProcs > totalEntries ):
189 logger.info(
"Not enough events to process, disabling parallel compression for SharedWriter!" )
190 logger.info( f
"Processing {totalEntries} events in {flags.Concurrency.NumProcs} workers "
191 f
"and a maximum (across all outputs) AutoFlush of {maxAutoFlush}")
192 useParallelCompression =
False
# Final dispatch: shared-I/O (AthenaMP SharedReader/SharedWriter) vs. the
# plain CnvSvc configuration.  NOTE(review): the surrounding
# `return ...CnvSvcCfg(flags,` call openers are elided from this view --
# only the keyword arguments are visible below.
194 if flags.MP.UseSharedReader
or flags.MP.UseSharedWriter:
195 from AthenaPoolCnvSvc.PoolCommonConfig
import AthenaPoolSharedIOCnvSvcCfg
197 PoolAttributes=PoolAttributes,
198 ParallelCompression=useParallelCompression,
199 StorageTechnology=storageTechnologyMap,
200 OutputMetadataContainers=OutputMetadataContainers,
201 OneDataHeaderForm = oneDHForm)
203 from AthenaPoolCnvSvc.PoolCommonConfig
import AthenaPoolCnvSvcCfg
205 PoolAttributes=PoolAttributes,
206 StorageTechnology=storageTechnologyMap,
207 OneDataHeaderForm = oneDHForm)