PoolWriteConfig.py
"""Configuration for POOL file writing

Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration
"""

from AthenaConfiguration.AccumulatorCache import AccumulatorCache

def _overrideTreeAutoFlush(logger, flags, stream, value):
    """Helper function to override TreeAutoFlush from flags."""
    if not flags.Output.TreeAutoFlush or not isinstance(flags.Output.TreeAutoFlush, dict):
        return value

    if stream not in flags.Output.TreeAutoFlush:
        return value

    override = flags.Output.TreeAutoFlush[stream]
    if override is not None:
        logger.info('Overriding TreeAutoFlush value for stream "%s" from %d to %d', stream, value, override)
        return override

    return value
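
# Example (hypothetical flag value): with flags.Output.TreeAutoFlush = {"DAOD_PHYS": 250},
# _overrideTreeAutoFlush(logger, flags, "DAOD_PHYS", 500) logs the override and returns 250;
# for streams not listed in the dictionary the incoming default value is returned unchanged.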

def _getStreamsFromFlags(flags):
    """
    Helper to get all the output streams from the configuration flags.
    For each stream that's configured to be written out,
    we have two flags w/ the following convention:
        + Output.{STREAM}FileName
        + Output.doWrite{STREAM}
    """
    result = []
    for key, value in flags._flagdict.items():
        if key.startswith("Output.") and key.endswith("FileName") and value.get():
            stream = key.removeprefix("Output.").removesuffix("FileName")
            if stream not in ["HIST"]:  # AthenaPool is not responsible for HIST storage settings
                result.append(stream)
    return result
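
# Example (hypothetical flag values): if Output.AODFileName and Output.ESDFileName are both
# set, this returns ["AOD", "ESD"]; a configured Output.HISTFileName is deliberately skipped.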


@AccumulatorCache
def PoolWriteCfg(flags):
    """Return ComponentAccumulator configured to write POOL files"""
    # Based on WriteAthenaPool._configureWriteAthenaPool

    from AthenaCommon.Logging import logging
    logger = logging.getLogger('PoolWriteCfg')

    PoolAttributes = []
    # Switch off splitting by setting the default SplitLevel to 0
    PoolAttributes += ["DEFAULT_SPLITLEVEL ='0'"]

    # Use member-wise streaming by default (the ROOT default)
    PoolAttributes += ["STREAM_MEMBER_WISE = '1'"]

    # Increase the default BasketSize to 32K, the ROOT default (but overwritten by POOL)
    PoolAttributes += ["DEFAULT_BUFFERSIZE = '32000'"]

    # Set the POOLContainerForm(DataHeaderForm) split level to 0
    PoolAttributes += ["ContainerName = 'TTree=POOLContainerForm(DataHeaderForm)'; CONTAINER_SPLITLEVEL = '0'"]
    PoolAttributes += ["TREE_BRANCH_OFFSETTAB_LEN ='100'"]
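
    # Each PoolAttributes entry is a plain "ATTRIBUTE = 'value'" string, optionally scoped to a
    # specific file and/or container via "DatabaseName = '...'" / "ContainerName = '...'" clauses;
    # the full list is handed to AthenaPoolCnvSvc through its PoolAttributes property below.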

    oneDHForm = flags.Output.OneDataHeaderForm

    # Kept in sync with RecoUtils.py
    from AthenaPoolCnvSvc import PoolAttributeHelper as pah

    # Defaults for common formats
    # Stream : [compression algorithm, compression level, auto flush, split level, dyn split level]
    defaults = {
        "EVNT"          : [2, 1, 500, 0, 0],
        "EVNT_TR"       : [2, 1,   1, 0, 0],
        "HITS"          : [2, 1,  10, 0, 0],
        "RDO"           : [2, 1,  10, 0, 0],
        "ESD"           : [2, 1,  10, 0, 0],
        "AOD"           : [2, 1, 100, 0, 0],
        "DAOD_PHYSVAL"  : [5, 5, 100, 0, 1],
        "DAOD_PHYS"     : [5, 5, 500, 0, 1],
        "DAOD_PHYSLITE" : [5, 5, 500, 1, 1],
        "DAOD_TRUTH3"   : [5, 5, 500, 1, 1],
        "D2AOD_PHYSLITE": [5, 5, 500, 1, 1],
    }
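
    # Note: the first two columns follow ROOT's compression codes as used elsewhere in this
    # file (1 = ZLIB, 2 = LZMA, 5 = ZSTD), so e.g. "AOD" is written with LZMA at level 1,
    # an AutoFlush of 100 events, and object splitting disabled.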

    # Metadata containers needed for augmentations
    OutputMetadataContainers = []

    # Loop over all streams and set the appropriate attributes
    maxAutoFlush = -1
    for stream in _getStreamsFromFlags(flags):

        # Get the file name - guaranteed to exist at this point
        fileName = getattr(flags.Output, f"{stream}FileName")

        # Get the ROOT settings to be applied
        compAlg, compLvl, autoFlush, splitLvl, dynSplitLvl = 2, 1, 10, 0, 0  # Defaults: LZMA, level 1, AutoFlush 10, no splitting
        if stream in defaults:
            compAlg, compLvl, autoFlush, splitLvl, dynSplitLvl = defaults[stream]
        elif "DAOD" in stream:
            compAlg, compLvl, autoFlush, splitLvl, dynSplitLvl = 5, 5, 100, 0, 1  # Change the defaults for DAODs
        elif "D2AOD" in stream:
            compAlg, compLvl, autoFlush, splitLvl, dynSplitLvl = 5, 5, 500, 1, 1  # Change the defaults for D2AODs

        # For temporary streams/files we use either ZLIB or ZSTD as the compression algorithm to save CPU cycles.
        # Temporary in this context means one of three things:
        # a) Outputs of intermediate steps of chained workflows (file name begins with tmp.),
        # b) Outputs of workers in AthenaMP jobs that are to be merged (file name ends with _000), and
        # c) Any output stream that is marked by the user as being temporary (via the CA flag Output.TemporaryStreams).
        # The ultimate goal is to reconcile all three cases and propagate the information between the job transform
        # and the job configuration (CA) so that we don't need to rely on the file names here...
        isTemporaryStream = fileName.endswith('_000') or fileName.startswith('tmp.') or stream in flags.Output.TemporaryStreams
        tempFileCompressionSetting = (5, 1)  # ZSTD at level 1
        if isTemporaryStream:
            # Outputs created in certain workflows are read with older ROOT versions,
            # e.g., temporary RDO files that are used in Run-2 simulation.
            # For those we have to use ZLIB.
            from AthenaConfiguration.Enums import LHCPeriod
            if "RDO" in stream and hasattr(flags, "GeoModel") and flags.GeoModel.Run < LHCPeriod.Run3:
                tempFileCompressionSetting = (1, 1)  # ZLIB at level 1
            logger.info(f"Stream {stream} is marked as temporary, overriding the compression settings to {tempFileCompressionSetting}")
        compAlg, compLvl = tempFileCompressionSetting if isTemporaryStream else (compAlg, compLvl)
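
        # Worked example (hypothetical file names): "tmp.HITS.pool.root" or an AthenaMP worker
        # output "AOD.pool.root_000" would be compressed with ZSTD at level 1, while a temporary
        # RDO in a Run-2 geometry job falls back to ZLIB at level 1 so older ROOT can read it.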

        # See if the user asked for the AutoFlush to be overridden
        autoFlush = _overrideTreeAutoFlush(logger, flags, stream, autoFlush)

        # Print some debugging information
        logger.debug(f"{fileName=} {stream=} {compAlg=} {compLvl=} {autoFlush=} {splitLvl=} {dynSplitLvl=}")

        # Set the Collection/Container prefixes (make configurable?)
        outputCollection = "POOLContainer"
        poolContainerPrefix = "CollectionTree"

        # Check to see if this stream is an augmentation.
        # Only set file-level attributes for the owning stream.
        isAugmentation = flags.hasFlag(f"Output.{stream}ParentStream")
        if not isAugmentation:
            # Set the compression attributes
            PoolAttributes += [pah.setFileCompAlg(fileName, compAlg)]
            PoolAttributes += [pah.setFileCompLvl(fileName, compLvl)]

            # By default use a maximum basket buffer size of 128k and minimum buffer entries of 10 for (D)AODs
            if "AOD" in stream:
                PoolAttributes += [pah.setMaxBufferSize(fileName, "131072")]
                PoolAttributes += [pah.setMinBufferEntries(fileName, "10")]
        else:
            # Changes in this else block need to be coordinated w/ OutputStreamConfig!
            # Set the master index
            PoolAttributes += [f"DatabaseName = '{fileName}'; INDEX_MASTER = 'POOLContainer(DataHeader)'"]

            # Set the Collection/Container prefixes
            outputCollection += f"_{stream}"
            poolContainerPrefix += f"_{stream}"
            OutputMetadataContainers += [f"MetaData_{stream}"]

        # Set the AutoFlush attributes
        PoolAttributes += [pah.setTreeAutoFlush(fileName, poolContainerPrefix, autoFlush)]
        PoolAttributes += [pah.setTreeAutoFlush(fileName, outputCollection, autoFlush)]
        PoolAttributes += [pah.setTreeAutoFlush(fileName, "POOLContainerForm", autoFlush)]

        # Set the Split Level attributes
        PoolAttributes += [pah.setContainerSplitLevel(fileName, poolContainerPrefix, splitLvl)]
        PoolAttributes += [pah.setContainerSplitLevel(fileName, "Aux.", splitLvl)]
        PoolAttributes += [pah.setContainerSplitLevel(fileName, "Dyn.", dynSplitLvl)]

        # ROOT inadvertently broke forward compatibility in v6.30+ (see root/issues/15964).
        # This workaround is needed so that older releases can read files created by the new ones.
        # For more information see ATEAM-1001.
        if "EVNT" in stream or "RDO" in stream:
            PoolAttributes += [f"DatabaseName = '{fileName}'; FILEFORWARD_COMPATIBILITY = '1'"]
            # Also, for compatibility with rel21, disable the single DataHeaderForm
            oneDHForm = False

        # Find the maximum AutoFlush across all formats
        maxAutoFlush = max(maxAutoFlush, autoFlush)

    # If we don't have "enough" events, disable parallel compression when using SharedWriter.
    # In this context, "enough" means each worker has a chance to make at least one flush to disk.
    useParallelCompression = flags.MP.UseSharedWriter and flags.MP.UseParallelCompression
    if useParallelCompression:
        # Now compute the total number of events this job will process
        requestedEvents = flags.Exec.MaxEvents
        availableEvents = flags.Input.FileNentries - flags.Exec.SkipEvents
        totalEntries = availableEvents if requestedEvents == -1 else min(availableEvents, requestedEvents)
        if (totalEntries > 0) and (maxAutoFlush > 0) and (maxAutoFlush * flags.Concurrency.NumProcs > totalEntries):
            logger.info("Not enough events to process, disabling parallel compression for SharedWriter!")
            logger.info(f"Processing {totalEntries} events in {flags.Concurrency.NumProcs} workers "
                        f"and a maximum (across all outputs) AutoFlush of {maxAutoFlush}")
            useParallelCompression = False
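
    # Worked example (hypothetical numbers): with 500 events to process, 8 workers, and a
    # maximum AutoFlush of 100, we have 100 * 8 = 800 > 500, so not every worker is
    # guaranteed at least one flush and parallel compression is switched off.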

    from AthenaPoolCnvSvc.PoolCommonConfig import AthenaPoolCnvSvcCfg
    return AthenaPoolCnvSvcCfg(flags,
                               PoolAttributes=PoolAttributes,
                               ParallelCompression=useParallelCompression,
                               StorageTechnology=flags.Output.StorageTechnology.EventData,
                               OutputMetadataContainers=OutputMetadataContainers,
                               OneDataHeaderForm=oneDHForm)
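
# Minimal usage sketch (illustrative flag values; assumes a standard ComponentAccumulator job
# where the returned accumulator is merged into the top-level configuration):
#
#   from AthenaConfiguration.AllConfigFlags import initConfigFlags
#   from AthenaConfiguration.MainServicesConfig import MainServicesCfg
#   flags = initConfigFlags()
#   flags.Output.AODFileName = "myAOD.pool.root"
#   flags.lock()
#   cfg = MainServicesCfg(flags)
#   cfg.merge(PoolWriteCfg(flags))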