ATLAS Offline Software
Functions
python.PoolWriteConfig Namespace Reference

Functions

def _overrideTreeAutoFlush (logger, flags, stream, value)
 
def _getStreamsFromFlags (flags)
 
def PoolWriteCfg (flags)
 

Detailed Description

Configuration for POOL file writing

Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration

Function Documentation

◆ _getStreamsFromFlags()

def python.PoolWriteConfig._getStreamsFromFlags (   flags)
private
Helper to get all the streams from configuration flags
For each stream that's configured to be written out
we have two flags w/ the following convention:
    + Output.{STREAM}FileName
    + Output.doWrite{STREAM}

Definition at line 23 of file PoolWriteConfig.py.

23 def _getStreamsFromFlags(flags):
24  """
25  Helper to get all the streams from configuration flags
26  For each stream that's configured to be written out
27  we have two flags w/ the following convention:
28  + Output.{STREAM}FileName
29  + Output.doWrite{STREAM}
30  """
31  result = []
32  for key, value in flags._flagdict.items():
33  if key.startswith("Output.") and key.endswith("FileName") and value.get():
34  stream = key.removeprefix("Output.").removesuffix("FileName")
35  if stream not in ["HIST"]: # AthenaPool is not responsible for HIST storage settings
36  result.append(stream)
37  return result
38 
39 
40 @AccumulatorCache

◆ _overrideTreeAutoFlush()

def python.PoolWriteConfig._overrideTreeAutoFlush (   logger,
  flags,
  stream,
  value 
)
private
Helper function to override TreeAutoFlush from flags.

Definition at line 8 of file PoolWriteConfig.py.

8 def _overrideTreeAutoFlush(logger, flags, stream, value):
9  """Helper function to override TreeAutoFlush from flags."""
10  if not flags.Output.TreeAutoFlush or not isinstance(flags.Output.TreeAutoFlush, dict):
11  return value
12 
13  if stream not in flags.Output.TreeAutoFlush:
14  return value
15 
16  override = flags.Output.TreeAutoFlush[stream]
17  if override is not None:
18  logger.info('Overriding TreeAutoFlush value for stream "%s" from %d to %d', stream, value, override)
19  return override
20 
21  return value
22 

◆ PoolWriteCfg()

def python.PoolWriteConfig.PoolWriteCfg (   flags)
Return ComponentAccumulator configured to Write POOL files

Definition at line 41 of file PoolWriteConfig.py.

def PoolWriteCfg(flags):
    """Return ComponentAccumulator configured to Write POOL files

    Builds the list of POOL attribute strings (a set of global defaults
    followed by per-stream settings for every stream reported by
    _getStreamsFromFlags) together with the storage technology map and
    the OneDataHeaderForm setting. The result is handed to
    AthenaPoolSharedIOCnvSvcCfg when a shared reader or writer is
    requested through flags.MP, and to AthenaPoolCnvSvcCfg otherwise.
    """
    # based on WriteAthenaPool._configureWriteAthenaPool

    from AthenaCommon.Logging import logging
    logger = logging.getLogger( 'PoolWriteCfg' )

    # Global attributes; the per-stream ones are appended after these
    PoolAttributes = [
        # Switch off splitting by setting default SplitLevel to 0
        "DEFAULT_SPLITLEVEL ='0'",
        # Set as default the member-wise streaming, ROOT default
        "STREAM_MEMBER_WISE = '1'",
        # Increase default BasketSize to 32K, ROOT default (but overwritten by POOL)
        "DEFAULT_BUFFERSIZE = '32000'",
        # Set POOLContainerForm(DataHeaderForm) split level to 0
        "ContainerName = 'TTree=POOLContainerForm(DataHeaderForm)'; CONTAINER_SPLITLEVEL = '0'",
        "TREE_BRANCH_OFFSETTAB_LEN ='100'",
    ]

    oneDHForm = flags.Output.OneDataHeaderForm

    # Kept in sync with RecoUtils.py
    from AthenaPoolCnvSvc import PoolAttributeHelper as pah

    # Defaults for common formats
    # Stream : (compression algorithm, compression level, auto flush, split level, dyn split level)
    defaults = {
        "EVNT"           : (2, 1, 500, 0, 0),
        "EVNT_TR"        : (2, 1,   1, 0, 0),
        "HITS"           : (2, 1,  10, 0, 0),
        "RDO"            : (2, 1,  10, 0, 0),
        "ESD"            : (2, 1,  10, 0, 0),
        "AOD"            : (2, 1, 100, 0, 0),
        "DAOD_PHYSVAL"   : (5, 5, 100, 0, 1),
        "DAOD_PHYS"      : (5, 5, 500, 0, 1),
        "DAOD_PHYSLITE"  : (5, 5, 500, 1, 1),
        "DAOD_TRUTH3"    : (5, 5, 500, 1, 1),
        "D2AOD_PHYSLITE" : (5, 5, 500, 1, 1),
    }

    # Metadata containers needed for augmentations
    OutputMetadataContainers = []

    # Loop over all streams and set the appropriate attributes
    maxAutoFlush = -1
    storageTechnologyMap = flags.Output.StorageTechnology.EventData or {'*': flags.PoolSvc.DefaultContainerType}
    for stream in _getStreamsFromFlags(flags):

        # Get the file name - Guaranteed to exist at this point
        fileName = getattr(flags.Output, f"{stream}FileName")

        # Select the ROOT settings to be applied
        if stream in defaults:
            rootSettings = defaults[stream]
        elif "DAOD" in stream:
            rootSettings = (5, 5, 100, 0, 1) # Change the defaults for DAODs
        elif "D2AOD" in stream:
            rootSettings = (5, 5, 500, 1, 1) # Change the defaults for D2AODs
        else:
            rootSettings = (2, 1, 10, 0, 0) # Defaults: LZMA, Level 1, AutoFlush 10, No Splitting
        compAlg, compLvl, autoFlush, splitLvl, dynSplitLvl = rootSettings

        # For temporary streams/files we use either ZLIB or ZSTD for the compression algorithm to save CPU cycles
        # Temporary in this context might mean one of three things:
        # a) Outputs of intermediate steps of chained workflows (file name begins with tmp.),
        # b) Outputs of workers in AthenaMP jobs that are to be merged (file name ends with _000), and
        # c) Any output stream that is marked by the user as being temporary (via the CA flag Output.TemporaryStreams)
        # The ultimate goal is to reconcile all three cases and propagate the information between the job transform
        # and the job configuration (CA) so that we don't need to rely on the file names here...
        isTemporaryStream = (
            fileName.endswith('_000')
            or fileName.startswith('tmp.')
            or stream in flags.Output.TemporaryStreams
        )
        if isTemporaryStream:
            # Outputs created in certain workflows are read with older ROOT versions.
            # E.g., temporary RDO files that are used in Run-2 simulation.
            # For those, we have to use ZLIB
            from AthenaConfiguration.Enums import LHCPeriod
            needsZlib = "RDO" in stream and hasattr(flags, "GeoModel") and flags.GeoModel.Run < LHCPeriod.Run3
            # ZLIB at level 1 for the legacy case, ZSTD at level 1 otherwise
            tempFileCompressionSetting = (1,1) if needsZlib else (5,1)
            logger.info(f"Stream {stream} is marked as temporary, overwriting the compression settings to {tempFileCompressionSetting}")
            compAlg, compLvl = tempFileCompressionSetting

        # See if the user asked for the AutoFlush to be overwritten
        autoFlush = _overrideTreeAutoFlush(logger, flags, stream, autoFlush)

        # Print some debugging information
        logger.debug(f"{fileName=} {stream=} {compAlg=} {compLvl=} {autoFlush=} {splitLvl=} {dynSplitLvl=}")

        # Set the Collection/Container prefixes (make configurable?)
        outputCollection = "POOLContainer"
        poolContainerPrefix = "CollectionTree"

        # Check to see if this stream is an augmentation
        # Only set file-level attributes for the owning stream
        isAugmentation = flags.hasFlag(f"Output.{stream}ParentStream")
        if isAugmentation:
            # Changes in this branch need to be coordinated w/ OutputStreamConfig!
            # Set the master index
            PoolAttributes.append( f"DatabaseName = '{fileName}'; INDEX_MASTER = 'POOLContainer(DataHeader)'" )

            # Set the Collection/Container prefixes
            outputCollection += f"_{stream}"
            poolContainerPrefix += f"_{stream}"
            OutputMetadataContainers.append( f"MetaData_{stream}" )
        else:
            # Set the Compression attributes
            PoolAttributes.append( pah.setFileCompAlg( fileName, compAlg ) )
            PoolAttributes.append( pah.setFileCompLvl( fileName, compLvl ) )

            # By default use a maximum basket buffer size of 128k and minimum buffer entries of 10 for (D)AODs
            if "AOD" in stream:
                PoolAttributes.append( pah.setMaxBufferSize( fileName, "131072" ) )
                PoolAttributes.append( pah.setMinBufferEntries( fileName, "10" ) )

        # Set the AutoFlush attributes
        for container in ( poolContainerPrefix, outputCollection, "POOLContainerForm" ):
            PoolAttributes.append( pah.setTreeAutoFlush( fileName, container, autoFlush ) )

        # Set the Split Level attributes
        for container, level in ( ( poolContainerPrefix, splitLvl ), ( "Aux.", splitLvl ), ( "Dyn.", dynSplitLvl ) ):
            PoolAttributes.append( pah.setContainerSplitLevel( fileName, container, level ) )

        # ROOT inadvertently broke forward compatibility in v6.30+ (see root/issues/15964)
        # This workaround is needed so that older releases can read files created by the new ones
        # For more information see ATEAM-1001
        if "EVNT" in stream or "RDO" in stream:
            PoolAttributes.append( f"DatabaseName = '{fileName}'; FILEFORWARD_COMPATIBILITY = '1'" )
            # also for compatibility with rel21 disable single DataHeaderForm
            oneDHForm = False

        # Find the maximum AutoFlush across all formats
        maxAutoFlush = max(maxAutoFlush, autoFlush)

        # If no EventData technology is set for this specific file
        # (or globally) use flags.PoolSvc.DefaultContainerType
        if not ( fileName in storageTechnologyMap or '*' in storageTechnologyMap ):
            storageTechnologyMap[fileName] = flags.PoolSvc.DefaultContainerType

    # If we don't have "enough" events, disable parallelCompression if we're using SharedWriter
    # In this context, "enough" means each worker has a chance to make at least one flush to the disk
    useParallelCompression = flags.MP.UseSharedWriter and flags.MP.UseParallelCompression
    if useParallelCompression:
        # Now compute the total number of events this job will process
        requestedEvents = flags.Exec.MaxEvents
        availableEvents = flags.Input.FileNentries - flags.Exec.SkipEvents
        if requestedEvents == -1:
            totalEntries = availableEvents
        else:
            totalEntries = min( availableEvents, requestedEvents )
        if ( totalEntries > 0 ) and ( maxAutoFlush > 0 ) and ( maxAutoFlush * flags.Concurrency.NumProcs > totalEntries ):
            logger.info( "Not enough events to process, disabling parallel compression for SharedWriter!" )
            logger.info( f"Processing {totalEntries} events in {flags.Concurrency.NumProcs} workers "
                         f"and a maximum (across all outputs) AutoFlush of {maxAutoFlush}")
            useParallelCompression = False

    # Shared I/O needs the dedicated conversion service configuration
    if flags.MP.UseSharedReader or flags.MP.UseSharedWriter:
        from AthenaPoolCnvSvc.PoolCommonConfig import AthenaPoolSharedIOCnvSvcCfg
        return AthenaPoolSharedIOCnvSvcCfg(flags,
                                           PoolAttributes=PoolAttributes,
                                           ParallelCompression=useParallelCompression,
                                           StorageTechnology=storageTechnologyMap,
                                           OutputMetadataContainers=OutputMetadataContainers,
                                           OneDataHeaderForm = oneDHForm)

    from AthenaPoolCnvSvc.PoolCommonConfig import AthenaPoolCnvSvcCfg
    return AthenaPoolCnvSvcCfg(flags,
                               PoolAttributes=PoolAttributes,
                               StorageTechnology=storageTechnologyMap,
                               OneDataHeaderForm = oneDHForm)
python.PoolWriteConfig.PoolWriteCfg
def PoolWriteCfg(flags)
Definition: PoolWriteConfig.py:41
max
constexpr double max()
Definition: ap_fixedTest.cxx:33
min
constexpr double min()
Definition: ap_fixedTest.cxx:26
python.PoolCommonConfig.AthenaPoolSharedIOCnvSvcCfg
def AthenaPoolSharedIOCnvSvcCfg(flags, **kwargs)
Definition: PoolCommonConfig.py:30
python.PoolCommonConfig.AthenaPoolCnvSvcCfg
def AthenaPoolCnvSvcCfg(flags, **kwargs)
Definition: PoolCommonConfig.py:43
python.PoolWriteConfig._overrideTreeAutoFlush
def _overrideTreeAutoFlush(logger, flags, stream, value)
Definition: PoolWriteConfig.py:8
python.PoolWriteConfig._getStreamsFromFlags
def _getStreamsFromFlags(flags)
Definition: PoolWriteConfig.py:23