def PoolWriteCfg(flags):
    """Return ComponentAccumulator configured to Write POOL files.

    Builds the list of POOL/ROOT attribute strings (compression, AutoFlush,
    buffer sizes, split levels, ...) for every output stream returned by
    ``_getStreamsFromFlags(flags)`` and passes it to the matching
    conversion-service configuration.

    Args:
        flags: Job configuration flags. Reads ``flags.Output.*``,
            ``flags.MP.*``, ``flags.Exec.MaxEvents``/``SkipEvents``,
            ``flags.Input.FileNentries``, ``flags.Concurrency.NumProcs`` and,
            for temporary RDO streams, ``flags.GeoModel.Run``.

    Returns:
        The result of ``AthenaPoolSharedIOCnvSvcCfg`` when
        ``flags.MP.UseSharedReader`` or ``flags.MP.UseSharedWriter`` is set,
        otherwise the result of ``AthenaPoolCnvSvcCfg``.
    """
    from AthenaCommon.Logging import logging
    logger = logging.getLogger( 'PoolWriteCfg' )

    # File-independent attributes. The order of entries is preserved on purpose:
    # the list is handed verbatim to the conversion service.
    PoolAttributes = [
        "DEFAULT_SPLITLEVEL ='0'",
        "STREAM_MEMBER_WISE = '1'",
        # Container technology derived from flags.Output.DefaultContainerType
        f"DEFAULT_CONTAINER_TYPE = '{ROOT.pool.DbType.getType(flags.Output.DefaultContainerType).type()}'",
        "DEFAULT_BUFFERSIZE = '32000'",
        # The DataHeaderForm container is never split
        "ContainerName = 'TTree=POOLContainerForm(DataHeaderForm)'; CONTAINER_SPLITLEVEL = '0'",
        "TREE_BRANCH_OFFSETTAB_LEN ='100'",
    ]

    # May be forced to False below for EVNT/RDO streams.
    oneDHForm = flags.Output.OneDataHeaderForm

    from AthenaPoolCnvSvc import PoolAttributeHelper as pah

    # Per-stream ROOT settings, in unpacking order:
    #   [compression algorithm, compression level, AutoFlush, split level, dynamic split level]
    # NOTE(review): algorithm codes presumably follow ROOT's compression-algorithm
    # enum (1=ZLIB, 2=LZMA, 5=ZSTD) — confirm.
    defaults = {
        "EVNT"           : [2, 1, 500, 0, 0],
        "EVNT_TR"        : [2, 1, 1, 0, 0],
        "HITS"           : [2, 1, 10, 0, 0],
        "RDO"            : [2, 1, 10, 0, 0],
        "ESD"            : [2, 1, 10, 0, 0],
        "AOD"            : [2, 1, 100, 0, 0],
        "DAOD_PHYSVAL"   : [5, 5, 100, 0, 1],
        "DAOD_PHYS"      : [5, 5, 500, 0, 1],
        "DAOD_PHYSLITE"  : [5, 5, 500, 1, 1],
        "DAOD_TRUTH3"    : [5, 5, 500, 1, 1],
        "D2AOD_PHYSLITE" : [5, 5, 500, 1, 1],
    }

    # Evaluated once. It is used per file inside the loop, and it may be switched
    # off only after the loop, once the maximum AutoFlush is known.
    useParallelCompression = flags.MP.UseSharedWriter and flags.MP.UseParallelCompression

    # Metadata container names collected for augmentation streams
    OutputMetadataContainers = []

    fileFlushSetting = {}
    maxAutoFlush = -1
    for stream in _getStreamsFromFlags(flags):

        fileName = getattr(flags.Output, f"{stream}FileName")

        # Resolution order: an explicit entry, then the DAOD / D2AOD family
        # fallbacks, then the generic default.
        if stream in defaults:
            compAlg, compLvl, autoFlush, splitLvl, dynSplitLvl = defaults[stream]
        elif "DAOD" in stream:
            compAlg, compLvl, autoFlush, splitLvl, dynSplitLvl = 5, 5, 100, 0, 1
        elif "D2AOD" in stream:
            compAlg, compLvl, autoFlush, splitLvl, dynSplitLvl = 5, 5, 500, 1, 1
        else:
            compAlg, compLvl, autoFlush, splitLvl, dynSplitLvl = 2, 1, 10, 0, 0

        # A stream is temporary if its file name ends in '_000', starts with 'tmp.',
        # or it is listed in flags.Output.TemporaryStreams. Temporary outputs get
        # (5, 1) compression, except pre-Run3 RDO streams, which get (1, 1).
        isTemporaryStream = fileName.endswith('_000') or fileName.startswith('tmp.') or stream in flags.Output.TemporaryStreams
        if isTemporaryStream:
            from AthenaConfiguration.Enums import LHCPeriod
            tempFileCompressionSetting = (5,1)
            if "RDO" in stream and flags.hasCategory("GeoModel") and flags.GeoModel.Run < LHCPeriod.Run3:
                tempFileCompressionSetting = (1,1)
            logger.info(f"Stream {stream} is marked as temporary, overwriting the compression settings to {tempFileCompressionSetting}")
            compAlg, compLvl = tempFileCompressionSetting

        # Allow the AutoFlush value to be overridden (helper defined elsewhere in this file)
        autoFlush = _overrideTreeAutoFlush(logger, flags, stream, autoFlush)

        logger.debug(f"{fileName=} {stream=} {compAlg=} {compLvl=} {autoFlush=} {splitLvl=} {dynSplitLvl=}")

        outputCollection = "POOLContainer"
        poolContainerPrefix = "CollectionTree"

        # A stream that has an Output.<stream>ParentStream flag is an augmentation.
        # Only the owning stream sets file-level compression/buffer attributes.
        # An augmentation instead sets its own index master, stream-suffixed
        # container names and a MetaData_<stream> container.
        isAugmentation = flags.hasFlag(f"Output.{stream}ParentStream")
        if not isAugmentation:
            PoolAttributes += [ pah.setFileCompAlg( fileName, compAlg ) ]
            PoolAttributes += [ pah.setFileCompLvl( fileName, compLvl ) ]

            # Substring match: applies to AOD, DAOD_* and D2AOD_* streams.
            # 131072 = 128 * 1024.
            if "AOD" in stream:
                PoolAttributes += [ pah.setMaxBufferSize( fileName, "131072" ) ]
                PoolAttributes += [ pah.setMinBufferEntries( fileName, "10" ) ]
        else:
            PoolAttributes += [ f"DatabaseName = '{fileName}'; INDEX_MASTER = 'POOLContainer(DataHeader)'" ]

            outputCollection += f"_{stream}"
            poolContainerPrefix += f"_{stream}"
            OutputMetadataContainers += [f"MetaData_{stream}"]

        # AutoFlush for the event tree, the DataHeader collection and the DataHeaderForm container
        PoolAttributes += [ pah.setTreeAutoFlush( fileName, poolContainerPrefix, autoFlush ) ]
        PoolAttributes += [ pah.setTreeAutoFlush( fileName, outputCollection, autoFlush ) ]
        PoolAttributes += [ pah.setTreeAutoFlush( fileName, "POOLContainerForm", autoFlush ) ]
        # 1099511627776 == 2**40 (presumably bytes, i.e. 1 TiB)
        PoolAttributes += [ pah.setTreeMaxSize( fileName, "*", "1099511627776L" ) ]
        if useParallelCompression:
            fileFlushSetting[fileName] = ( flags.MP.SharedWriter.FileFlushSetting.get(fileName, autoFlush) )
            logger.info(f"Setting auto write for {fileName} to {fileFlushSetting[fileName]} events")

        PoolAttributes += [ pah.setContainerSplitLevel( fileName, poolContainerPrefix, splitLvl ) ]
        PoolAttributes += [ pah.setContainerSplitLevel( fileName, "Aux.", splitLvl ) ]
        PoolAttributes += [ pah.setContainerSplitLevel( fileName, "Dyn.", dynSplitLvl ) ]

        # EVNT/RDO outputs request FILEFORWARD_COMPATIBILITY and disable the single
        # DataHeaderForm. NOTE(review): presumably so older releases can still
        # read these files — confirm.
        if "EVNT" in stream or "RDO" in stream:
            PoolAttributes += [ f"DatabaseName = '{fileName}'; FILEFORWARD_COMPATIBILITY = '1'" ]
            oneDHForm = False

        maxAutoFlush = max(maxAutoFlush, autoFlush)

    # Parallel compression only pays off if every worker can fill at least one
    # AutoFlush cluster. Otherwise it is switched off.
    # NOTE(review): this check uses the per-stream AutoFlush values, not any
    # per-file override taken from flags.MP.SharedWriter.FileFlushSetting.
    if useParallelCompression:
        requestedEvents = flags.Exec.MaxEvents
        availableEvents = flags.Input.FileNentries - flags.Exec.SkipEvents
        # MaxEvents == -1 means "process everything available"
        totalEntries = availableEvents if requestedEvents == -1 else min( availableEvents, requestedEvents )
        if ( totalEntries > 0 ) and ( maxAutoFlush > 0 ) and ( maxAutoFlush * flags.Concurrency.NumProcs > totalEntries ):
            logger.info( "Not enough events to process, disabling parallel compression for SharedWriter!" )
            logger.info( f"Processing {totalEntries} events in {flags.Concurrency.NumProcs} workers "
                         f"and a maximum (across all outputs) AutoFlush of {maxAutoFlush}")
            useParallelCompression = False

    namingScheme = "Canonical" if "RNTUPLE" in flags.Output.DefaultContainerType else "Historical"

    if flags.MP.UseSharedReader or flags.MP.UseSharedWriter:
        from AthenaPoolCnvSvc.PoolCommonConfig import AthenaPoolSharedIOCnvSvcCfg
        return AthenaPoolSharedIOCnvSvcCfg(flags,
                                           PoolAttributes=PoolAttributes,
                                           ParallelCompression=useParallelCompression,
                                           OutputMetadataContainers=OutputMetadataContainers,
                                           OneDataHeaderForm=oneDHForm,
                                           FileFlushSetting=fileFlushSetting,
                                           PoolContainerNamingScheme=namingScheme)
    else:
        from AthenaPoolCnvSvc.PoolCommonConfig import AthenaPoolCnvSvcCfg
        return AthenaPoolCnvSvcCfg(flags,
                                   PoolAttributes=PoolAttributes,
                                   OneDataHeaderForm=oneDHForm,
                                   PoolContainerNamingScheme=namingScheme)