ATLAS Offline Software
PoolWriteConfig.py
Go to the documentation of this file.
1 """Configuration for POOL file writing
2 
3 Copyright (C) 2002-2023 CERN for the benefit of the ATLAS collaboration
4 """
5 
6 from AthenaConfiguration.AccumulatorCache import AccumulatorCache
7 
8 def _overrideTreeAutoFlush(logger, flags, stream, value):
9  """Helper function to override TreeAutoFlush from flags."""
10  if not flags.Output.TreeAutoFlush or not isinstance(flags.Output.TreeAutoFlush, dict):
11  return value
12 
13  if stream not in flags.Output.TreeAutoFlush:
14  return value
15 
16  override = flags.Output.TreeAutoFlush[stream]
17  if override is not None:
18  logger.info('Overriding TreeAutoFlush value for stream "%s" from %d to %d', stream, value, override)
19  return override
20 
21  return value
22 
24  """
25  Helper to get all the streams from configuration flags
26  For each stream that's configured to be written out
27  we have two flags w/ the following convention:
28  + Output.{STREAM}FileName
29  + Output.doWrite{STREAM}
30  """
31  result = []
32  for key, value in flags._flagdict.items():
33  if key.startswith("Output.") and key.endswith("FileName") and value.get():
34  stream = key.removeprefix("Output.").removesuffix("FileName")
35  if stream not in ["HIST"]: # AthenaPool is not responsible for HIST storage settings
36  result.append(stream)
37  return result
38 
39 
@AccumulatorCache
def PoolWriteCfg(flags):
    """Return ComponentAccumulator configured to Write POOL files

    Assembles the per-file and per-container POOL/ROOT attributes
    (compression algorithm/level, basket buffer sizes, TTree AutoFlush,
    split levels) for every configured output stream and hands them to
    AthenaPoolCnvSvcCfg.  Also decides whether SharedWriter parallel
    compression is worthwhile for this job.
    """
    # based on WriteAthenaPool._configureWriteAthenaPool

    from AthenaCommon.Logging import logging
    logger = logging.getLogger( 'PoolWriteCfg' )

    PoolAttributes = []
    # Switch off splitting by setting default SplitLevel to 0
    PoolAttributes += ["DEFAULT_SPLITLEVEL ='0'"]

    # Set as default the member-wise streaming, ROOT default
    PoolAttributes += ["STREAM_MEMBER_WISE = '1'"]

    # Increase default BasketSize to 32K, ROOT default (but overwritten by POOL)
    PoolAttributes += ["DEFAULT_BUFFERSIZE = '32000'"]

    # Set POOLContainerForm(DataHeaderForm) split level to 0
    PoolAttributes += ["ContainerName = 'TTree=POOLContainerForm(DataHeaderForm)'; CONTAINER_SPLITLEVEL = '0'"]
    PoolAttributes += ["TREE_BRANCH_OFFSETTAB_LEN ='100'"]

    # Kept in sync with RecoUtils.py
    from AthenaPoolCnvSvc import PoolAttributeHelper as pah

    # Defaults for common formats
    # Stream : [compression algorithm, compression level, auto flush, split level, dyn split level]
    defaults = {
        "EVNT_TR"        : [2, 1, 1, 0, 0],
        "HITS"           : [2, 1, 10, 0, 0],
        "RDO"            : [2, 1, 10, 0, 0],
        "ESD"            : [2, 1, 10, 0, 0],
        "AOD"            : [2, 1, 100, 0, 0],
        "DAOD_PHYSVAL"   : [5, 5, 100, 0, 1],
        "DAOD_PHYS"      : [5, 5, 500, 0, 1],
        "DAOD_PHYSLITE"  : [5, 5, 500, 1, 1],
        "DAOD_TRUTH3"    : [5, 5, 500, 1, 1],
        "D2AOD_PHYSLITE" : [5, 5, 500, 1, 1],
    }

    # Metadata containers needed for augmentations
    OutputMetadataContainers = []

    # Loop over all streams and set the appropriate attributes
    maxAutoFlush = -1
    for stream in _getStreamsFromFlags(flags):

        # Get the file name - Guaranteed to exist at this point
        fileName = getattr(flags.Output, f"{stream}FileName")

        # Get the ROOT settings to be applied
        compAlg, compLvl, autoFlush, splitLvl, dynSplitLvl = 2, 1, 10, 0, 0 # Defaults: LZMA, Level 1, AutoFlush 10, No Splitting
        if stream in defaults:
            compAlg, compLvl, autoFlush, splitLvl, dynSplitLvl = defaults[stream]
        elif "DAOD" in stream:
            compAlg, compLvl, autoFlush, splitLvl, dynSplitLvl = 5, 5, 100, 0, 1 # Change the defaults for DAODs
        elif "D2AOD" in stream:
            compAlg, compLvl, autoFlush, splitLvl, dynSplitLvl = 5, 5, 500, 1, 1 # Change the defaults for D2AODs

        # For temporary files we always use ZLIB for compression algorithm
        # (presumably '_000'/'tmp.' name patterns mark intermediate files — convention set elsewhere)
        compAlg = 1 if fileName.endswith('_000') or fileName.startswith('tmp.') else compAlg

        # See if the user asked for the AutoFlush to be overwritten
        autoFlush = _overrideTreeAutoFlush(logger, flags, stream, autoFlush)

        # Print some debugging information
        logger.debug(f"{fileName=} {stream=} {compAlg=} {compLvl=} {autoFlush=} {splitLvl=} {dynSplitLvl=}")

        # Set the Collection/Container prefixes (make configurable?)
        outputCollection = "POOLContainer"
        poolContainerPrefix = "CollectionTree"

        # Check to see if this stream is an augmentation
        # Only set file-level attributes for the owning stream
        isAugmentation = flags.hasFlag(f"Output.{stream}ParentStream")
        if not isAugmentation:
            # Set the Compression attributes
            PoolAttributes += [ pah.setFileCompAlg( fileName, compAlg ) ]
            PoolAttributes += [ pah.setFileCompLvl( fileName, compLvl ) ]

            # By default use a maximum basket buffer size of 128k and minimum buffer entries of 10 for (D)AODs
            if "AOD" in stream:
                PoolAttributes += [ pah.setMaxBufferSize( fileName, "131072" ) ]
                PoolAttributes += [ pah.setMinBufferEntries( fileName, "10" ) ]
        else:
            # Changes in this else block need to be coordinated w/ OutputStreamConfig!
            # Set the master index
            PoolAttributes += [ f"DatabaseName = '{fileName}'; INDEX_MASTER = 'POOLContainer(DataHeader)'" ]

            # Set the Collection/Container prefixes
            # (augmentation streams get stream-suffixed container names in the parent file)
            outputCollection += f"_{stream}"
            poolContainerPrefix += f"_{stream}"
            OutputMetadataContainers += [f"MetaData_{stream}"]

        # Set the AutoFlush attributes
        PoolAttributes += [ pah.setTreeAutoFlush( fileName, poolContainerPrefix, autoFlush ) ]
        PoolAttributes += [ pah.setTreeAutoFlush( fileName, outputCollection, autoFlush ) ]
        PoolAttributes += [ pah.setTreeAutoFlush( fileName, "POOLContainerForm", autoFlush ) ]

        # Set the Split Level attributes
        PoolAttributes += [ pah.setContainerSplitLevel( fileName, poolContainerPrefix, splitLvl ) ]
        PoolAttributes += [ pah.setContainerSplitLevel( fileName, "Aux.", splitLvl ) ]
        PoolAttributes += [ pah.setContainerSplitLevel( fileName, "Dyn.", dynSplitLvl ) ]

        # Find the maximum AutoFlush across all formats
        maxAutoFlush = max(maxAutoFlush, autoFlush)

    # If we don't have "enough" events, disable parallelCompression if we're using SharedWriter
    # In this context, "enough" means each worker has a chance to make at least one flush to the disk
    useParallelCompression = flags.MP.UseSharedWriter and flags.MP.UseParallelCompression
    if useParallelCompression:
        # Now compute the total number of events this job will process
        requestedEvents = flags.Exec.MaxEvents
        availableEvents = flags.Input.FileNentries - flags.Exec.SkipEvents
        totalEntries = availableEvents if requestedEvents == -1 else min( availableEvents, requestedEvents )
        if ( totalEntries > 0 ) and ( maxAutoFlush > 0 ) and ( maxAutoFlush * flags.Concurrency.NumProcs > totalEntries ):
            logger.info( "Not enough events to process, disabling parallel compression for SharedWriter!" )
            logger.info( f"Processing {totalEntries} events in {flags.Concurrency.NumProcs} workers "
                         f"and a maximum (across all outputs) AutoFlush of {maxAutoFlush}")
            useParallelCompression = False

    from AthenaPoolCnvSvc.PoolCommonConfig import AthenaPoolCnvSvcCfg
    return AthenaPoolCnvSvcCfg(flags,
                               PoolAttributes=PoolAttributes,
                               ParallelCompression=useParallelCompression,
                               StorageTechnology=flags.Output.StorageTechnology.EventData,
                               OutputMetadataContainers=OutputMetadataContainers)
python.PoolWriteConfig.PoolWriteCfg
def PoolWriteCfg(flags)
Definition: PoolWriteConfig.py:41
max
#define max(a, b)
Definition: cfImp.cxx:41
min
#define min(a, b)
Definition: cfImp.cxx:40
python.PoolCommonConfig.AthenaPoolCnvSvcCfg
def AthenaPoolCnvSvcCfg(flags, **kwargs)
Definition: PoolCommonConfig.py:29
python.PoolWriteConfig._overrideTreeAutoFlush
def _overrideTreeAutoFlush(logger, flags, stream, value)
Definition: PoolWriteConfig.py:8
python.PoolWriteConfig._getStreamsFromFlags
def _getStreamsFromFlags(flags)
Definition: PoolWriteConfig.py:23