def PoolWriteCfg(flags):
    """Return ComponentAccumulator configured to write POOL files"""

    from AthenaCommon.Logging import logging
    logger = logging.getLogger( 'PoolWriteCfg' )

    PoolAttributes = []
    # Switch off object splitting by default
    PoolAttributes += ["DEFAULT_SPLITLEVEL ='0'"]

    # Use member-wise streaming by default
    PoolAttributes += ["STREAM_MEMBER_WISE = '1'"]

    # Use a default buffer (basket) size of 32000 bytes
    PoolAttributes += ["DEFAULT_BUFFERSIZE = '32000'"]

    # Keep the DataHeaderForm container unsplit and set the branch offset table length
    PoolAttributes += ["ContainerName = 'TTree=POOLContainerForm(DataHeaderForm)'; CONTAINER_SPLITLEVEL = '0'"]
    PoolAttributes += ["TREE_BRANCH_OFFSETTAB_LEN ='100'"]

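    # Note: each PoolAttributes entry is a plain "KEY = 'value'" string,
    # optionally scoped to a specific file and/or container via the
    # "DatabaseName = ..." and "ContainerName = ..." prefixes seen above.
    # The full list is handed to the conversion service at the end of this
    # function and parsed downstream.
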
    oneDHForm = flags.Output.OneDataHeaderForm

    from AthenaPoolCnvSvc import PoolAttributeHelper as pah

    # Default per-stream settings:
    # stream : [ compression algorithm, compression level, AutoFlush,
    #            split level, dynamic split level ]
    # The compression algorithm follows the ROOT convention:
    # 1 = ZLIB, 2 = LZMA, 4 = LZ4, 5 = ZSTD
    defaults = {
        "EVNT"           : [2, 1, 500, 0, 0],
        "EVNT_TR"        : [2, 1,   1, 0, 0],
        "HITS"           : [2, 1,  10, 0, 0],
        "RDO"            : [2, 1,  10, 0, 0],
        "ESD"            : [2, 1,  10, 0, 0],
        "AOD"            : [2, 1, 100, 0, 0],
        "DAOD_PHYSVAL"   : [5, 5, 100, 0, 1],
        "DAOD_PHYS"      : [5, 5, 500, 0, 1],
        "DAOD_PHYSLITE"  : [5, 5, 500, 1, 1],
        "DAOD_TRUTH3"    : [5, 5, 500, 1, 1],
        "D2AOD_PHYSLITE" : [5, 5, 500, 1, 1],
    }
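
    # For illustration (file name hypothetical): with the "AOD" row above, a
    # file "myAOD.pool.root" would be written with LZMA at level 1 and its
    # trees flushed every 100 events; the helper calls below turn such a row
    # into attribute strings of the same "DatabaseName = '...'; KEY = 'value'"
    # form used elsewhere in this function.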

    # Metadata containers to be suffixed per augmentation stream (filled below)
    OutputMetadataContainers = []

    # Per-file flush settings for the SharedWriter and the largest AutoFlush seen
    fileFlushSetting = {}
    maxAutoFlush = -1

    for stream in _getStreamsFromFlags(flags):

        # Get the output file name for this stream
        fileName = getattr(flags.Output, f"{stream}FileName")

        # Pick up the stream defaults, falling back to generic settings
        # for formats that have no entry in the table above
        compAlg, compLvl, autoFlush, splitLvl, dynSplitLvl = 2, 1, 10, 0, 0
        if stream in defaults:
            compAlg, compLvl, autoFlush, splitLvl, dynSplitLvl = defaults[stream]
        elif "DAOD" in stream:
            compAlg, compLvl, autoFlush, splitLvl, dynSplitLvl = 5, 5, 100, 0, 1
        elif "D2AOD" in stream:
            compAlg, compLvl, autoFlush, splitLvl, dynSplitLvl = 5, 5, 500, 1, 1
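        # E.g. a hypothetical "DAOD_XYZ" stream that has no entry in the
        # defaults table above would match the "DAOD" branch and get
        # ZSTD level 5 with an AutoFlush of 100 events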

        # Temporary files (name ends in '_000', starts with 'tmp.', or the
        # stream is listed in flags.Output.TemporaryStreams) get cheaper
        # compression to save CPU time
        isTemporaryStream = fileName.endswith('_000') or fileName.startswith('tmp.') or stream in flags.Output.TemporaryStreams
        tempFileCompressionSetting = (5, 1)
        if isTemporaryStream:
            # For Run 1/2 RDOs use ZLIB level 1 instead of ZSTD level 1
            from AthenaConfiguration.Enums import LHCPeriod
            if "RDO" in stream and flags.hasCategory("GeoModel") and flags.GeoModel.Run < LHCPeriod.Run3:
                tempFileCompressionSetting = (1, 1)
            logger.info(f"Stream {stream} is marked as temporary, overwriting the compression settings to {tempFileCompressionSetting}")
        compAlg, compLvl = tempFileCompressionSetting if isTemporaryStream else (compAlg, compLvl)

        # Apply any configured override of the AutoFlush setting for this stream
        autoFlush = _overrideTreeAutoFlush(logger, flags, stream, autoFlush)

        logger.debug(f"{fileName=} {stream=} {compAlg=} {compLvl=} {autoFlush=} {splitLvl=} {dynSplitLvl=}")

        # Base names of the DataHeader and event data containers
        outputCollection = "POOLContainer"
        poolContainerPrefix = "CollectionTree"

        # Augmentation streams share their parent stream's file, so the
        # file-level settings apply only to the owning stream
        isAugmentation = flags.hasFlag(f"Output.{stream}ParentStream")
        if not isAugmentation:
            # Set the file-level compression settings
            PoolAttributes += [ pah.setFileCompAlg( fileName, compAlg ) ]
            PoolAttributes += [ pah.setFileCompLvl( fileName, compLvl ) ]

            # For AOD-like streams, cap the buffer size at 128k
            # and require at least 10 entries per buffer
            if "AOD" in stream:
                PoolAttributes += [ pah.setMaxBufferSize( fileName, "131072" ) ]
                PoolAttributes += [ pah.setMinBufferEntries( fileName, "10" ) ]
        else:
            # For augmentations, use the parent stream's DataHeader as the master index
            PoolAttributes += [ f"DatabaseName = '{fileName}'; INDEX_MASTER = 'POOLContainer(DataHeader)'" ]

            # Suffix the container names with the stream name
            outputCollection += f"_{stream}"
            poolContainerPrefix += f"_{stream}"
            OutputMetadataContainers += [f"MetaData_{stream}"]

        # Set the AutoFlush for the event data, DataHeader and DataHeaderForm trees,
        # and cap the tree size at 1 TiB so ROOT never switches the output file
        PoolAttributes += [ pah.setTreeAutoFlush( fileName, poolContainerPrefix, autoFlush ) ]
        PoolAttributes += [ pah.setTreeAutoFlush( fileName, outputCollection, autoFlush ) ]
        PoolAttributes += [ pah.setTreeAutoFlush( fileName, "POOLContainerForm", autoFlush ) ]
        PoolAttributes += [ pah.setTreeMaxSize( fileName, "*", "1099511627776L" ) ]
        if flags.MP.UseSharedWriter and flags.MP.UseParallelCompression:
            fileFlushSetting[fileName] = flags.MP.SharedWriter.FileFlushSetting.get(fileName, autoFlush)
            logger.info(f"Setting auto write for {fileName} to {fileFlushSetting[fileName]} events")

        # Set the split levels: the event data container and the static Aux
        # stores get the stream's split level, while dynamic attributes get
        # their own (usually higher) split level
        PoolAttributes += [ pah.setContainerSplitLevel( fileName, poolContainerPrefix, splitLvl ) ]
        PoolAttributes += [ pah.setContainerSplitLevel( fileName, "Aux.", splitLvl ) ]
        PoolAttributes += [ pah.setContainerSplitLevel( fileName, "Dyn.", dynSplitLvl ) ]
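
        # As a reminder of the ROOT semantics: split level 0 streams each
        # object into a single branch, while higher levels store the object's
        # members in separate branches, enabling selective reads at the cost
        # of more branches; the DAOD formats above rely on the latter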

        # EVNT and RDO files are written in forward-compatibility mode and
        # without the single-DataHeaderForm optimization
        if "EVNT" in stream or "RDO" in stream:
            PoolAttributes += [ f"DatabaseName = '{fileName}'; FILEFORWARD_COMPATIBILITY = '1'" ]
            oneDHForm = False

        # Keep track of the largest AutoFlush across all output streams
        maxAutoFlush = max(maxAutoFlush, autoFlush)

    # If SharedWriter is used with parallel compression, check that there are
    # enough events for every worker to fill a compression buffer; otherwise
    # fall back to serial compression
    useParallelCompression = flags.MP.UseSharedWriter and flags.MP.UseParallelCompression
    if useParallelCompression:
        # Compute the total number of events this job will process
        requestedEvents = flags.Exec.MaxEvents
        availableEvents = flags.Input.FileNentries - flags.Exec.SkipEvents
        totalEntries = availableEvents if requestedEvents == -1 else min( availableEvents, requestedEvents )
        if ( totalEntries > 0 ) and ( maxAutoFlush > 0 ) and ( maxAutoFlush * flags.Concurrency.NumProcs > totalEntries ):
            logger.info( "Not enough events to process, disabling parallel compression for SharedWriter!" )
            logger.info( f"Processing {totalEntries} events in {flags.Concurrency.NumProcs} workers "
                         f"and a maximum (across all outputs) AutoFlush of {maxAutoFlush}" )
            useParallelCompression = False
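
    # Worked example (numbers hypothetical): with 8 workers and a maximum
    # AutoFlush of 500, at least 8 * 500 = 4000 events are needed; a job
    # processing only 1000 events would therefore fall back to serial
    # compression via the check above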

    # Hand the attributes to the appropriate conversion service configuration
    if flags.MP.UseSharedReader or flags.MP.UseSharedWriter:
        from AthenaPoolCnvSvc.PoolCommonConfig import AthenaPoolSharedIOCnvSvcCfg
        return AthenaPoolSharedIOCnvSvcCfg(flags,
                                           PoolAttributes=PoolAttributes,
                                           ParallelCompression=useParallelCompression,
                                           OutputMetadataContainers=OutputMetadataContainers,
                                           OneDataHeaderForm=oneDHForm,
                                           FileFlushSetting=fileFlushSetting,
                                           PoolContainerNamingScheme=("Canonical" if "RNTUPLE" in flags.PoolSvc.DefaultContainerType else "Historical"))
    else:
        from AthenaPoolCnvSvc.PoolCommonConfig import AthenaPoolCnvSvcCfg
        return AthenaPoolCnvSvcCfg(flags,
                                   PoolAttributes=PoolAttributes,
                                   OneDataHeaderForm=oneDHForm,
                                   PoolContainerNamingScheme=("Canonical" if "RNTUPLE" in flags.PoolSvc.DefaultContainerType else "Historical"))
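

# Minimal usage sketch (illustrative only; the file name and driver code are
# hypothetical, not part of this module's API): configure a job that writes
# a single AOD stream and print the resulting configuration.
if __name__ == "__main__":
    from AthenaConfiguration.AllConfigFlags import initConfigFlags

    flags = initConfigFlags()
    flags.Output.AODFileName = "myAOD.pool.root"  # hypothetical output file
    flags.lock()

    # PoolWriteCfg returns a ComponentAccumulator with the conversion service configured
    acc = PoolWriteCfg(flags)
    acc.printConfig()
    acc.wasMerged()  # mark as merged to silence the destructor warning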