ATLAS Offline Software
PoolWriteConfig.py
Go to the documentation of this file.
1 """Configuration for POOL file writing
2 
3 Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration
4 """
5 
6 from AthenaConfiguration.AccumulatorCache import AccumulatorCache
7 
8 def _overrideTreeAutoFlush(logger, flags, stream, value):
9  """Helper function to override TreeAutoFlush from flags."""
10  if not flags.Output.TreeAutoFlush or not isinstance(flags.Output.TreeAutoFlush, dict):
11  return value
12 
13  if stream not in flags.Output.TreeAutoFlush:
14  return value
15 
16  override = flags.Output.TreeAutoFlush[stream]
17  if override is not None:
18  logger.info('Overriding TreeAutoFlush value for stream "%s" from %d to %d', stream, value, override)
19  return override
20 
21  return value
22 
24  """
25  Helper to get all the streams from configuration flags
26  For each stream that's configured to be written out
27  we have two flags w/ the following convention:
28  + Output.{STREAM}FileName
29  + Output.doWrite{STREAM}
30  """
31  result = []
32  for key, value in flags._flagdict.items():
33  if key.startswith("Output.") and key.endswith("FileName") and value.get():
34  stream = key.removeprefix("Output.").removesuffix("FileName")
35  if stream not in ["HIST"]: # AthenaPool is not responsible for HIST storage settings
36  result.append(stream)
37  return result
38 
39 
@AccumulatorCache
def PoolWriteCfg(flags):
    """Return ComponentAccumulator configured to Write POOL files

    Builds the per-file/per-container PoolAttributes list (compression
    algorithm and level, TTree AutoFlush, split levels, forward-compatibility
    settings) for every configured output stream, then returns the
    appropriate AthenaPoolCnvSvc configuration — the shared-I/O flavour when
    AthenaMP SharedReader/SharedWriter is in use, the plain one otherwise.
    """
    # based on WriteAthenaPool._configureWriteAthenaPool

    from AthenaCommon.Logging import logging
    logger = logging.getLogger( 'PoolWriteCfg' )

    PoolAttributes = []
    # Switch off splitting by setting default SplitLevel to 0
    PoolAttributes += ["DEFAULT_SPLITLEVEL ='0'"]

    # Set as default the member-wise streaming, ROOT default
    PoolAttributes += ["STREAM_MEMBER_WISE = '1'"]

    # Increase default BasketSize to 32K, ROOT default (but overwritten by POOL)
    PoolAttributes += ["DEFAULT_BUFFERSIZE = '32000'"]

    # Set POOLContainerForm(DataHeaderForm) split level to 0
    PoolAttributes += ["ContainerName = 'TTree=POOLContainerForm(DataHeaderForm)'; CONTAINER_SPLITLEVEL = '0'"]
    PoolAttributes += ["TREE_BRANCH_OFFSETTAB_LEN ='100'"]

    # May be forced off below for rel21-compatible EVNT/RDO outputs
    oneDHForm = flags.Output.OneDataHeaderForm

    # Kept in sync with RecoUtils.py
    from AthenaPoolCnvSvc import PoolAttributeHelper as pah

    # Defaults for common formats
    # Stream : [compression algorithm, compression level, auto flush, split level, dyn split level]
    defaults = {
        "EVNT" : [2, 1, 500, 0, 0],
        "EVNT_TR" : [2, 1, 1, 0, 0],
        "HITS" : [2, 1, 10, 0, 0],
        "RDO" : [2, 1, 10, 0, 0],
        "ESD" : [2, 1, 10, 0, 0],
        "AOD" : [2, 1, 100, 0, 0],
        "DAOD_PHYSVAL" : [5, 5, 100, 0, 1],
        "DAOD_PHYS" : [5, 5, 500, 0, 1],
        "DAOD_PHYSLITE" : [5, 5, 500, 1, 1],
        "DAOD_TRUTH3" : [5, 5, 500, 1, 1],
        "D2AOD_PHYSLITE" : [5, 5, 500, 1, 1],
    }

    # Metadata containers needed for augmentations
    OutputMetadataContainers = []

    # Loop over all streams and set the appropriate attributes
    maxAutoFlush = -1
    # NOTE(review): when Output.StorageTechnology.EventData is non-empty this
    # map is that flags-provided dict itself and is extended in-place further
    # down — presumably intentional, but confirm flags are not meant to stay
    # immutable here.
    storageTechnologyMap = flags.Output.StorageTechnology.EventData or {'*': flags.PoolSvc.DefaultContainerType}
    for stream in _getStreamsFromFlags(flags):

        # Get the file name - Guaranteed to exist at this point
        fileName = getattr(flags.Output, f"{stream}FileName")

        # Get the ROOT settings to be applied
        compAlg, compLvl, autoFlush, splitLvl, dynSplitLvl = 2, 1, 10, 0, 0 # Defaults: LZMA, Level 1, AutoFlush 10, No Splitting
        if stream in defaults:
            compAlg, compLvl, autoFlush, splitLvl, dynSplitLvl = defaults[stream]
        elif "DAOD" in stream:
            compAlg, compLvl, autoFlush, splitLvl, dynSplitLvl = 5, 5, 100, 0, 1 # Change the defaults for DAODs
        elif "D2AOD" in stream:
            compAlg, compLvl, autoFlush, splitLvl, dynSplitLvl = 5, 5, 500, 1, 1 # Change the defaults for D2AODs

        # For temporary streams/files we use either ZLIB or ZSTD for the compression algorithm to save CPU cycles
        # Temporary in this context might mean one of three things:
        # a) Outputs of intermediate steps of chained workflows (file name begins with tmp.),
        # b) Outputs of workers in AthenaMP jobs that are to be merged (file name ends with _000), and
        # c) Any output stream that is marked by the user as being temporary (via the CA flag Output.TemporaryStreams)
        # The ultimate goal is to reconcile all three cases and propagate the information between the job transform
        # and the job configuration (CA) so that we don't need to rely on the file names here...
        isTemporaryStream = fileName.endswith('_000') or fileName.startswith('tmp.') or stream in flags.Output.TemporaryStreams
        tempFileCompressionSetting = (5,1) # ZSTD at level 1
        if isTemporaryStream:
            # Outputs created in certain workflows are read with older ROOT versions.
            # E.g., temporary RDO files that are used in Run-2 simulation.
            # For those, we have to use ZLIB
            from AthenaConfiguration.Enums import LHCPeriod
            if "RDO" in stream and hasattr(flags, "GeoModel") and flags.GeoModel.Run < LHCPeriod.Run3:
                tempFileCompressionSetting = (1,1) # ZLIB at level 1
            logger.info(f"Stream {stream} is marked as temporary, overwriting the compression settings to {tempFileCompressionSetting}")
        compAlg, compLvl = tempFileCompressionSetting if isTemporaryStream else (compAlg, compLvl)

        # See if the user asked for the AutoFlush to be overwritten
        autoFlush = _overrideTreeAutoFlush(logger, flags, stream, autoFlush)

        # Print some debugging information
        logger.debug(f"{fileName=} {stream=} {compAlg=} {compLvl=} {autoFlush=} {splitLvl=} {dynSplitLvl=}")

        # Set the Collection/Container prefixes (make configurable?)
        outputCollection = "POOLContainer"
        poolContainerPrefix = "CollectionTree"

        # Check to see if this stream is an augmentation
        # Only set file-level attributes for the owning stream
        isAugmentation = flags.hasFlag(f"Output.{stream}ParentStream")
        if not isAugmentation:
            # Set the Compression attributes
            PoolAttributes += [ pah.setFileCompAlg( fileName, compAlg ) ]
            PoolAttributes += [ pah.setFileCompLvl( fileName, compLvl ) ]

            # By default use a maximum basket buffer size of 128k and minimum buffer entries of 10 for (D)AODs
            if "AOD" in stream:
                PoolAttributes += [ pah.setMaxBufferSize( fileName, "131072" ) ]
                PoolAttributes += [ pah.setMinBufferEntries( fileName, "10" ) ]
        else:
            # Changes in this else block need to be coordinated w/ OutputStreamConfig!
            # Set the master index
            PoolAttributes += [ f"DatabaseName = '{fileName}'; INDEX_MASTER = 'POOLContainer(DataHeader)'" ]

            # Set the Collection/Container prefixes
            outputCollection += f"_{stream}"
            poolContainerPrefix += f"_{stream}"
            OutputMetadataContainers += [f"MetaData_{stream}"]

        # Set the AutoFlush attributes
        PoolAttributes += [ pah.setTreeAutoFlush( fileName, poolContainerPrefix, autoFlush ) ]
        PoolAttributes += [ pah.setTreeAutoFlush( fileName, outputCollection, autoFlush ) ]
        PoolAttributes += [ pah.setTreeAutoFlush( fileName, "POOLContainerForm", autoFlush ) ]

        # Set the Spit Level attributes
        PoolAttributes += [ pah.setContainerSplitLevel( fileName, poolContainerPrefix, splitLvl ) ]
        PoolAttributes += [ pah.setContainerSplitLevel( fileName, "Aux.", splitLvl ) ]
        PoolAttributes += [ pah.setContainerSplitLevel( fileName, "Dyn.", dynSplitLvl ) ]

        # ROOT inadvertently broke forward compatibility in v6.30+ (see root/issues/15964)
        # This workaround is needed so that older releases can read files created by the new ones
        # For more information see ATEAM-1001
        if "EVNT" in stream or "RDO" in stream:
            PoolAttributes += [ f"DatabaseName = '{fileName}'; FILEFORWARD_COMPATIBILITY = '1'" ]
            # also for compatibility with rel21 disable single DataHeaderForm
            oneDHForm = False

        # Find the maximum AutoFlush across all formats
        maxAutoFlush = max(maxAutoFlush, autoFlush)

        # If no EventData technology is set for this specific file
        # (or globally) use flags.PoolSvc.DefaultContainerType
        if fileName not in storageTechnologyMap and '*' not in storageTechnologyMap:
            storageTechnologyMap[fileName] = flags.PoolSvc.DefaultContainerType

    # If we don't have "enough" events, disable parallelCompression if we're using SharedWriter
    # In this context, "enough" means each worker has a chance to make at least one flush to the disk
    useParallelCompression = flags.MP.UseSharedWriter and flags.MP.UseParallelCompression
    if useParallelCompression:
        # Now compute the total number of events this job will process
        requestedEvents = flags.Exec.MaxEvents
        availableEvents = flags.Input.FileNentries - flags.Exec.SkipEvents
        totalEntries = availableEvents if requestedEvents == -1 else min( availableEvents, requestedEvents )
        if ( totalEntries > 0 ) and ( maxAutoFlush > 0 ) and ( maxAutoFlush * flags.Concurrency.NumProcs > totalEntries ):
            logger.info( "Not enough events to process, disabling parallel compression for SharedWriter!" )
            logger.info( f"Processing {totalEntries} events in {flags.Concurrency.NumProcs} workers "
                         f"and a maximum (across all outputs) AutoFlush of {maxAutoFlush}")
            useParallelCompression = False

    # Pick the converter service flavour: shared-I/O for AthenaMP
    # SharedReader/SharedWriter, the plain AthenaPoolCnvSvc otherwise.
    if flags.MP.UseSharedReader or flags.MP.UseSharedWriter:
        from AthenaPoolCnvSvc.PoolCommonConfig import AthenaPoolSharedIOCnvSvcCfg
        return AthenaPoolSharedIOCnvSvcCfg(flags,
                                           PoolAttributes=PoolAttributes,
                                           ParallelCompression=useParallelCompression,
                                           StorageTechnology=storageTechnologyMap,
                                           OutputMetadataContainers=OutputMetadataContainers,
                                           OneDataHeaderForm = oneDHForm)
    else:
        from AthenaPoolCnvSvc.PoolCommonConfig import AthenaPoolCnvSvcCfg
        return AthenaPoolCnvSvcCfg(flags,
                                   PoolAttributes=PoolAttributes,
                                   StorageTechnology=storageTechnologyMap,
                                   OneDataHeaderForm = oneDHForm)
python.PoolWriteConfig.PoolWriteCfg
def PoolWriteCfg(flags)
Definition: PoolWriteConfig.py:41
max
constexpr double max()
Definition: ap_fixedTest.cxx:33
min
constexpr double min()
Definition: ap_fixedTest.cxx:26
python.PoolCommonConfig.AthenaPoolSharedIOCnvSvcCfg
def AthenaPoolSharedIOCnvSvcCfg(flags, **kwargs)
Definition: PoolCommonConfig.py:30
python.PoolCommonConfig.AthenaPoolCnvSvcCfg
def AthenaPoolCnvSvcCfg(flags, **kwargs)
Definition: PoolCommonConfig.py:43
python.PoolWriteConfig._overrideTreeAutoFlush
def _overrideTreeAutoFlush(logger, flags, stream, value)
Definition: PoolWriteConfig.py:8
python.PoolWriteConfig._getStreamsFromFlags
def _getStreamsFromFlags(flags)
Definition: PoolWriteConfig.py:23