ATLAS Offline Software
PoolWriteConfig.py
1"""Configuration for POOL file writing
2
3Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration
4"""
5
6from AthenaConfiguration.AccumulatorCache import AccumulatorCache
7
8def _overrideTreeAutoFlush(logger, flags, stream, value):
9 """Helper function to override TreeAutoFlush from flags."""
10 if not flags.Output.TreeAutoFlush or not isinstance(flags.Output.TreeAutoFlush, dict):
11 return value
12
13 if stream not in flags.Output.TreeAutoFlush:
14 return value
15
16 override = flags.Output.TreeAutoFlush[stream]
17 if override is not None:
18 logger.info('Overriding TreeAutoFlush value for stream "%s" from %d to %d', stream, value, override)
19 return override
20
21 return value
22
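# A minimal usage sketch for the helper above (the flag values are
# hypothetical, not part of this module): with flags.Output.TreeAutoFlush
# set to {"AOD": 50},
#
#     autoFlush = _overrideTreeAutoFlush(logger, flags, "AOD", 100)
#
# logs the override and returns 50. Streams absent from the dict, or mapped
# to None, keep the default value that was passed in.
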
24 """
25 Helper to get all the streams from configuration flags
26 For each stream that's configured to be written out
27 we have two flags w/ the following convention:
28 + Output.{STREAM}FileName
29 + Output.doWrite{STREAM}
30 """
31 result = []
32 for key, value in flags._flagdict.items():
33 if key.startswith("Output.") and key.endswith("FileName") and value.get():
34 stream = key.removeprefix("Output.").removesuffix("FileName")
35 if stream not in ["HIST"]: # AthenaPool is not responsible for HIST storage settings
36 result.append(stream)
37 return result
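# For example (hypothetical flag values): in a job where Output.AODFileName
# is set to "myAOD.pool.root" and all other Output.*FileName flags are empty,
# this helper returns ["AOD"], since empty file names evaluate as unset in
# value.get().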


@AccumulatorCache
def PoolWriteCfg(flags):
    """Return ComponentAccumulator configured to Write POOL files"""
    # based on WriteAthenaPool._configureWriteAthenaPool

    from AthenaCommon.Logging import logging
    logger = logging.getLogger( 'PoolWriteCfg' )

    PoolAttributes = []
    # Switch off splitting by setting default SplitLevel to 0
    PoolAttributes += ["DEFAULT_SPLITLEVEL ='0'"]

    # Set member-wise streaming as the default (also the ROOT default)
    PoolAttributes += ["STREAM_MEMBER_WISE = '1'"]

    # Increase default BasketSize to 32K, the ROOT default (but overwritten by POOL)
    PoolAttributes += ["DEFAULT_BUFFERSIZE = '32000'"]

    # Set POOLContainerForm(DataHeaderForm) split level to 0
    PoolAttributes += ["ContainerName = 'TTree=POOLContainerForm(DataHeaderForm)'; CONTAINER_SPLITLEVEL = '0'"]
    PoolAttributes += ["TREE_BRANCH_OFFSETTAB_LEN ='100'"]

    oneDHForm = flags.Output.OneDataHeaderForm

    # Kept in sync with RecoUtils.py
    from AthenaPoolCnvSvc import PoolAttributeHelper as pah

    # Defaults for common formats
    # Stream : [compression algorithm, compression level, auto flush, split level, dyn split level]
    defaults = {
        "EVNT"           : [2, 1, 500, 0, 0],
        "EVNT_TR"        : [2, 1,   1, 0, 0],
        "HITS"           : [2, 1,  10, 0, 0],
        "RDO"            : [2, 1,  10, 0, 0],
        "ESD"            : [2, 1,  10, 0, 0],
        "AOD"            : [2, 1, 100, 0, 0],
        "DAOD_PHYSVAL"   : [5, 5, 100, 0, 1],
        "DAOD_PHYS"      : [5, 5, 500, 0, 1],
        "DAOD_PHYSLITE"  : [5, 5, 500, 1, 1],
        "DAOD_TRUTH3"    : [5, 5, 500, 1, 1],
        "D2AOD_PHYSLITE" : [5, 5, 500, 1, 1],
    }
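
    # For reference, the algorithm codes above follow ROOT's compression
    # enumeration: 1 = ZLIB, 2 = LZMA, 4 = LZ4, 5 = ZSTD. Reading one row as
    # an example, DAOD_PHYS is written with ZSTD at level 5, flushed every
    # 500 events, with split level 0 for the static containers and split
    # level 1 for the dynamic (Dyn.) attributes.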

    # Metadata containers needed for augmentations
    OutputMetadataContainers = []

    # Loop over all streams and set the appropriate attributes
    maxAutoFlush = -1
    storageTechnologyMap = flags.Output.StorageTechnology.EventData or {'*': flags.PoolSvc.DefaultContainerType}
    for stream in _getStreamsFromFlags(flags):

        # Get the file name - guaranteed to exist at this point
        fileName = getattr(flags.Output, f"{stream}FileName")

        # Get the ROOT settings to be applied
        compAlg, compLvl, autoFlush, splitLvl, dynSplitLvl = 2, 1, 10, 0, 0 # Defaults: LZMA, Level 1, AutoFlush 10, No Splitting
        if stream in defaults:
            compAlg, compLvl, autoFlush, splitLvl, dynSplitLvl = defaults[stream]
        elif "DAOD" in stream:
            compAlg, compLvl, autoFlush, splitLvl, dynSplitLvl = 5, 5, 100, 0, 1 # Change the defaults for DAODs
        elif "D2AOD" in stream:
            compAlg, compLvl, autoFlush, splitLvl, dynSplitLvl = 5, 5, 500, 1, 1 # Change the defaults for D2AODs

        # For temporary streams/files we use either ZLIB or ZSTD for the compression algorithm to save CPU cycles.
        # Temporary in this context might mean one of three things:
        #   a) Outputs of intermediate steps of chained workflows (file name begins with tmp.),
        #   b) Outputs of workers in AthenaMP jobs that are to be merged (file name ends with _000), and
        #   c) Any output stream that is marked by the user as being temporary (via the CA flag Output.TemporaryStreams).
        # The ultimate goal is to reconcile all three cases and propagate the information between the job transform
        # and the job configuration (CA) so that we don't need to rely on the file names here...
        isTemporaryStream = fileName.endswith('_000') or fileName.startswith('tmp.') or stream in flags.Output.TemporaryStreams
        tempFileCompressionSetting = (5, 1) # ZSTD at level 1
        if isTemporaryStream:
            # Outputs created in certain workflows are read with older ROOT versions.
            # E.g., temporary RDO files that are used in Run-2 simulation.
            # For those, we have to use ZLIB.
            from AthenaConfiguration.Enums import LHCPeriod
            if "RDO" in stream and flags.hasCategory("GeoModel") and flags.GeoModel.Run < LHCPeriod.Run3:
                tempFileCompressionSetting = (1, 1) # ZLIB at level 1
            logger.info(f"Stream {stream} is marked as temporary, overwriting the compression settings to {tempFileCompressionSetting}")
        compAlg, compLvl = tempFileCompressionSetting if isTemporaryStream else (compAlg, compLvl)
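
        # Worked example of the temporary-stream rules above (the file name is
        # hypothetical): an intermediate output named "tmp.HITS.pool.root"
        # matches rule (a), so the HITS defaults are overridden with ZSTD at
        # level 1; ZLIB would only be used for a temporary RDO stream in a
        # pre-Run-3 geometry (e.g. Run-2 simulation).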

        # See if the user asked for the AutoFlush to be overwritten
        autoFlush = _overrideTreeAutoFlush(logger, flags, stream, autoFlush)

        # Print some debugging information
        logger.debug(f"{fileName=} {stream=} {compAlg=} {compLvl=} {autoFlush=} {splitLvl=} {dynSplitLvl=}")

        # Set the Collection/Container prefixes (make configurable?)
        outputCollection = "POOLContainer"
        poolContainerPrefix = "CollectionTree"

        # Check to see if this stream is an augmentation
        # Only set file-level attributes for the owning stream
        isAugmentation = flags.hasFlag(f"Output.{stream}ParentStream")
        if not isAugmentation:
            # Set the Compression attributes
            PoolAttributes += [ pah.setFileCompAlg( fileName, compAlg ) ]
            PoolAttributes += [ pah.setFileCompLvl( fileName, compLvl ) ]

            # By default use a maximum basket buffer size of 128k and minimum buffer entries of 10 for (D)AODs
            if "AOD" in stream:
                PoolAttributes += [ pah.setMaxBufferSize( fileName, "131072" ) ]
                PoolAttributes += [ pah.setMinBufferEntries( fileName, "10" ) ]
        else:
            # Changes in this else block need to be coordinated w/ OutputStreamConfig!
            # Set the master index
            PoolAttributes += [ f"DatabaseName = '{fileName}'; INDEX_MASTER = 'POOLContainer(DataHeader)'" ]

            # Set the Collection/Container prefixes
            outputCollection += f"_{stream}"
            poolContainerPrefix += f"_{stream}"
            OutputMetadataContainers += [f"MetaData_{stream}"]

        # Set the AutoFlush attributes
        PoolAttributes += [ pah.setTreeAutoFlush( fileName, poolContainerPrefix, autoFlush ) ]
        PoolAttributes += [ pah.setTreeAutoFlush( fileName, outputCollection, autoFlush ) ]
        PoolAttributes += [ pah.setTreeAutoFlush( fileName, "POOLContainerForm", autoFlush ) ]

        # Set the Split Level attributes
        PoolAttributes += [ pah.setContainerSplitLevel( fileName, poolContainerPrefix, splitLvl ) ]
        PoolAttributes += [ pah.setContainerSplitLevel( fileName, "Aux.", splitLvl ) ]
        PoolAttributes += [ pah.setContainerSplitLevel( fileName, "Dyn.", dynSplitLvl ) ]

        # ROOT inadvertently broke forward compatibility in v6.30+ (see root/issues/15964)
        # This workaround is needed so that older releases can read files created by the new ones
        # For more information see ATEAM-1001
        if "EVNT" in stream or "RDO" in stream:
            PoolAttributes += [ f"DatabaseName = '{fileName}'; FILEFORWARD_COMPATIBILITY = '1'" ]
            # Also for compatibility with rel21, disable the single DataHeaderForm
            oneDHForm = False

        # Find the maximum AutoFlush across all formats
        maxAutoFlush = max(maxAutoFlush, autoFlush)

        # If no EventData technology is set for this specific file
        # (or globally) use flags.PoolSvc.DefaultContainerType
        if fileName not in storageTechnologyMap and '*' not in storageTechnologyMap:
            storageTechnologyMap[fileName] = flags.PoolSvc.DefaultContainerType

    # If we don't have "enough" events, disable parallel compression if we're using SharedWriter
    # In this context, "enough" means each worker has a chance to make at least one flush to disk
    useParallelCompression = flags.MP.UseSharedWriter and flags.MP.UseParallelCompression
    if useParallelCompression:
        # Now compute the total number of events this job will process
        requestedEvents = flags.Exec.MaxEvents
        availableEvents = flags.Input.FileNentries - flags.Exec.SkipEvents
        totalEntries = availableEvents if requestedEvents == -1 else min( availableEvents, requestedEvents )
        if ( totalEntries > 0 ) and ( maxAutoFlush > 0 ) and ( maxAutoFlush * flags.Concurrency.NumProcs > totalEntries ):
            logger.info( "Not enough events to process, disabling parallel compression for SharedWriter!" )
            logger.info( f"Processing {totalEntries} events in {flags.Concurrency.NumProcs} workers "
                         f"and a maximum (across all outputs) AutoFlush of {maxAutoFlush}" )
            useParallelCompression = False
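
    # Worked example of the check above (hypothetical numbers): with
    # Concurrency.NumProcs = 8 and a maximum AutoFlush of 500, at least
    # 8 * 500 = 4000 events are needed for every worker to fill one complete
    # flush basket; with fewer events, parallel compression is disabled.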

    if flags.MP.UseSharedReader or flags.MP.UseSharedWriter:
        from AthenaPoolCnvSvc.PoolCommonConfig import AthenaPoolSharedIOCnvSvcCfg
        return AthenaPoolSharedIOCnvSvcCfg(flags,
                                           PoolAttributes=PoolAttributes,
                                           ParallelCompression=useParallelCompression,
                                           StorageTechnology=storageTechnologyMap,
                                           OutputMetadataContainers=OutputMetadataContainers,
                                           OneDataHeaderForm=oneDHForm)
    else:
        from AthenaPoolCnvSvc.PoolCommonConfig import AthenaPoolCnvSvcCfg
        return AthenaPoolCnvSvcCfg(flags,
                                   PoolAttributes=PoolAttributes,
                                   StorageTechnology=storageTechnologyMap,
                                   OneDataHeaderForm=oneDHForm)
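
# A minimal sketch of wiring this configuration into a job; initConfigFlags
# and the file name below are illustrative and not defined in this module:
#
#     from AthenaConfiguration.AllConfigFlags import initConfigFlags
#     flags = initConfigFlags()
#     flags.Output.AODFileName = "myAOD.pool.root"
#     flags.lock()
#     acc = PoolWriteCfg(flags)  # ComponentAccumulator with the write configuration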