ATLAS Offline Software
Functions
python.PoolWriteConfig Namespace Reference

Functions

def _overrideTreeAutoFlush (logger, flags, stream, value)
 
def _getStreamsFromFlags (flags)
 
def PoolWriteCfg (flags)
 

Detailed Description

Configuration for POOL file writing

Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration

Function Documentation

◆ _getStreamsFromFlags()

def python.PoolWriteConfig._getStreamsFromFlags (   flags)
private
Helper to get all the streams from configuration flags
For each stream that is configured to be written out,
there are two flags with the following convention:
    + Output.{STREAM}FileName
    + Output.doWrite{STREAM}

Definition at line 23 of file PoolWriteConfig.py.

23 def _getStreamsFromFlags(flags):
24  """
25  Helper to get all the streams from configuration flags
26  For each stream that's configured to be written out
27  we have two flags w/ the following convention:
28  + Output.{STREAM}FileName
29  + Output.doWrite{STREAM}
30  """
31  result = []
32  for key, value in flags._flagdict.items():
33  if key.startswith("Output.") and key.endswith("FileName") and value.get():
34  stream = key.removeprefix("Output.").removesuffix("FileName")
35  if stream not in ["HIST"]: # AthenaPool is not responsible for HIST storage settings
36  result.append(stream)
37  return result
38 
39 
40 @AccumulatorCache

◆ _overrideTreeAutoFlush()

def python.PoolWriteConfig._overrideTreeAutoFlush (   logger,
  flags,
  stream,
  value 
)
private
Helper function to override TreeAutoFlush from flags.

Definition at line 8 of file PoolWriteConfig.py.

8 def _overrideTreeAutoFlush(logger, flags, stream, value):
9  """Helper function to override TreeAutoFlush from flags."""
10  if not flags.Output.TreeAutoFlush or not isinstance(flags.Output.TreeAutoFlush, dict):
11  return value
12 
13  if stream not in flags.Output.TreeAutoFlush:
14  return value
15 
16  override = flags.Output.TreeAutoFlush[stream]
17  if override is not None:
18  logger.info('Overriding TreeAutoFlush value for stream "%s" from %d to %d', stream, value, override)
19  return override
20 
21  return value
22 

◆ PoolWriteCfg()

def python.PoolWriteConfig.PoolWriteCfg (   flags)
Return a ComponentAccumulator configured to write POOL files

Definition at line 41 of file PoolWriteConfig.py.

def PoolWriteCfg(flags):
    """Return a ComponentAccumulator configured to write POOL files.

    Builds the list of POOL/ROOT attributes (compression, AutoFlush, split
    levels, buffer sizes, master index, ...) for every output stream returned
    by ``_getStreamsFromFlags``. That list is passed to the AthenaPool
    conversion service configuration.

    Per-stream ROOT settings come from a table of defaults for common formats,
    with generic fallbacks for other, DAOD and D2AOD streams. They are then
    adjusted:
      * temporary outputs use cheap compression: ZSTD level 1, or ZLIB level 1
        for RDO streams when ``flags.GeoModel.Run`` is before Run 3;
      * AutoFlush may be overridden per stream via ``flags.Output.TreeAutoFlush``.

    When the SharedWriter is used with parallel compression, parallel
    compression is switched off if the job has too few events for every worker
    to reach at least one flush.

    :param flags: configuration flags.
    :return: the result of ``AthenaPoolSharedIOCnvSvcCfg`` when the shared
             reader or writer is enabled, otherwise of ``AthenaPoolCnvSvcCfg``.
    """
    # based on WriteAthenaPool._configureWriteAthenaPool

    from AthenaCommon.Logging import logging
    logger = logging.getLogger( 'PoolWriteCfg' )

    PoolAttributes = []
    # Switch off splitting by setting default SplitLevel to 0
    PoolAttributes += ["DEFAULT_SPLITLEVEL ='0'"]

    # Set as default the member-wise streaming, ROOT default
    PoolAttributes += ["STREAM_MEMBER_WISE = '1'"]

    # Increase default BasketSize to 32K, ROOT default (but overwritten by POOL)
    PoolAttributes += ["DEFAULT_BUFFERSIZE = '32000'"]

    # Set POOLContainerForm(DataHeaderForm) split level to 0
    PoolAttributes += ["ContainerName = 'TTree=POOLContainerForm(DataHeaderForm)'; CONTAINER_SPLITLEVEL = '0'"]
    PoolAttributes += ["TREE_BRANCH_OFFSETTAB_LEN ='100'"]

    # Job-wide setting; forced to False below if any EVNT/RDO stream is written
    oneDHForm = flags.Output.OneDataHeaderForm

    # Kept in sync with RecoUtils.py
    from AthenaPoolCnvSvc import PoolAttributeHelper as pah

    # Defaults for common formats
    # Stream : [compression algorithm, compression level, auto flush, split level, dyn split level]
    defaults = {
        "EVNT"           : [2, 1, 500, 0, 0],
        "EVNT_TR"        : [2, 1, 1, 0, 0],
        "HITS"           : [2, 1, 10, 0, 0],
        "RDO"            : [2, 1, 10, 0, 0],
        "ESD"            : [2, 1, 10, 0, 0],
        "AOD"            : [2, 1, 100, 0, 0],
        "DAOD_PHYSVAL"   : [5, 5, 100, 0, 1],
        "DAOD_PHYS"      : [5, 5, 500, 0, 1],
        "DAOD_PHYSLITE"  : [5, 5, 500, 1, 1],
        "DAOD_TRUTH3"    : [5, 5, 500, 1, 1],
        "D2AOD_PHYSLITE" : [5, 5, 500, 1, 1],
    }

    # Metadata containers needed for augmentations
    OutputMetadataContainers = []

    # Loop over all streams and set the appropriate attributes
    # maxAutoFlush: largest AutoFlush over all streams (-1 when no stream is written)
    maxAutoFlush = -1
    for stream in _getStreamsFromFlags(flags):

        # Get the file name - Guaranteed to exist at this point
        fileName = getattr(flags.Output, f"{stream}FileName")

        # Get the ROOT settings to be applied
        compAlg, compLvl, autoFlush, splitLvl, dynSplitLvl = 2, 1, 10, 0, 0 # Defaults: LZMA, Level 1, AutoFlush 10, No Splitting
        if stream in defaults:
            compAlg, compLvl, autoFlush, splitLvl, dynSplitLvl = defaults[stream]
        elif "DAOD" in stream:
            compAlg, compLvl, autoFlush, splitLvl, dynSplitLvl = 5, 5, 100, 0, 1 # Change the defaults for DAODs
        elif "D2AOD" in stream:
            compAlg, compLvl, autoFlush, splitLvl, dynSplitLvl = 5, 5, 500, 1, 1 # Change the defaults for D2AODs

        # For temporary streams/files we use either ZLIB or ZSTD for the compression algorithm to save CPU cycles
        # Temporary in this context might mean one of three things:
        # a) Outputs of intermediate steps of chained workflows (file name begins with tmp.),
        # b) Outputs of workers in AthenaMP jobs that are to be merged (file name ends with _000), and
        # c) Any output stream that is marked by the user as being temporary (via the CA flag Output.TemporaryStreams)
        # The ultimate goal is to reconcile all three cases and propagate the information between the job transform
        # and the job configuration (CA) so that we don't need to rely on the file names here...
        isTemporaryStream = fileName.endswith('_000') or fileName.startswith('tmp.') or stream in flags.Output.TemporaryStreams
        if isTemporaryStream:
            tempFileCompressionSetting = (5,1) # ZSTD at level 1
            # Outputs created in certain workflows are read with older ROOT versions.
            # E.g., temporary RDO files that are used in Run-2 simulation.
            # For those, we have to use ZLIB
            from AthenaConfiguration.Enums import LHCPeriod
            if "RDO" in stream and hasattr(flags, "GeoModel") and flags.GeoModel.Run < LHCPeriod.Run3:
                tempFileCompressionSetting = (1,1) # ZLIB at level 1
            logger.info(f"Stream {stream} is marked as temporary, overwriting the compression settings to {tempFileCompressionSetting}")
            compAlg, compLvl = tempFileCompressionSetting

        # See if the user asked for the AutoFlush to be overwritten
        autoFlush = _overrideTreeAutoFlush(logger, flags, stream, autoFlush)

        # Print some debugging information (lazy formatting: only rendered when DEBUG is enabled)
        logger.debug("fileName=%r stream=%r compAlg=%r compLvl=%r autoFlush=%r splitLvl=%r dynSplitLvl=%r",
                     fileName, stream, compAlg, compLvl, autoFlush, splitLvl, dynSplitLvl)

        # Set the Collection/Container prefixes (make configurable?)
        outputCollection = "POOLContainer"
        poolContainerPrefix = "CollectionTree"

        # Check to see if this stream is an augmentation
        # Only set file-level attributes for the owning stream
        isAugmentation = flags.hasFlag(f"Output.{stream}ParentStream")
        if not isAugmentation:
            # Set the Compression attributes
            PoolAttributes += [ pah.setFileCompAlg( fileName, compAlg ) ]
            PoolAttributes += [ pah.setFileCompLvl( fileName, compLvl ) ]

            # By default use a maximum basket buffer size of 128k and minimum buffer entries of 10 for (D)AODs
            if "AOD" in stream:
                PoolAttributes += [ pah.setMaxBufferSize( fileName, "131072" ) ]
                PoolAttributes += [ pah.setMinBufferEntries( fileName, "10" ) ]
        else:
            # Changes in this else block need to be coordinated w/ OutputStreamConfig!
            # Set the master index
            PoolAttributes += [ f"DatabaseName = '{fileName}'; INDEX_MASTER = 'POOLContainer(DataHeader)'" ]

            # Set the Collection/Container prefixes
            outputCollection += f"_{stream}"
            poolContainerPrefix += f"_{stream}"
            OutputMetadataContainers += [f"MetaData_{stream}"]

        # Set the AutoFlush attributes
        PoolAttributes += [ pah.setTreeAutoFlush( fileName, poolContainerPrefix, autoFlush ) ]
        PoolAttributes += [ pah.setTreeAutoFlush( fileName, outputCollection, autoFlush ) ]
        PoolAttributes += [ pah.setTreeAutoFlush( fileName, "POOLContainerForm", autoFlush ) ]

        # Set the Split Level attributes
        PoolAttributes += [ pah.setContainerSplitLevel( fileName, poolContainerPrefix, splitLvl ) ]
        PoolAttributes += [ pah.setContainerSplitLevel( fileName, "Aux.", splitLvl ) ]
        PoolAttributes += [ pah.setContainerSplitLevel( fileName, "Dyn.", dynSplitLvl ) ]

        # ROOT inadvertently broke forward compatibility in v6.30+ (see root/issues/15964)
        # This workaround is needed so that older releases can read files created by the new ones
        # For more information see ATEAM-1001
        if "EVNT" in stream or "RDO" in stream:
            PoolAttributes += [ f"DatabaseName = '{fileName}'; FILEFORWARD_COMPATIBILITY = '1'" ]
            # also for compatibility with rel21 disable single DataHeaderForm
            oneDHForm = False

        # Find the maximum AutoFlush across all formats
        maxAutoFlush = max(maxAutoFlush, autoFlush)

    # If we don't have "enough" events, disable parallelCompression if we're using SharedWriter
    # In this context, "enough" means each worker has a chance to make at least one flush to the disk
    useParallelCompression = flags.MP.UseSharedWriter and flags.MP.UseParallelCompression
    if useParallelCompression:
        # Now compute the total number of events this job will process
        requestedEvents = flags.Exec.MaxEvents
        availableEvents = flags.Input.FileNentries - flags.Exec.SkipEvents
        totalEntries = availableEvents if requestedEvents == -1 else min( availableEvents, requestedEvents )
        if ( totalEntries > 0 ) and ( maxAutoFlush > 0 ) and ( maxAutoFlush * flags.Concurrency.NumProcs > totalEntries ):
            logger.info( "Not enough events to process, disabling parallel compression for SharedWriter!" )
            logger.info( f"Processing {totalEntries} events in {flags.Concurrency.NumProcs} workers "
                         f"and a maximum (across all outputs) AutoFlush of {maxAutoFlush}")
            useParallelCompression = False

    if flags.MP.UseSharedReader or flags.MP.UseSharedWriter:
        from AthenaPoolCnvSvc.PoolCommonConfig import AthenaPoolSharedIOCnvSvcCfg
        return AthenaPoolSharedIOCnvSvcCfg(flags,
                                           PoolAttributes=PoolAttributes,
                                           ParallelCompression=useParallelCompression,
                                           StorageTechnology=flags.Output.StorageTechnology.EventData,
                                           OutputMetadataContainers=OutputMetadataContainers,
                                           OneDataHeaderForm = oneDHForm)
    else:
        from AthenaPoolCnvSvc.PoolCommonConfig import AthenaPoolCnvSvcCfg
        return AthenaPoolCnvSvcCfg(flags,
                                   PoolAttributes=PoolAttributes,
                                   StorageTechnology=flags.Output.StorageTechnology.EventData,
                                   OneDataHeaderForm = oneDHForm)
python.PoolWriteConfig.PoolWriteCfg
def PoolWriteCfg(flags)
Definition: PoolWriteConfig.py:41
max
constexpr double max()
Definition: ap_fixedTest.cxx:33
min
constexpr double min()
Definition: ap_fixedTest.cxx:26
python.PoolCommonConfig.AthenaPoolSharedIOCnvSvcCfg
def AthenaPoolSharedIOCnvSvcCfg(flags, **kwargs)
Definition: PoolCommonConfig.py:30
python.PoolCommonConfig.AthenaPoolCnvSvcCfg
def AthenaPoolCnvSvcCfg(flags, **kwargs)
Definition: PoolCommonConfig.py:43
python.PoolWriteConfig._overrideTreeAutoFlush
def _overrideTreeAutoFlush(logger, flags, stream, value)
Definition: PoolWriteConfig.py:8
python.PoolWriteConfig._getStreamsFromFlags
def _getStreamsFromFlags(flags)
Definition: PoolWriteConfig.py:23