ATLAS Offline Software
Loading...
Searching...
No Matches
PoolWriteConfig.py
Go to the documentation of this file.
1"""Configuration for POOL file writing
2
3Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration
4"""
5
6from AthenaConfiguration.AccumulatorCache import AccumulatorCache
7
8import ROOT
9
def _overrideTreeAutoFlush(logger, flags, stream, value):
    """Helper function to override TreeAutoFlush from flags.

    Parameters:
      logger -- logger used to report that an override is being applied
      flags  -- configuration flags; ``flags.Output.TreeAutoFlush`` is consulted
      stream -- name of the output stream (the key looked up in the override dict)
      value  -- the AutoFlush value to use when no override applies

    Returns the override stored for ``stream`` in ``flags.Output.TreeAutoFlush``
    when that flag is a non-empty dict holding a non-None entry for the stream,
    otherwise returns ``value`` unchanged.
    """
    overrides = flags.Output.TreeAutoFlush

    # Nothing to do unless the flag is a non-empty dictionary
    if not overrides or not isinstance(overrides, dict):
        return value

    # A missing stream and an explicit None both mean "keep the default"
    override = overrides.get(stream)
    if override is None:
        return value

    logger.info('Overriding TreeAutoFlush value for stream "%s" from %d to %d', stream, value, override)
    return override
24
def _getStreamsFromFlags(flags):
    """
    Helper to get all the streams from configuration flags
    For each stream that's configured to be written out
    we have two flags w/ the following convention:
        + Output.{STREAM}FileName
        + Output.doWrite{STREAM}

    Only the ``Output.{STREAM}FileName`` entries of ``flags._flagdict`` are
    inspected here: a stream is reported when that entry's ``get()`` returns
    a truthy value. Returns the list of stream names in the iteration order
    of ``flags._flagdict``.
    """
    # AthenaPool is not responsible for HIST storage settings
    excludedStreams = ["HIST"]

    result = []
    for key, value in flags._flagdict.items():
        # Only flags of the form Output.{STREAM}FileName are of interest
        if not (key.startswith("Output.") and key.endswith("FileName")):
            continue
        # The file name has to be set for the stream to be written out
        if not value.get():
            continue
        stream = key.removeprefix("Output.").removesuffix("FileName")
        if stream not in excludedStreams:
            result.append(stream)
    return result
40
41
@AccumulatorCache
def PoolWriteCfg(flags):
    """Return ComponentAccumulator configured to Write POOL files

    Builds the list of POOL attributes: a set of global defaults followed by
    per-stream compression, buffer, AutoFlush and split-level settings for
    every stream returned by _getStreamsFromFlags(flags). The attributes are
    passed to AthenaPoolSharedIOCnvSvcCfg when flags.MP.UseSharedReader or
    flags.MP.UseSharedWriter is set, and to AthenaPoolCnvSvcCfg otherwise;
    the result of that call is returned.
    """
    # based on WriteAthenaPool._configureWriteAthenaPool

    from AthenaCommon.Logging import logging
    logger = logging.getLogger( 'PoolWriteCfg' )

    PoolAttributes = []
    # Switch off splitting by setting default SplitLevel to 0
    PoolAttributes += ["DEFAULT_SPLITLEVEL ='0'"]

    # Set as default the member-wise streaming, ROOT default
    PoolAttributes += ["STREAM_MEMBER_WISE = '1'"]

    # Set the default container type
    PoolAttributes += [f"DEFAULT_CONTAINER_TYPE = '{ROOT.pool.DbType.getType(flags.Output.DefaultContainerType).type()}'"]

    # Increase default BasketSize to 32K, ROOT default (but overwritten by POOL)
    PoolAttributes += ["DEFAULT_BUFFERSIZE = '32000'"]

    # Set POOLContainerForm(DataHeaderForm) split level to 0
    PoolAttributes += ["ContainerName = 'TTree=POOLContainerForm(DataHeaderForm)'; CONTAINER_SPLITLEVEL = '0'"]
    PoolAttributes += ["TREE_BRANCH_OFFSETTAB_LEN ='100'"]

    oneDHForm = flags.Output.OneDataHeaderForm

    # Kept in sync with RecoUtils.py
    from AthenaPoolCnvSvc import PoolAttributeHelper as pah

    # Defaults for common formats
    # Stream : [compression algorithm, compression level, auto flush, split level, dyn split level]
    defaults = {
        "EVNT"           : [2, 1, 500, 0, 0],
        "EVNT_TR"        : [2, 1,   1, 0, 0],
        "HITS"           : [2, 1,  10, 0, 0],
        "RDO"            : [2, 1,  10, 0, 0],
        "ESD"            : [2, 1,  10, 0, 0],
        "AOD"            : [2, 1, 100, 0, 0],
        "DAOD_PHYSVAL"   : [5, 5, 100, 0, 1],
        "DAOD_PHYS"      : [5, 5, 500, 0, 1],
        "DAOD_PHYSLITE"  : [5, 5, 500, 1, 1],
        "DAOD_TRUTH3"    : [5, 5, 500, 1, 1],
        "D2AOD_PHYSLITE" : [5, 5, 500, 1, 1],
    }

    # Metadata containers needed for augmentations
    OutputMetadataContainers = []

    # Whether the job asks for parallel compression with the SharedWriter.
    # This does not depend on the stream, so evaluate it once up front.
    parallelCompressionRequested = flags.MP.UseSharedWriter and flags.MP.UseParallelCompression

    # Loop over all streams and set the appropriate attributes
    fileFlushSetting = {}
    maxAutoFlush = -1
    for stream in _getStreamsFromFlags(flags):

        # Get the file name - Guaranteed to exist at this point
        fileName = getattr(flags.Output, f"{stream}FileName")

        # Get the ROOT settings to be applied
        compAlg, compLvl, autoFlush, splitLvl, dynSplitLvl = 2, 1, 10, 0, 0 # Defaults: LZMA, Level 1, AutoFlush 10, No Splitting
        if stream in defaults:
            compAlg, compLvl, autoFlush, splitLvl, dynSplitLvl = defaults[stream]
        elif "DAOD" in stream:
            compAlg, compLvl, autoFlush, splitLvl, dynSplitLvl = 5, 5, 100, 0, 1 # Change the defaults for DAODs
        elif "D2AOD" in stream:
            compAlg, compLvl, autoFlush, splitLvl, dynSplitLvl = 5, 5, 500, 1, 1 # Change the defaults for D2AODs

        # For temporary streams/files we use either ZLIB or ZSTD for the compression algorithm to save CPU cycles
        # Temporary in this context might mean one of three things:
        # a) Outputs of intermediate steps of chained workflows (file name begins with tmp.),
        # b) Outputs of workers in AthenaMP jobs that are to be merged (file name ends with _000), and
        # c) Any output stream that is marked by the user as being temporary (via the CA flag Output.TemporaryStreams)
        # The ultimate goal is to reconcile all three cases and propagate the information between the job transform
        # and the job configuration (CA) so that we don't need to rely on the file names here...
        isTemporaryStream = fileName.endswith('_000') or fileName.startswith('tmp.') or stream in flags.Output.TemporaryStreams
        if isTemporaryStream:
            tempFileCompressionSetting = (5,1) # ZSTD at level 1
            # Outputs created in certain workflows are read with older ROOT versions.
            # E.g., temporary RDO files that are used in Run-2 simulation.
            # For those, we have to use ZLIB
            from AthenaConfiguration.Enums import LHCPeriod
            if "RDO" in stream and flags.hasCategory("GeoModel") and flags.GeoModel.Run < LHCPeriod.Run3:
                tempFileCompressionSetting = (1,1) # ZLIB at level 1
            logger.info(f"Stream {stream} is marked as temporary, overwriting the compression settings to {tempFileCompressionSetting}")
            compAlg, compLvl = tempFileCompressionSetting

        # See if the user asked for the AutoFlush to be overwritten
        autoFlush = _overrideTreeAutoFlush(logger, flags, stream, autoFlush)

        # Print some debugging information
        logger.debug(f"{fileName=} {stream=} {compAlg=} {compLvl=} {autoFlush=} {splitLvl=} {dynSplitLvl=}")

        # Set the Collection/Container prefixes (make configurable?)
        outputCollection = "POOLContainer"
        poolContainerPrefix = "CollectionTree"

        # Check to see if this stream is an augmentation
        # Only set file-level attributes for the owning stream
        isAugmentation = flags.hasFlag(f"Output.{stream}ParentStream")
        if not isAugmentation:
            # Set the Compression attributes
            PoolAttributes += [ pah.setFileCompAlg( fileName, compAlg ) ]
            PoolAttributes += [ pah.setFileCompLvl( fileName, compLvl ) ]

            # By default use a maximum basket buffer size of 128k and minimum buffer entries of 10 for (D)AODs
            if "AOD" in stream:
                PoolAttributes += [ pah.setMaxBufferSize( fileName, "131072" ) ]
                PoolAttributes += [ pah.setMinBufferEntries( fileName, "10" ) ]
        else:
            # Changes in this else block need to be coordinated w/ OutputStreamConfig!
            # Set the master index
            PoolAttributes += [ f"DatabaseName = '{fileName}'; INDEX_MASTER = 'POOLContainer(DataHeader)'" ]

            # Set the Collection/Container prefixes
            outputCollection += f"_{stream}"
            poolContainerPrefix += f"_{stream}"
            OutputMetadataContainers += [f"MetaData_{stream}"]

        # Set the AutoFlush & Maximum Size attributes
        PoolAttributes += [ pah.setTreeAutoFlush( fileName, poolContainerPrefix, autoFlush ) ]
        PoolAttributes += [ pah.setTreeAutoFlush( fileName, outputCollection, autoFlush ) ]
        PoolAttributes += [ pah.setTreeAutoFlush( fileName, "POOLContainerForm", autoFlush ) ]
        PoolAttributes += [ pah.setTreeMaxSize( fileName, "*", "1099511627776L" ) ] # 1 TB
        if parallelCompressionRequested:
            fileFlushSetting[fileName] = ( flags.MP.SharedWriter.FileFlushSetting.get(fileName, autoFlush) )
            logger.info(f"Setting auto write for {fileName} to {fileFlushSetting[fileName]} events")

        # Set the Split Level attributes
        PoolAttributes += [ pah.setContainerSplitLevel( fileName, poolContainerPrefix, splitLvl ) ]
        PoolAttributes += [ pah.setContainerSplitLevel( fileName, "Aux.", splitLvl ) ]
        PoolAttributes += [ pah.setContainerSplitLevel( fileName, "Dyn.", dynSplitLvl ) ]

        # ROOT inadvertently broke forward compatibility in v6.30+ (see root/issues/15964)
        # This workaround is needed so that older releases can read files created by the new ones
        # For more information see ATEAM-1001
        if "EVNT" in stream or "RDO" in stream:
            PoolAttributes += [ f"DatabaseName = '{fileName}'; FILEFORWARD_COMPATIBILITY = '1'" ]
            # also for compatibility with rel21 disable single DataHeaderForm
            oneDHForm = False

        # Find the maximum AutoFlush across all formats
        maxAutoFlush = max(maxAutoFlush, autoFlush)

    # If we don't have "enough" events, disable parallelCompression if we're using SharedWriter
    # In this context, "enough" means each worker has a chance to make at least one flush to the disk
    useParallelCompression = parallelCompressionRequested
    if useParallelCompression:
        # Now compute the total number of events this job will process
        requestedEvents = flags.Exec.MaxEvents
        availableEvents = flags.Input.FileNentries - flags.Exec.SkipEvents
        totalEntries = availableEvents if requestedEvents == -1 else min( availableEvents, requestedEvents )
        if ( totalEntries > 0 ) and ( maxAutoFlush > 0 ) and ( maxAutoFlush * flags.Concurrency.NumProcs > totalEntries ):
            logger.info( "Not enough events to process, disabling parallel compression for SharedWriter!" )
            logger.info( f"Processing {totalEntries} events in {flags.Concurrency.NumProcs} workers "
                         f"and a maximum (across all outputs) AutoFlush of {maxAutoFlush}")
            useParallelCompression = False

    # The container naming scheme is the same for both conversion services
    namingScheme = "Canonical" if "RNTUPLE" in flags.Output.DefaultContainerType else "Historical"

    if flags.MP.UseSharedReader or flags.MP.UseSharedWriter:
        from AthenaPoolCnvSvc.PoolCommonConfig import AthenaPoolSharedIOCnvSvcCfg
        return AthenaPoolSharedIOCnvSvcCfg(flags,
                                           PoolAttributes=PoolAttributes,
                                           ParallelCompression=useParallelCompression,
                                           OutputMetadataContainers=OutputMetadataContainers,
                                           OneDataHeaderForm=oneDHForm,
                                           FileFlushSetting=fileFlushSetting,
                                           PoolContainerNamingScheme=namingScheme)
    else:
        from AthenaPoolCnvSvc.PoolCommonConfig import AthenaPoolCnvSvcCfg
        return AthenaPoolCnvSvcCfg(flags,
                                   PoolAttributes=PoolAttributes,
                                   OneDataHeaderForm=oneDHForm,
                                   PoolContainerNamingScheme=namingScheme)
#define min(a, b)
Definition cfImp.cxx:40
#define max(a, b)
Definition cfImp.cxx:41
_overrideTreeAutoFlush(logger, flags, stream, value)