# PoolWriteConfig.py — ATLAS Offline Software
# (Doxygen page-navigation residue removed from this header.)
1"""Configuration for POOL file writing
2
3Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration
4"""
5
6from AthenaConfiguration.AccumulatorCache import AccumulatorCache
7
8def _overrideTreeAutoFlush(logger, flags, stream, value):
9 """Helper function to override TreeAutoFlush from flags."""
10 if not flags.Output.TreeAutoFlush or not isinstance(flags.Output.TreeAutoFlush, dict):
11 return value
12
13 if stream not in flags.Output.TreeAutoFlush:
14 return value
15
16 override = flags.Output.TreeAutoFlush[stream]
17 if override is not None:
18 logger.info('Overriding TreeAutoFlush value for stream "%s" from %d to %d', stream, value, override)
19 return override
20
21 return value
22
24 """
25 Helper to get all the streams from configuration flags
26 For each stream that's configured to be written out
27 we have two flags w/ the following convention:
28 + Output.{STREAM}FileName
29 + Output.doWrite{STREAM}
30 """
31 result = []
32 for key, value in flags._flagdict.items():
33 if key.startswith("Output.") and key.endswith("FileName") and value.get():
34 stream = key.removeprefix("Output.").removesuffix("FileName")
35 if stream not in ["HIST"]: # AthenaPool is not responsible for HIST storage settings
36 result.append(stream)
37 return result
38
39
@AccumulatorCache
def PoolWriteCfg(flags):
    """Return ComponentAccumulator configured to Write POOL files

    Builds the list of POOL/ROOT attributes (compression algorithm/level,
    TTree AutoFlush, split levels, maximum tree size, ...) for every
    configured output stream and passes them to the appropriate
    conversion-service configuration.

    Args:
        flags: job configuration flags; reads the Output.*, MP.*, PoolSvc.*,
            Exec.*, Input.* and Concurrency.* categories.

    Returns:
        The ComponentAccumulator from AthenaPoolSharedIOCnvSvcCfg when
        shared I/O is enabled (MP.UseSharedReader/UseSharedWriter),
        otherwise from AthenaPoolCnvSvcCfg.
    """
    # based on WriteAthenaPool._configureWriteAthenaPool

    from AthenaCommon.Logging import logging
    logger = logging.getLogger( 'PoolWriteCfg' )

    PoolAttributes = []
    # Switch off splitting by setting default SplitLevel to 0
    PoolAttributes += ["DEFAULT_SPLITLEVEL ='0'"]

    # Set as default the member-wise streaming, ROOT default
    PoolAttributes += ["STREAM_MEMBER_WISE = '1'"]

    # Increase default BasketSize to 32K, ROOT default (but overwritten by POOL)
    PoolAttributes += ["DEFAULT_BUFFERSIZE = '32000'"]

    # Set POOLContainerForm(DataHeaderForm) split level to 0
    PoolAttributes += ["ContainerName = 'TTree=POOLContainerForm(DataHeaderForm)'; CONTAINER_SPLITLEVEL = '0'"]
    PoolAttributes += ["TREE_BRANCH_OFFSETTAB_LEN ='100'"]

    oneDHForm = flags.Output.OneDataHeaderForm

    # Kept in sync with RecoUtils.py
    from AthenaPoolCnvSvc import PoolAttributeHelper as pah

    # Defaults for common formats
    # Stream : [compression algorithm, compression level, auto flush, split level, dyn split level]
    # NOTE: compression algorithm codes follow ROOT's convention — presumably
    # 1=ZLIB, 2=LZMA, 5=ZSTD (see the temporary-stream handling below).
    defaults = {
        "EVNT" : [2, 1, 500, 0, 0],
        "EVNT_TR" : [2, 1, 1, 0, 0],
        "HITS" : [2, 1, 10, 0, 0],
        "RDO" : [2, 1, 10, 0, 0],
        "ESD" : [2, 1, 10, 0, 0],
        "AOD" : [2, 1, 100, 0, 0],
        "DAOD_PHYSVAL" : [5, 5, 100, 0, 1],
        "DAOD_PHYS" : [5, 5, 500, 0, 1],
        "DAOD_PHYSLITE" : [5, 5, 500, 1, 1],
        "DAOD_TRUTH3" : [5, 5, 500, 1, 1],
        "D2AOD_PHYSLITE" : [5, 5, 500, 1, 1],
    }

    # Metadata containers needed for augmentations
    OutputMetadataContainers = []

    # Loop over all streams and set the appropriate attributes
    fileFlushSetting = {}
    maxAutoFlush = -1
    # Per-file storage technology map; a '*' key acts as a global wildcard
    storageTechnologyMap = flags.Output.StorageTechnology.EventData or {'*': flags.PoolSvc.DefaultContainerType}
    for stream in _getStreamsFromFlags(flags):

        # Get the file name - Guaranteed to exist at this point
        fileName = getattr(flags.Output, f"{stream}FileName")

        # Get the ROOT settings to be applied
        compAlg, compLvl, autoFlush, splitLvl, dynSplitLvl = 2, 1, 10, 0, 0 # Defaults: LZMA, Level 1, AutoFlush 10, No Splitting
        if stream in defaults:
            compAlg, compLvl, autoFlush, splitLvl, dynSplitLvl = defaults[stream]
        elif "DAOD" in stream:
            compAlg, compLvl, autoFlush, splitLvl, dynSplitLvl = 5, 5, 100, 0, 1 # Change the defaults for DAODs
        elif "D2AOD" in stream:
            compAlg, compLvl, autoFlush, splitLvl, dynSplitLvl = 5, 5, 500, 1, 1 # Change the defaults for D2AODs

        # For temporary streams/files we use either ZLIB or ZSTD for the compression algorithm to save CPU cycles
        # Temporary in this context might mean one of three things:
        # a) Outputs of intermediate steps of chained workflows (file name begins with tmp.),
        # b) Outputs of workers in AthenaMP jobs that are to be merged (file name ends with _000), and
        # c) Any output stream that is marked by the user as being temporary (via the CA flag Output.TemporaryStreams)
        # The ultimate goal is to reconcile all three cases and propagate the information between the job transform
        # and the job configuration (CA) so that we don't need to rely on the file names here...
        isTemporaryStream = fileName.endswith('_000') or fileName.startswith('tmp.') or stream in flags.Output.TemporaryStreams
        tempFileCompressionSetting = (5,1) # ZSTD at level 1
        if isTemporaryStream:
            # Outputs created in certain workflows are read with older ROOT versions.
            # E.g., temporary RDO files that are used in Run-2 simulation.
            # For those, we have to use ZLIB
            from AthenaConfiguration.Enums import LHCPeriod
            if "RDO" in stream and flags.hasCategory("GeoModel") and flags.GeoModel.Run < LHCPeriod.Run3:
                tempFileCompressionSetting = (1,1) # ZLIB at level 1
            logger.info(f"Stream {stream} is marked as temporary, overwriting the compression settings to {tempFileCompressionSetting}")
        compAlg, compLvl = tempFileCompressionSetting if isTemporaryStream else (compAlg, compLvl)

        # See if the user asked for the AutoFlush to be overwritten
        autoFlush = _overrideTreeAutoFlush(logger, flags, stream, autoFlush)

        # Print some debugging information
        logger.debug(f"{fileName=} {stream=} {compAlg=} {compLvl=} {autoFlush=} {splitLvl=} {dynSplitLvl=}")

        # Set the Collection/Container prefixes (make configurable?)
        outputCollection = "POOLContainer"
        poolContainerPrefix = "CollectionTree"

        # Check to see if this stream is an augmentation
        # Only set file-level attributes for the owning stream
        isAugmentation = flags.hasFlag(f"Output.{stream}ParentStream")
        if not isAugmentation:
            # Set the Compression attributes
            PoolAttributes += [ pah.setFileCompAlg( fileName, compAlg ) ]
            PoolAttributes += [ pah.setFileCompLvl( fileName, compLvl ) ]

            # By default use a maximum basket buffer size of 128k and minimum buffer entries of 10 for (D)AODs
            if "AOD" in stream:
                PoolAttributes += [ pah.setMaxBufferSize( fileName, "131072" ) ]
                PoolAttributes += [ pah.setMinBufferEntries( fileName, "10" ) ]
        else:
            # Changes in this else block need to be coordinated w/ OutputStreamConfig!
            # Set the master index
            PoolAttributes += [ f"DatabaseName = '{fileName}'; INDEX_MASTER = 'POOLContainer(DataHeader)'" ]

            # Set the Collection/Container prefixes
            outputCollection += f"_{stream}"
            poolContainerPrefix += f"_{stream}"
            OutputMetadataContainers += [f"MetaData_{stream}"]

        # Set the AutoFlush & Maximum Size attributes
        PoolAttributes += [ pah.setTreeAutoFlush( fileName, poolContainerPrefix, autoFlush ) ]
        PoolAttributes += [ pah.setTreeAutoFlush( fileName, outputCollection, autoFlush ) ]
        PoolAttributes += [ pah.setTreeAutoFlush( fileName, "POOLContainerForm", autoFlush ) ]
        PoolAttributes += [ pah.setTreeMaxSize( fileName, "*", "1099511627776L" ) ] # 1 TB
        if flags.MP.UseSharedWriter and flags.MP.UseParallelCompression:
            fileFlushSetting[fileName] = ( flags.MP.SharedWriter.FileFlushSetting.get(fileName, autoFlush) )
            logger.info(f"Setting auto write for {fileName} to {fileFlushSetting[fileName]} events")

        # Set the Spit Level attributes
        PoolAttributes += [ pah.setContainerSplitLevel( fileName, poolContainerPrefix, splitLvl ) ]
        PoolAttributes += [ pah.setContainerSplitLevel( fileName, "Aux.", splitLvl ) ]
        PoolAttributes += [ pah.setContainerSplitLevel( fileName, "Dyn.", dynSplitLvl ) ]

        # ROOT inadvertently broke forward compatibility in v6.30+ (see root/issues/15964)
        # This workaround is needed so that older releases can read files created by the new ones
        # For more information see ATEAM-1001
        if "EVNT" in stream or "RDO" in stream:
            PoolAttributes += [ f"DatabaseName = '{fileName}'; FILEFORWARD_COMPATIBILITY = '1'" ]
            # also for compatibility with rel21 disable single DataHeaderForm
            oneDHForm = False

        # Find the maximum AutoFlush across all formats
        maxAutoFlush = max(maxAutoFlush, autoFlush)

        # If no EventData technology is set for this specific file
        # (or globally) use flags.PoolSvc.DefaultContainerType
        if fileName not in storageTechnologyMap and '*' not in storageTechnologyMap:
            storageTechnologyMap[fileName] = flags.PoolSvc.DefaultContainerType

    # If we don't have "enough" events, disable parallelCompression if we're using SharedWriter
    # In this context, "enough" means each worker has a chance to make at least one flush to the disk
    useParallelCompression = flags.MP.UseSharedWriter and flags.MP.UseParallelCompression
    if useParallelCompression:
        # Now compute the total number of events this job will process
        requestedEvents = flags.Exec.MaxEvents
        availableEvents = flags.Input.FileNentries - flags.Exec.SkipEvents
        totalEntries = availableEvents if requestedEvents == -1 else min( availableEvents, requestedEvents )
        if ( totalEntries > 0 ) and ( maxAutoFlush > 0 ) and ( maxAutoFlush * flags.Concurrency.NumProcs > totalEntries ):
            logger.info( "Not enough events to process, disabling parallel compression for SharedWriter!" )
            logger.info( f"Processing {totalEntries} events in {flags.Concurrency.NumProcs} workers "
                         f"and a maximum (across all outputs) AutoFlush of {maxAutoFlush}")
            useParallelCompression = False

    if flags.MP.UseSharedReader or flags.MP.UseSharedWriter:
        from AthenaPoolCnvSvc.PoolCommonConfig import AthenaPoolSharedIOCnvSvcCfg
        return AthenaPoolSharedIOCnvSvcCfg(flags,
                                           PoolAttributes=PoolAttributes,
                                           ParallelCompression=useParallelCompression,
                                           StorageTechnology=storageTechnologyMap,
                                           OutputMetadataContainers=OutputMetadataContainers,
                                           OneDataHeaderForm=oneDHForm,
                                           FileFlushSetting=fileFlushSetting)
    else:
        from AthenaPoolCnvSvc.PoolCommonConfig import AthenaPoolCnvSvcCfg
        return AthenaPoolCnvSvcCfg(flags,
                                   PoolAttributes=PoolAttributes,
                                   StorageTechnology=storageTechnologyMap,
                                   OneDataHeaderForm=oneDHForm)
# (Doxygen cross-reference residue: the `min`/`max` names above link to
# C preprocessor macros defined in cfImp.cxx lines 40-41; they are unrelated
# to the Python builtins used in this module.)