ATLAS Offline Software
python.PoolWriteConfig Namespace Reference

Functions

 _overrideTreeAutoFlush (logger, flags, stream, value)
 _getStreamsFromFlags (flags)
 PoolWriteCfg (flags)

Detailed Description

Configuration for POOL file writing

Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration

Function Documentation

◆ _getStreamsFromFlags()

python.PoolWriteConfig._getStreamsFromFlags(flags)
protected
Helper to get all the streams from configuration flags.
For each stream that is configured to be written out,
we have two flags with the following convention:
    + Output.{STREAM}FileName
    + Output.doWrite{STREAM}

Definition at line 23 of file PoolWriteConfig.py.

def _getStreamsFromFlags(flags):
    """
    Helper to get all the streams from configuration flags.
    For each stream that is configured to be written out,
    we have two flags with the following convention:
        + Output.{STREAM}FileName
        + Output.doWrite{STREAM}
    """
    result = []
    for key, value in flags._flagdict.items():
        if key.startswith("Output.") and key.endswith("FileName") and value.get():
            stream = key.removeprefix("Output.").removesuffix("FileName")
            if stream not in ["HIST"]:  # AthenaPool is not responsible for HIST storage settings
                result.append(stream)
    return result
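For illustration, a minimal sketch of the convention (the file name is an assumption; any stream whose Output.{STREAM}FileName flag is set to a non-empty value is picked up):

from AthenaConfiguration.AllConfigFlags import initConfigFlags

flags = initConfigFlags()
flags.Output.AODFileName = "myAOD.pool.root"  # hypothetical output file
flags.lock()

# _getStreamsFromFlags(flags) now returns ["AOD"]: the stream name is the
# piece of the flag key between "Output." and "FileName".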

◆ _overrideTreeAutoFlush()

python.PoolWriteConfig._overrideTreeAutoFlush(logger, flags, stream, value)
protected
Helper function to override TreeAutoFlush from flags.

Definition at line 8 of file PoolWriteConfig.py.

def _overrideTreeAutoFlush(logger, flags, stream, value):
    """Helper function to override TreeAutoFlush from flags."""
    if not flags.Output.TreeAutoFlush or not isinstance(flags.Output.TreeAutoFlush, dict):
        return value

    if stream not in flags.Output.TreeAutoFlush:
        return value

    override = flags.Output.TreeAutoFlush[stream]
    if override is not None:
        logger.info('Overriding TreeAutoFlush value for stream "%s" from %d to %d',
                    stream, value, override)
        return override

    return value
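For example, the override can be requested by setting the Output.TreeAutoFlush flag to a dictionary keyed by stream name (a minimal sketch; the stream name and values are illustrative):

flags.Output.TreeAutoFlush = {"AOD": 500}

# _overrideTreeAutoFlush(logger, flags, "AOD", 100) then logs
#   'Overriding TreeAutoFlush value for stream "AOD" from 100 to 500'
# and returns 500; streams absent from the dictionary keep their defaults.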

◆ PoolWriteCfg()

python.PoolWriteConfig.PoolWriteCfg(flags)
Return a ComponentAccumulator configured to write POOL files

Definition at line 41 of file PoolWriteConfig.py.

@AccumulatorCache
def PoolWriteCfg(flags):
    """Return a ComponentAccumulator configured to write POOL files"""
    # based on WriteAthenaPool._configureWriteAthenaPool

    from AthenaCommon.Logging import logging
    logger = logging.getLogger('PoolWriteCfg')

    PoolAttributes = []
    # Switch off splitting by setting the default SplitLevel to 0
    PoolAttributes += ["DEFAULT_SPLITLEVEL ='0'"]

    # Set member-wise streaming as the default (the ROOT default)
    PoolAttributes += ["STREAM_MEMBER_WISE = '1'"]

    # Increase the default BasketSize to 32K, the ROOT default (but overwritten by POOL)
    PoolAttributes += ["DEFAULT_BUFFERSIZE = '32000'"]

    # Set the POOLContainerForm(DataHeaderForm) split level to 0
    PoolAttributes += ["ContainerName = 'TTree=POOLContainerForm(DataHeaderForm)'; CONTAINER_SPLITLEVEL = '0'"]
    PoolAttributes += ["TREE_BRANCH_OFFSETTAB_LEN ='100'"]

    oneDHForm = flags.Output.OneDataHeaderForm

    # Kept in sync with RecoUtils.py
    from AthenaPoolCnvSvc import PoolAttributeHelper as pah

    # Defaults for common formats
    # Stream : [compression algorithm, compression level, auto flush, split level, dyn split level]
    defaults = {
        "EVNT"           : [2, 1, 500, 0, 0],
        "EVNT_TR"        : [2, 1,   1, 0, 0],
        "HITS"           : [2, 1,  10, 0, 0],
        "RDO"            : [2, 1,  10, 0, 0],
        "ESD"            : [2, 1,  10, 0, 0],
        "AOD"            : [2, 1, 100, 0, 0],
        "DAOD_PHYSVAL"   : [5, 5, 100, 0, 1],
        "DAOD_PHYS"      : [5, 5, 500, 0, 1],
        "DAOD_PHYSLITE"  : [5, 5, 500, 1, 1],
        "DAOD_TRUTH3"    : [5, 5, 500, 1, 1],
        "D2AOD_PHYSLITE" : [5, 5, 500, 1, 1],
    }
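    # NB: the algorithm codes above follow ROOT's compression enumeration
    # (1 = ZLIB, 2 = LZMA, 4 = LZ4, 5 = ZSTD), consistent with the
    # LZMA/ZSTD/ZLIB comments elsewhere in this function.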

    # Metadata containers needed for augmentations
    OutputMetadataContainers = []

    # Loop over all streams and set the appropriate attributes
    fileFlushSetting = {}
    maxAutoFlush = -1
    storageTechnologyMap = flags.Output.StorageTechnology.EventData or {'*': flags.PoolSvc.DefaultContainerType}
    for stream in _getStreamsFromFlags(flags):

        # Get the file name - guaranteed to exist at this point
        fileName = getattr(flags.Output, f"{stream}FileName")

        # Get the ROOT settings to be applied
        compAlg, compLvl, autoFlush, splitLvl, dynSplitLvl = 2, 1, 10, 0, 0  # Defaults: LZMA, level 1, AutoFlush 10, no splitting
        if stream in defaults:
            compAlg, compLvl, autoFlush, splitLvl, dynSplitLvl = defaults[stream]
        elif "DAOD" in stream:
            compAlg, compLvl, autoFlush, splitLvl, dynSplitLvl = 5, 5, 100, 0, 1  # Change the defaults for DAODs
        elif "D2AOD" in stream:
            compAlg, compLvl, autoFlush, splitLvl, dynSplitLvl = 5, 5, 500, 1, 1  # Change the defaults for D2AODs

        # For temporary streams/files we use either ZLIB or ZSTD as the compression algorithm to save CPU cycles.
        # Temporary in this context means one of three things:
        #  a) outputs of intermediate steps of chained workflows (file name begins with tmp.),
        #  b) outputs of workers in AthenaMP jobs that are to be merged (file name ends with _000), and
        #  c) any output stream that is marked by the user as temporary (via the CA flag Output.TemporaryStreams).
        # The ultimate goal is to reconcile all three cases and propagate the information between the job transform
        # and the job configuration (CA) so that we don't need to rely on the file names here...
        isTemporaryStream = fileName.endswith('_000') or fileName.startswith('tmp.') or stream in flags.Output.TemporaryStreams
        tempFileCompressionSetting = (5, 1)  # ZSTD at level 1
        if isTemporaryStream:
            # Outputs created in certain workflows are read with older ROOT versions,
            # e.g., temporary RDO files that are used in Run-2 simulation.
            # For those we have to use ZLIB.
            from AthenaConfiguration.Enums import LHCPeriod
            if "RDO" in stream and flags.hasCategory("GeoModel") and flags.GeoModel.Run < LHCPeriod.Run3:
                tempFileCompressionSetting = (1, 1)  # ZLIB at level 1
            logger.info(f"Stream {stream} is marked as temporary, overwriting the compression settings to {tempFileCompressionSetting}")
        compAlg, compLvl = tempFileCompressionSetting if isTemporaryStream else (compAlg, compLvl)

        # See if the user asked for the AutoFlush to be overridden
        autoFlush = _overrideTreeAutoFlush(logger, flags, stream, autoFlush)

        # Print some debugging information
        logger.debug(f"{fileName=} {stream=} {compAlg=} {compLvl=} {autoFlush=} {splitLvl=} {dynSplitLvl=}")

        # Set the Collection/Container prefixes (make configurable?)
        outputCollection = "POOLContainer"
        poolContainerPrefix = "CollectionTree"

        # Check whether this stream is an augmentation.
        # Only set file-level attributes for the owning stream.
        isAugmentation = flags.hasFlag(f"Output.{stream}ParentStream")
        if not isAugmentation:
            # Set the compression attributes
            PoolAttributes += [pah.setFileCompAlg(fileName, compAlg)]
            PoolAttributes += [pah.setFileCompLvl(fileName, compLvl)]

            # By default use a maximum basket buffer size of 128k and minimum buffer entries of 10 for (D)AODs
            if "AOD" in stream:
                PoolAttributes += [pah.setMaxBufferSize(fileName, "131072")]
                PoolAttributes += [pah.setMinBufferEntries(fileName, "10")]
        else:
            # Changes in this else block need to be coordinated with OutputStreamConfig!
            # Set the master index
            PoolAttributes += [f"DatabaseName = '{fileName}'; INDEX_MASTER = 'POOLContainer(DataHeader)'"]

            # Set the Collection/Container prefixes
            outputCollection += f"_{stream}"
            poolContainerPrefix += f"_{stream}"
            OutputMetadataContainers += [f"MetaData_{stream}"]

        # Set the AutoFlush & maximum size attributes
        PoolAttributes += [pah.setTreeAutoFlush(fileName, poolContainerPrefix, autoFlush)]
        PoolAttributes += [pah.setTreeAutoFlush(fileName, outputCollection, autoFlush)]
        PoolAttributes += [pah.setTreeAutoFlush(fileName, "POOLContainerForm", autoFlush)]
        PoolAttributes += [pah.setTreeMaxSize(fileName, "*", "1099511627776L")]  # 1 TB
        if flags.MP.UseSharedWriter and flags.MP.UseParallelCompression:
            fileFlushSetting[fileName] = flags.MP.SharedWriter.FileFlushSetting.get(fileName, autoFlush)
            logger.info(f"Setting auto write for {fileName} to {fileFlushSetting[fileName]} events")

        # Set the split level attributes
        PoolAttributes += [pah.setContainerSplitLevel(fileName, poolContainerPrefix, splitLvl)]
        PoolAttributes += [pah.setContainerSplitLevel(fileName, "Aux.", splitLvl)]
        PoolAttributes += [pah.setContainerSplitLevel(fileName, "Dyn.", dynSplitLvl)]

        # ROOT inadvertently broke forward compatibility in v6.30+ (see root/issues/15964).
        # This workaround is needed so that older releases can read files created by newer ones.
        # For more information see ATEAM-1001.
        if "EVNT" in stream or "RDO" in stream:
            PoolAttributes += [f"DatabaseName = '{fileName}'; FILEFORWARD_COMPATIBILITY = '1'"]
            # Also, for compatibility with rel21, disable the single DataHeaderForm
            oneDHForm = False

        # Find the maximum AutoFlush across all formats
        maxAutoFlush = max(maxAutoFlush, autoFlush)

        # If no EventData technology is set for this specific file
        # (or globally), use flags.PoolSvc.DefaultContainerType
        if fileName not in storageTechnologyMap and '*' not in storageTechnologyMap:
            storageTechnologyMap[fileName] = flags.PoolSvc.DefaultContainerType

    # If we don't have "enough" events, disable parallel compression when using SharedWriter.
    # In this context, "enough" means that each worker has a chance to make at least one flush to disk.
    useParallelCompression = flags.MP.UseSharedWriter and flags.MP.UseParallelCompression
    if useParallelCompression:
        # Compute the total number of events this job will process
        requestedEvents = flags.Exec.MaxEvents
        availableEvents = flags.Input.FileNentries - flags.Exec.SkipEvents
        totalEntries = availableEvents if requestedEvents == -1 else min(availableEvents, requestedEvents)
        if (totalEntries > 0) and (maxAutoFlush > 0) and (maxAutoFlush * flags.Concurrency.NumProcs > totalEntries):
            logger.info("Not enough events to process, disabling parallel compression for SharedWriter!")
            logger.info(f"Processing {totalEntries} events in {flags.Concurrency.NumProcs} workers "
                        f"and a maximum (across all outputs) AutoFlush of {maxAutoFlush}")
            useParallelCompression = False

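    # Worked example: with flags.Concurrency.NumProcs = 8 and maxAutoFlush = 100,
    # at least 800 events are needed for every worker to flush at least once;
    # with fewer events the check above disables parallel compression.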
    if flags.MP.UseSharedReader or flags.MP.UseSharedWriter:
        from AthenaPoolCnvSvc.PoolCommonConfig import AthenaPoolSharedIOCnvSvcCfg
        return AthenaPoolSharedIOCnvSvcCfg(flags,
                                           PoolAttributes=PoolAttributes,
                                           ParallelCompression=useParallelCompression,
                                           StorageTechnology=storageTechnologyMap,
                                           OutputMetadataContainers=OutputMetadataContainers,
                                           OneDataHeaderForm=oneDHForm,
                                           FileFlushSetting=fileFlushSetting)
    else:
        from AthenaPoolCnvSvc.PoolCommonConfig import AthenaPoolCnvSvcCfg
        return AthenaPoolCnvSvcCfg(flags,
                                   PoolAttributes=PoolAttributes,
                                   StorageTechnology=storageTechnologyMap,
                                   OneDataHeaderForm=oneDHForm)
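
A typical caller merges the returned ComponentAccumulator into the job's top-level accumulator. A minimal sketch, assuming a standard ComponentAccumulator-based job (the output file name is illustrative):

from AthenaConfiguration.AllConfigFlags import initConfigFlags
from AthenaConfiguration.MainServicesConfig import MainServicesCfg
from AthenaPoolCnvSvc.PoolWriteConfig import PoolWriteCfg

flags = initConfigFlags()
flags.Output.AODFileName = "myAOD.pool.root"  # hypothetical output file
flags.lock()  # lock the flags before configuring (PoolWriteCfg is cached)

cfg = MainServicesCfg(flags)
cfg.merge(PoolWriteCfg(flags))  # adds the configured AthenaPoolCnvSvc to the job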