ATLAS Offline Software
Loading...
Searching...
No Matches
PoolWriteConfig.py
Go to the documentation of this file.
1"""Configuration for POOL file writing
2
3Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration
4"""
5
6from AthenaConfiguration.AccumulatorCache import AccumulatorCache
7
8def _overrideTreeAutoFlush(logger, flags, stream, value):
9 """Helper function to override TreeAutoFlush from flags."""
10 if not flags.Output.TreeAutoFlush or not isinstance(flags.Output.TreeAutoFlush, dict):
11 return value
12
13 if stream not in flags.Output.TreeAutoFlush:
14 return value
15
16 override = flags.Output.TreeAutoFlush[stream]
17 if override is not None:
18 logger.info('Overriding TreeAutoFlush value for stream "%s" from %d to %d', stream, value, override)
19 return override
20
21 return value
22
24 """
25 Helper to get all the streams from configuration flags
26 For each stream that's configured to be written out
27 we have two flags w/ the following convention:
28 + Output.{STREAM}FileName
29 + Output.doWrite{STREAM}
30 """
31 result = []
32 for key, value in flags._flagdict.items():
33 if key.startswith("Output.") and key.endswith("FileName") and value.get():
34 stream = key.removeprefix("Output.").removesuffix("FileName")
35 if stream not in ["HIST"]: # AthenaPool is not responsible for HIST storage settings
36 result.append(stream)
37 return result
38
39
@AccumulatorCache
def PoolWriteCfg(flags):
    """Return ComponentAccumulator configured to Write POOL files

    Builds the list of POOL/ROOT attributes (compression, AutoFlush, split
    levels, buffer sizes) for every output stream found in the flags and
    returns the accumulator from AthenaPoolCnvSvcCfg — or, when MP shared
    reader/writer is enabled, from AthenaPoolSharedIOCnvSvcCfg.
    """
    # based on WriteAthenaPool._configureWriteAthenaPool

    from AthenaCommon.Logging import logging
    logger = logging.getLogger( 'PoolWriteCfg' )

    PoolAttributes = []
    # Switch off splitting by setting default SplitLevel to 0
    PoolAttributes += ["DEFAULT_SPLITLEVEL ='0'"]

    # Set as default the member-wise streaming, ROOT default
    PoolAttributes += ["STREAM_MEMBER_WISE = '1'"]

    # Increase default BasketSize to 32K, ROOT default (but overwritten by POOL)
    PoolAttributes += ["DEFAULT_BUFFERSIZE = '32000'"]

    # Set POOLContainerForm(DataHeaderForm) split level to 0
    PoolAttributes += ["ContainerName = 'TTree=POOLContainerForm(DataHeaderForm)'; CONTAINER_SPLITLEVEL = '0'"]
    PoolAttributes += ["TREE_BRANCH_OFFSETTAB_LEN ='100'"]

    # May be disabled below for EVNT/RDO streams (rel21 compatibility)
    oneDHForm = flags.Output.OneDataHeaderForm

    # Kept in sync with RecoUtils.py
    from AthenaPoolCnvSvc import PoolAttributeHelper as pah

    # Defaults for common formats
    # Stream : [compression algorithm, compression level, auto flush, split level, dyn split level]
    # Compression algorithm ids (per the comments below): 1 = ZLIB, 2 = LZMA, 5 = ZSTD
    defaults = {
        "EVNT" : [2, 1, 500, 0, 0],
        "EVNT_TR" : [2, 1, 1, 0, 0],
        "HITS" : [2, 1, 10, 0, 0],
        "RDO" : [2, 1, 10, 0, 0],
        "ESD" : [2, 1, 10, 0, 0],
        "AOD" : [2, 1, 100, 0, 0],
        "DAOD_PHYSVAL" : [5, 5, 100, 0, 1],
        "DAOD_PHYS" : [5, 5, 500, 0, 1],
        "DAOD_PHYSLITE" : [5, 5, 500, 1, 1],
        "DAOD_TRUTH3" : [5, 5, 500, 1, 1],
        "D2AOD_PHYSLITE" : [5, 5, 500, 1, 1],
    }

    # Metadata containers needed for augmentations
    OutputMetadataContainers = []

    # Loop over all streams and set the appropriate attributes
    fileFlushSetting = {}
    maxAutoFlush = -1
    for stream in _getStreamsFromFlags(flags):

        # Get the file name - Guaranteed to exist at this point
        fileName = getattr(flags.Output, f"{stream}FileName")

        # Get the ROOT settings to be applied
        compAlg, compLvl, autoFlush, splitLvl, dynSplitLvl = 2, 1, 10, 0, 0 # Defaults: LZMA, Level 1, AutoFlush 10, No Splitting
        if stream in defaults:
            compAlg, compLvl, autoFlush, splitLvl, dynSplitLvl = defaults[stream]
        elif "DAOD" in stream:
            compAlg, compLvl, autoFlush, splitLvl, dynSplitLvl = 5, 5, 100, 0, 1 # Change the defaults for DAODs
        elif "D2AOD" in stream:
            compAlg, compLvl, autoFlush, splitLvl, dynSplitLvl = 5, 5, 500, 1, 1 # Change the defaults for D2AODs

        # For temporary streams/files we use either ZLIB or ZSTD for the compression algorithm to save CPU cycles
        # Temporary in this context might mean one of three things:
        # a) Outputs of intermediate steps of chained workflows (file name begins with tmp.),
        # b) Outputs of workers in AthenaMP jobs that are to be merged (file name ends with _000), and
        # c) Any output stream that is marked by the user as being temporary (via the CA flag Output.TemporaryStreams)
        # The ultimate goal is to reconcile all three cases and propagate the information between the job transform
        # and the job configuration (CA) so that we don't need to rely on the file names here...
        isTemporaryStream = fileName.endswith('_000') or fileName.startswith('tmp.') or stream in flags.Output.TemporaryStreams
        tempFileCompressionSetting = (5,1) # ZSTD at level 1
        if isTemporaryStream:
            # Outputs created in certain workflows are read with older ROOT versions.
            # E.g., temporary RDO files that are used in Run-2 simulation.
            # For those, we have to use ZLIB
            from AthenaConfiguration.Enums import LHCPeriod
            if "RDO" in stream and flags.hasCategory("GeoModel") and flags.GeoModel.Run < LHCPeriod.Run3:
                tempFileCompressionSetting = (1,1) # ZLIB at level 1
            logger.info(f"Stream {stream} is marked as temporary, overwriting the compression settings to {tempFileCompressionSetting}")
        compAlg, compLvl = tempFileCompressionSetting if isTemporaryStream else (compAlg, compLvl)

        # See if the user asked for the AutoFlush to be overwritten
        autoFlush = _overrideTreeAutoFlush(logger, flags, stream, autoFlush)

        # Print some debugging information
        logger.debug(f"{fileName=} {stream=} {compAlg=} {compLvl=} {autoFlush=} {splitLvl=} {dynSplitLvl=}")

        # Set the Collection/Container prefixes (make configurable?)
        outputCollection = "POOLContainer"
        poolContainerPrefix = "CollectionTree"

        # Check to see if this stream is an augmentation
        # Only set file-level attributes for the owning stream
        isAugmentation = flags.hasFlag(f"Output.{stream}ParentStream")
        if not isAugmentation:
            # Set the Compression attributes
            PoolAttributes += [ pah.setFileCompAlg( fileName, compAlg ) ]
            PoolAttributes += [ pah.setFileCompLvl( fileName, compLvl ) ]

            # By default use a maximum basket buffer size of 128k and minimum buffer entries of 10 for (D)AODs
            if "AOD" in stream:
                PoolAttributes += [ pah.setMaxBufferSize( fileName, "131072" ) ]
                PoolAttributes += [ pah.setMinBufferEntries( fileName, "10" ) ]
        else:
            # Changes in this else block need to be coordinated w/ OutputStreamConfig!
            # Set the master index
            PoolAttributes += [ f"DatabaseName = '{fileName}'; INDEX_MASTER = 'POOLContainer(DataHeader)'" ]

            # Set the Collection/Container prefixes
            # Augmentation streams get their own suffixed trees/collections in the parent file
            outputCollection += f"_{stream}"
            poolContainerPrefix += f"_{stream}"
            OutputMetadataContainers += [f"MetaData_{stream}"]

        # Set the AutoFlush & Maximum Size attributes
        PoolAttributes += [ pah.setTreeAutoFlush( fileName, poolContainerPrefix, autoFlush ) ]
        PoolAttributes += [ pah.setTreeAutoFlush( fileName, outputCollection, autoFlush ) ]
        PoolAttributes += [ pah.setTreeAutoFlush( fileName, "POOLContainerForm", autoFlush ) ]
        PoolAttributes += [ pah.setTreeMaxSize( fileName, "*", "1099511627776L" ) ] # 1 TB
        if flags.MP.UseSharedWriter and flags.MP.UseParallelCompression:
            # Per-file flush setting for the MP SharedWriter, defaulting to this stream's AutoFlush
            fileFlushSetting[fileName] = ( flags.MP.SharedWriter.FileFlushSetting.get(fileName, autoFlush) )
            logger.info(f"Setting auto write for {fileName} to {fileFlushSetting[fileName]} events")

        # Set the Split Level attributes
        PoolAttributes += [ pah.setContainerSplitLevel( fileName, poolContainerPrefix, splitLvl ) ]
        PoolAttributes += [ pah.setContainerSplitLevel( fileName, "Aux.", splitLvl ) ]
        PoolAttributes += [ pah.setContainerSplitLevel( fileName, "Dyn.", dynSplitLvl ) ]

        # ROOT inadvertently broke forward compatibility in v6.30+ (see root/issues/15964)
        # This workaround is needed so that older releases can read files created by the new ones
        # For more information see ATEAM-1001
        if "EVNT" in stream or "RDO" in stream:
            PoolAttributes += [ f"DatabaseName = '{fileName}'; FILEFORWARD_COMPATIBILITY = '1'" ]
            # also for compatibility with rel21 disable single DataHeaderForm
            oneDHForm = False

        # Find the maximum AutoFlush across all formats
        maxAutoFlush = max(maxAutoFlush, autoFlush)

    # If we don't have "enough" events, disable parallelCompression if we're using SharedWriter
    # In this context, "enough" means each worker has a chance to make at least one flush to the disk
    useParallelCompression = flags.MP.UseSharedWriter and flags.MP.UseParallelCompression
    if useParallelCompression:
        # Now compute the total number of events this job will process
        # MaxEvents == -1 means "all available events"
        requestedEvents = flags.Exec.MaxEvents
        availableEvents = flags.Input.FileNentries - flags.Exec.SkipEvents
        totalEntries = availableEvents if requestedEvents == -1 else min( availableEvents, requestedEvents )
        if ( totalEntries > 0 ) and ( maxAutoFlush > 0 ) and ( maxAutoFlush * flags.Concurrency.NumProcs > totalEntries ):
            logger.info( "Not enough events to process, disabling parallel compression for SharedWriter!" )
            logger.info( f"Processing {totalEntries} events in {flags.Concurrency.NumProcs} workers "
                         f"and a maximum (across all outputs) AutoFlush of {maxAutoFlush}")
            useParallelCompression = False

    if flags.MP.UseSharedReader or flags.MP.UseSharedWriter:
        # Shared-I/O variant carries the extra MP-specific settings
        from AthenaPoolCnvSvc.PoolCommonConfig import AthenaPoolSharedIOCnvSvcCfg
        return AthenaPoolSharedIOCnvSvcCfg(flags,
                                           PoolAttributes=PoolAttributes,
                                           ParallelCompression=useParallelCompression,
                                           OutputMetadataContainers=OutputMetadataContainers,
                                           OneDataHeaderForm=oneDHForm,
                                           FileFlushSetting=fileFlushSetting,
                                           PoolContainerNamingScheme=("Canonical" if "RNTUPLE" in flags.PoolSvc.DefaultContainerType else "Historical"))
    else:
        from AthenaPoolCnvSvc.PoolCommonConfig import AthenaPoolCnvSvcCfg
        return AthenaPoolCnvSvcCfg(flags,
                                   PoolAttributes=PoolAttributes,
                                   OneDataHeaderForm=oneDHForm,
                                   PoolContainerNamingScheme=("Canonical" if "RNTUPLE" in flags.PoolSvc.DefaultContainerType else "Historical"))
#define min(a, b)
Definition cfImp.cxx:40
#define max(a, b)
Definition cfImp.cxx:41
_overrideTreeAutoFlush(logger, flags, stream, value)