ATLAS Offline Software
PoolWriteConfig.py
1 """Configuration for POOL file writing
2 
3 Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration
4 """
5 
6 from AthenaConfiguration.AccumulatorCache import AccumulatorCache
7 
8 def _overrideTreeAutoFlush(logger, flags, stream, value):
9  """Helper function to override TreeAutoFlush from flags."""
10  if not flags.Output.TreeAutoFlush or not isinstance(flags.Output.TreeAutoFlush, dict):
11  return value
12 
13  if stream not in flags.Output.TreeAutoFlush:
14  return value
15 
16  override = flags.Output.TreeAutoFlush[stream]
17  if override is not None:
18  logger.info('Overriding TreeAutoFlush value for stream "%s" from %d to %d', stream, value, override)
19  return override
20 
21  return value
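
# Example of the override mechanism above (hypothetical flag value): setting
#     flags.Output.TreeAutoFlush = {"DAOD_PHYS": 1000}
# in the job configuration would flush the DAOD_PHYS trees every 1000 events
# instead of the stream's built-in default.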

def _getStreamsFromFlags(flags):
    """
    Helper to get all the streams from configuration flags.
    For each stream that's configured to be written out
    we have two flags w/ the following convention:
        + Output.{STREAM}FileName
        + Output.doWrite{STREAM}
    """
    result = []
    for key, value in flags._flagdict.items():
        if key.startswith("Output.") and key.endswith("FileName") and value.get():
            stream = key.removeprefix("Output.").removesuffix("FileName")
            if stream not in ["HIST"]:  # AthenaPool is not responsible for HIST storage settings
                result.append(stream)
    return result
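
# Example (hypothetical flag values): with Output.AODFileName = "myAOD.pool.root"
# and Output.ESDFileName = "myESD.pool.root" set to non-empty values, this helper
# returns ["AOD", "ESD"]; HIST is deliberately skipped as noted above.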


@AccumulatorCache
def PoolWriteCfg(flags):
    """Return ComponentAccumulator configured to Write POOL files"""
    # based on WriteAthenaPool._configureWriteAthenaPool

    from AthenaCommon.Logging import logging
    logger = logging.getLogger( 'PoolWriteCfg' )

    PoolAttributes = []
    # Switch off splitting by setting default SplitLevel to 0
    PoolAttributes += ["DEFAULT_SPLITLEVEL ='0'"]

    # Set as default the member-wise streaming, ROOT default
    PoolAttributes += ["STREAM_MEMBER_WISE = '1'"]

    # Increase default BasketSize to 32K, ROOT default (but overwritten by POOL)
    PoolAttributes += ["DEFAULT_BUFFERSIZE = '32000'"]

    # Set POOLContainerForm(DataHeaderForm) split level to 0
    PoolAttributes += ["ContainerName = 'TTree=POOLContainerForm(DataHeaderForm)'; CONTAINER_SPLITLEVEL = '0'"]
    PoolAttributes += ["TREE_BRANCH_OFFSETTAB_LEN ='100'"]
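
    # Note on the attribute syntax used throughout this function: each entry in
    # PoolAttributes is a string of "KEY = 'value'" settings, optionally scoped to
    # a specific container or database, e.g. (hypothetical values)
    #     "ContainerName = 'TTree=CollectionTree'; CONTAINER_SPLITLEVEL = '99'"
    # applies only to that TTree, while un-scoped entries set global defaults.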

    oneDHForm = flags.Output.OneDataHeaderForm

    # Kept in sync with RecoUtils.py
    from AthenaPoolCnvSvc import PoolAttributeHelper as pah

    # Defaults for common formats
    # Stream : [compression algorithm, compression level, auto flush, split level, dyn split level]
    defaults = {
        "EVNT"          : [2, 1, 500, 0, 0],
        "EVNT_TR"       : [2, 1,   1, 0, 0],
        "HITS"          : [2, 1,  10, 0, 0],
        "RDO"           : [2, 1,  10, 0, 0],
        "ESD"           : [2, 1,  10, 0, 0],
        "AOD"           : [2, 1, 100, 0, 0],
        "DAOD_PHYSVAL"  : [5, 5, 100, 0, 1],
        "DAOD_PHYS"     : [5, 5, 500, 0, 1],
        "DAOD_PHYSLITE" : [5, 5, 500, 1, 1],
        "DAOD_TRUTH3"   : [5, 5, 500, 1, 1],
        "D2AOD_PHYSLITE": [5, 5, 500, 1, 1],
    }
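
    # For orientation: ROOT combines these as CompressionSettings = 100 * algorithm + level,
    # so e.g. algorithm 2 (LZMA) at level 1 corresponds to 201 and algorithm 5 (ZSTD) at
    # level 5 to 505; the algorithm codes follow ROOT's compression enum.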

    # Metadata containers needed for augmentations
    OutputMetadataContainers = []

    # Loop over all streams and set the appropriate attributes
    maxAutoFlush = -1
    for stream in _getStreamsFromFlags(flags):

        # Get the file name - Guaranteed to exist at this point
        fileName = getattr(flags.Output, f"{stream}FileName")

        # Get the ROOT settings to be applied
        compAlg, compLvl, autoFlush, splitLvl, dynSplitLvl = 2, 1, 10, 0, 0  # Defaults: LZMA, Level 1, AutoFlush 10, No Splitting
        if stream in defaults:
            compAlg, compLvl, autoFlush, splitLvl, dynSplitLvl = defaults[stream]
        elif "DAOD" in stream:
            compAlg, compLvl, autoFlush, splitLvl, dynSplitLvl = 5, 5, 100, 0, 1  # Change the defaults for DAODs
        elif "D2AOD" in stream:
            compAlg, compLvl, autoFlush, splitLvl, dynSplitLvl = 5, 5, 500, 1, 1  # Change the defaults for D2AODs

        # For temporary streams/files we always use ZLIB for the compression algorithm to save CPU cycles
        # Temporary in this context might mean one of three things:
        #   a) Outputs of intermediate steps of chained workflows (file name begins with tmp.),
        #   b) Outputs of workers in AthenaMP jobs that are to be merged (file name ends with _000), and
        #   c) Any output stream that is marked by the user as being temporary (via the CA flag Output.TemporaryStreams)
        # The ultimate goal is to reconcile all three cases and propagate the information between the job transform
        # and the job configuration (CA) so that we don't need to rely on the file names here...
        isTemporaryStream = fileName.endswith('_000') or fileName.startswith('tmp.') or stream in flags.Output.TemporaryStreams
        if isTemporaryStream:
            logger.info(f"Stream {stream} is marked as temporary, overwriting the compression settings to 101")
        compAlg, compLvl = (1, 1) if isTemporaryStream else (compAlg, compLvl)  # ROOT setting 101 = ZLIB, level 1
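
        # Examples of names treated as temporary here (illustrative only):
        # "tmp.HITS.pool.root" (case a) or "myAOD.pool.root_000" (an AthenaMP
        # worker output, case b).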

        # See if the user asked for the AutoFlush to be overwritten
        autoFlush = _overrideTreeAutoFlush(logger, flags, stream, autoFlush)

        # Print some debugging information
        logger.debug(f"{fileName=} {stream=} {compAlg=} {compLvl=} {autoFlush=} {splitLvl=} {dynSplitLvl=}")

        # Set the Collection/Container prefixes (make configurable?)
        outputCollection = "POOLContainer"
        poolContainerPrefix = "CollectionTree"

        # Check to see if this stream is an augmentation
        # Only set file-level attributes for the owning stream
        isAugmentation = flags.hasFlag(f"Output.{stream}ParentStream")
        if not isAugmentation:
            # Set the Compression attributes
            PoolAttributes += [ pah.setFileCompAlg( fileName, compAlg ) ]
            PoolAttributes += [ pah.setFileCompLvl( fileName, compLvl ) ]

            # By default use a maximum basket buffer size of 128k and minimum buffer entries of 10 for (D)AODs
            if "AOD" in stream:
                PoolAttributes += [ pah.setMaxBufferSize( fileName, "131072" ) ]
                PoolAttributes += [ pah.setMinBufferEntries( fileName, "10" ) ]
        else:
            # Changes in this else block need to be coordinated w/ OutputStreamConfig!
            # Set the master index
            PoolAttributes += [ f"DatabaseName = '{fileName}'; INDEX_MASTER = 'POOLContainer(DataHeader)'" ]

            # Set the Collection/Container prefixes
            outputCollection += f"_{stream}"
            poolContainerPrefix += f"_{stream}"
            OutputMetadataContainers += [f"MetaData_{stream}"]

        # Set the AutoFlush attributes
        PoolAttributes += [ pah.setTreeAutoFlush( fileName, poolContainerPrefix, autoFlush ) ]
        PoolAttributes += [ pah.setTreeAutoFlush( fileName, outputCollection, autoFlush ) ]
        PoolAttributes += [ pah.setTreeAutoFlush( fileName, "POOLContainerForm", autoFlush ) ]

        # Set the Split Level attributes
        PoolAttributes += [ pah.setContainerSplitLevel( fileName, poolContainerPrefix, splitLvl ) ]
        PoolAttributes += [ pah.setContainerSplitLevel( fileName, "Aux.", splitLvl ) ]
        PoolAttributes += [ pah.setContainerSplitLevel( fileName, "Dyn.", dynSplitLvl ) ]

        # ROOT inadvertently broke forward compatibility in v6.30+ (see root/issues/15964)
        # This workaround is needed so that older releases can read files created by the new ones
        # For more information see ATEAM-1001
        if "EVNT" in stream or "RDO" in stream:
            PoolAttributes += [ f"DatabaseName = '{fileName}'; FILEFORWARD_COMPATIBILITY = '1'" ]
            # also for compatibility with rel21 disable single DataHeaderForm
            oneDHForm = False

        # Find the maximum AutoFlush across all formats
        maxAutoFlush = max(maxAutoFlush, autoFlush)

    # If we don't have "enough" events, disable parallelCompression if we're using SharedWriter
    # In this context, "enough" means each worker has a chance to make at least one flush to the disk
    useParallelCompression = flags.MP.UseSharedWriter and flags.MP.UseParallelCompression
    if useParallelCompression:
        # Now compute the total number of events this job will process
        requestedEvents = flags.Exec.MaxEvents
        availableEvents = flags.Input.FileNentries - flags.Exec.SkipEvents
        totalEntries = availableEvents if requestedEvents == -1 else min( availableEvents, requestedEvents )
        if ( totalEntries > 0 ) and ( maxAutoFlush > 0 ) and ( maxAutoFlush * flags.Concurrency.NumProcs > totalEntries ):
            logger.info( "Not enough events to process, disabling parallel compression for SharedWriter!" )
            logger.info( f"Processing {totalEntries} events in {flags.Concurrency.NumProcs} workers "
                         f"and a maximum (across all outputs) AutoFlush of {maxAutoFlush}")
            useParallelCompression = False
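
    # Example of the check above (illustrative numbers): with maxAutoFlush = 500 and
    # 8 workers, at least 500 * 8 = 4000 events are needed for every worker to flush
    # at least once; with fewer events parallel compression is switched off.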

    from AthenaPoolCnvSvc.PoolCommonConfig import AthenaPoolCnvSvcCfg
    return AthenaPoolCnvSvcCfg(flags,
                               PoolAttributes=PoolAttributes,
                               ParallelCompression=useParallelCompression,
                               StorageTechnology=flags.Output.StorageTechnology.EventData,
                               OutputMetadataContainers=OutputMetadataContainers,
                               OneDataHeaderForm=oneDHForm)
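

# Minimal usage sketch (assumptions: a CA-based job using the standard Athena
# ComponentAccumulator setup; flag and module names below follow the usual
# AthenaConfiguration conventions and may differ per release/workflow):
#
#     from AthenaConfiguration.AllConfigFlags import initConfigFlags
#     from AthenaConfiguration.MainServicesConfig import MainServicesCfg
#     from AthenaPoolCnvSvc.PoolWriteConfig import PoolWriteCfg
#
#     flags = initConfigFlags()
#     flags.Output.AODFileName = "myAOD.pool.root"   # hypothetical output file
#     flags.lock()
#
#     cfg = MainServicesCfg(flags)
#     cfg.merge(PoolWriteCfg(flags))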