ATLAS Offline Software
Functions
python.PoolWriteConfig Namespace Reference

Functions

def _overrideTreeAutoFlush (logger, flags, stream, value)
 
def _getStreamsFromFlags (flags)
 
def PoolWriteCfg (flags)
 

Detailed Description

Configuration for POOL file writing

Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration

Function Documentation

◆ _getStreamsFromFlags()

def python.PoolWriteConfig._getStreamsFromFlags (   flags)
private
Helper to get all the streams from configuration flags
For each stream that's configured to be written out
we have two flags w/ the following convention:
    + Output.{STREAM}FileName
    + Output.doWrite{STREAM}

Definition at line 23 of file PoolWriteConfig.py.

23 def _getStreamsFromFlags(flags):
24  """
25  Helper to get all the streams from configuration flags
26  For each stream that's configured to be written out
27  we have two flags w/ the following convention:
28  + Output.{STREAM}FileName
29  + Output.doWrite{STREAM}
30  """
31  result = []
32  for key, value in flags._flagdict.items():
33  if key.startswith("Output.") and key.endswith("FileName") and value.get():
34  stream = key.removeprefix("Output.").removesuffix("FileName")
35  if stream not in ["HIST"]: # AthenaPool is not responsible for HIST storage settings
36  result.append(stream)
37  return result
38 
39 
40 @AccumulatorCache

◆ _overrideTreeAutoFlush()

def python.PoolWriteConfig._overrideTreeAutoFlush (   logger,
  flags,
  stream,
  value 
)
private
Helper function to override TreeAutoFlush from flags.

Definition at line 8 of file PoolWriteConfig.py.

8 def _overrideTreeAutoFlush(logger, flags, stream, value):
9  """Helper function to override TreeAutoFlush from flags."""
10  if not flags.Output.TreeAutoFlush or not isinstance(flags.Output.TreeAutoFlush, dict):
11  return value
12 
13  if stream not in flags.Output.TreeAutoFlush:
14  return value
15 
16  override = flags.Output.TreeAutoFlush[stream]
17  if override is not None:
18  logger.info('Overriding TreeAutoFlush value for stream "%s" from %d to %d', stream, value, override)
19  return override
20 
21  return value
22 

◆ PoolWriteCfg()

def python.PoolWriteConfig.PoolWriteCfg (   flags)
Return ComponentAccumulator configured to Write POOL files

Definition at line 41 of file PoolWriteConfig.py.

41 def PoolWriteCfg(flags):
42  """Return ComponentAccumulator configured to Write POOL files"""
43  # based on WriteAthenaPool._configureWriteAthenaPool
44 
45  from AthenaCommon.Logging import logging
46  logger = logging.getLogger( 'PoolWriteCfg' )
47 
48  PoolAttributes = []
49  # Switch off splitting by setting default SplitLevel to 0
50  PoolAttributes += ["DEFAULT_SPLITLEVEL ='0'"]
51 
52  # Set as default the member-wise streaming, ROOT default
53  PoolAttributes += ["STREAM_MEMBER_WISE = '1'"]
54 
55  # Increase default BasketSize to 32K, ROOT default (but overwritten by POOL)
56  PoolAttributes += ["DEFAULT_BUFFERSIZE = '32000'"]
57 
58  # Set POOLContainerForm(DataHeaderForm) split level to 0
59  PoolAttributes += ["ContainerName = 'TTree=POOLContainerForm(DataHeaderForm)'; CONTAINER_SPLITLEVEL = '0'"]
60  PoolAttributes += ["TREE_BRANCH_OFFSETTAB_LEN ='100'"]
61 
62  oneDHForm = flags.Output.OneDataHeaderForm
63 
64  # Kept in sync with RecoUtils.py
65  from AthenaPoolCnvSvc import PoolAttributeHelper as pah
66 
67  # Defaults for common formats
68  # Stream : [compression algorithm, compression level, auto flush, split level, dyn split level]
69  defaults = {
70  "EVNT" : [2, 1, 500, 0, 0],
71  "EVNT_TR" : [2, 1, 1, 0, 0],
72  "HITS" : [2, 1, 10, 0, 0],
73  "RDO" : [2, 1, 10, 0, 0],
74  "ESD" : [2, 1, 10, 0, 0],
75  "AOD" : [2, 1, 100, 0, 0],
76  "DAOD_PHYSVAL" : [5, 5, 100, 0, 1],
77  "DAOD_PHYS" : [5, 5, 500, 0, 1],
78  "DAOD_PHYSLITE" : [5, 5, 500, 1, 1],
79  "DAOD_TRUTH3" : [5, 5, 500, 1, 1],
80  "D2AOD_PHYSLITE" : [5, 5, 500, 1, 1],
81  }
82 
83  # Metadata containers needed for augmentations
84  OutputMetadataContainers = []
85 
86  # Loop over all streams and set the appropriate attributes
87  maxAutoFlush = -1
88  for stream in _getStreamsFromFlags(flags):
89 
90  # Get the file name - Guaranteed to exist at this point
91  fileName = getattr(flags.Output, f"{stream}FileName")
92 
93  # Get the ROOT settings to be applied
94  compAlg, compLvl, autoFlush, splitLvl, dynSplitLvl = 2, 1, 10, 0, 0 # Defaults: LZMA, Level 1, AutoFlush 10, No Splitting
95  if stream in defaults:
96  compAlg, compLvl, autoFlush, splitLvl, dynSplitLvl = defaults[stream]
97  elif "DAOD" in stream:
98  compAlg, compLvl, autoFlush, splitLvl, dynSplitLvl = 5, 5, 100, 0, 1 # Change the defaults for DAODs
99  elif "D2AOD" in stream:
100  compAlg, compLvl, autoFlush, splitLvl, dynSplitLvl = 5, 5, 500, 1, 1 # Change the defaults for D2AODs
101 
102  # For temporary streams/files we always use ZLIB for the compression algorithm to save CPU cycles
103  # Temporary in this context might mean one of three things:
104  # a) Outputs of intermediate steps of chained workflows (file name begins with tmp.),
105  # b) Outputs of workers in AthenaMP jobs that are to be merged (file name ends with _000), and
106  # c) Any output stream that is marked by the user as being temporary (via the CA flag Output.TemporaryStreams)
107  # The ultimate goal is to reconcile all three cases and propagate the information between the job transform
108  # and the job configuration (CA) so that we don't need to rely on the file names here...
109  isTemporaryStream = fileName.endswith('_000') or fileName.startswith('tmp.') or stream in flags.Output.TemporaryStreams
110  if isTemporaryStream:
111  logger.info(f"Stream {stream} is marked as temporary, overwriting the compression settings to 101")
112  compAlg, compLvl = (1, 1) if isTemporaryStream else (compAlg, compLvl)
113 
114  # See if the user asked for the AutoFlush to be overwritten
115  autoFlush = _overrideTreeAutoFlush(logger, flags, stream, autoFlush)
116 
117  # Print some debugging information
118  logger.debug(f"{fileName=} {stream=} {compAlg=} {compLvl=} {autoFlush=} {splitLvl=} {dynSplitLvl=}")
119 
120  # Set the Collection/Container prefixes (make configurable?)
121  outputCollection = "POOLContainer"
122  poolContainerPrefix = "CollectionTree"
123 
124  # Check to see if this stream is an augmentation
125  # Only set file-level attributes for the owning stream
126  isAugmentation = flags.hasFlag(f"Output.{stream}ParentStream")
127  if not isAugmentation:
128  # Set the Compression attributes
129  PoolAttributes += [ pah.setFileCompAlg( fileName, compAlg ) ]
130  PoolAttributes += [ pah.setFileCompLvl( fileName, compLvl ) ]
131 
132  # By default use a maximum basket buffer size of 128k and minimum buffer entries of 10 for (D)AODs
133  if "AOD" in stream:
134  PoolAttributes += [ pah.setMaxBufferSize( fileName, "131072" ) ]
135  PoolAttributes += [ pah.setMinBufferEntries( fileName, "10" ) ]
136  else:
137  # Changes in this else block need to be coordinated w/ OutputStreamConfig!
138  # Set the master index
139  PoolAttributes += [ f"DatabaseName = '{fileName}'; INDEX_MASTER = 'POOLContainer(DataHeader)'" ]
140 
141  # Set the Collection/Container prefixes
142  outputCollection += f"_{stream}"
143  poolContainerPrefix += f"_{stream}"
144  OutputMetadataContainers += [f"MetaData_{stream}"]
145 
146  # Set the AutoFlush attributes
147  PoolAttributes += [ pah.setTreeAutoFlush( fileName, poolContainerPrefix, autoFlush ) ]
148  PoolAttributes += [ pah.setTreeAutoFlush( fileName, outputCollection, autoFlush ) ]
149  PoolAttributes += [ pah.setTreeAutoFlush( fileName, "POOLContainerForm", autoFlush ) ]
150 
151  # Set the Spit Level attributes
152  PoolAttributes += [ pah.setContainerSplitLevel( fileName, poolContainerPrefix, splitLvl ) ]
153  PoolAttributes += [ pah.setContainerSplitLevel( fileName, "Aux.", splitLvl ) ]
154  PoolAttributes += [ pah.setContainerSplitLevel( fileName, "Dyn.", dynSplitLvl ) ]
155 
156  # ROOT inadvertently broke forward compatibility in v6.30+ (see root/issues/15964)
157  # This workaround is needed so that older releases can read files created by the new ones
158  # For more information see ATEAM-1001
159  if "EVNT" in stream or "RDO" in stream:
160  PoolAttributes += [ f"DatabaseName = '{fileName}'; FILEFORWARD_COMPATIBILITY = '1'" ]
161  # also for compatibility with rel21 disable single DataHeaderForm
162  oneDHForm = False
163 
164  # Find the maximum AutoFlush across all formats
165  maxAutoFlush = max(maxAutoFlush, autoFlush)
166 
167  # If we don't have "enough" events, disable parallelCompression if we're using SharedWriter
168  # In this context, "enough" means each worker has a chance to make at least one flush to the disk
169  useParallelCompression = flags.MP.UseSharedWriter and flags.MP.UseParallelCompression
170  if useParallelCompression:
171  # Now compute the total number of events this job will process
172  requestedEvents = flags.Exec.MaxEvents
173  availableEvents = flags.Input.FileNentries - flags.Exec.SkipEvents
174  totalEntries = availableEvents if requestedEvents == -1 else min( availableEvents, requestedEvents )
175  if ( totalEntries > 0 ) and ( maxAutoFlush > 0 ) and ( maxAutoFlush * flags.Concurrency.NumProcs > totalEntries ):
176  logger.info( "Not enough events to process, disabling parallel compression for SharedWriter!" )
177  logger.info( f"Processing {totalEntries} events in {flags.Concurrency.NumProcs} workers "
178  f"and a maximum (across all outputs) AutoFlush of {maxAutoFlush}")
179  useParallelCompression = False
180 
181  from AthenaPoolCnvSvc.PoolCommonConfig import AthenaPoolCnvSvcCfg
182  return AthenaPoolCnvSvcCfg(flags,
183  PoolAttributes=PoolAttributes,
184  ParallelCompression=useParallelCompression,
185  StorageTechnology=flags.Output.StorageTechnology.EventData,
186  OutputMetadataContainers=OutputMetadataContainers,
187  OneDataHeaderForm = oneDHForm)
python.PoolWriteConfig.PoolWriteCfg
def PoolWriteCfg(flags)
Definition: PoolWriteConfig.py:41
max
constexpr double max()
Definition: ap_fixedTest.cxx:33
min
constexpr double min()
Definition: ap_fixedTest.cxx:26
python.PoolCommonConfig.AthenaPoolCnvSvcCfg
def AthenaPoolCnvSvcCfg(flags, **kwargs)
Definition: PoolCommonConfig.py:29
python.PoolWriteConfig._overrideTreeAutoFlush
def _overrideTreeAutoFlush(logger, flags, stream, value)
Definition: PoolWriteConfig.py:8
python.PoolWriteConfig._getStreamsFromFlags
def _getStreamsFromFlags(flags)
Definition: PoolWriteConfig.py:23