ATLAS Offline Software
Functions
python.PoolWriteConfig Namespace Reference

Functions

def _overrideTreeAutoFlush (logger, flags, stream, value)
 
def _getStreamsFromFlags (flags)
 
def PoolWriteCfg (flags)
 

Detailed Description

Configuration for POOL file writing

Copyright (C) 2002-2023 CERN for the benefit of the ATLAS collaboration

Function Documentation

◆ _getStreamsFromFlags()

def python.PoolWriteConfig._getStreamsFromFlags (   flags)
private
Helper to get all the streams from configuration flags
For each stream that's configured to be written out
we have two flags w/ the following convention:
    + Output.{STREAM}FileName
    + Output.doWrite{STREAM}

Definition at line 23 of file PoolWriteConfig.py.

def _getStreamsFromFlags(flags):
    """
    Helper to get all the streams from configuration flags
    For each stream that's configured to be written out
    we have two flags w/ the following convention:
        + Output.{STREAM}FileName
        + Output.doWrite{STREAM}
    """
    prefix, suffix = "Output.", "FileName"
    streams = []
    for name, flag in flags._flagdict.items():
        # Only Output.*FileName flags with a non-empty value denote a stream
        if not (name.startswith(prefix) and name.endswith(suffix)):
            continue
        if not flag.get():
            continue
        stream = name[len(prefix):-len(suffix)]
        # AthenaPool is not responsible for HIST storage settings
        if stream != "HIST":
            streams.append(stream)
    return streams
38 
39 
40 @AccumulatorCache

◆ _overrideTreeAutoFlush()

def python.PoolWriteConfig._overrideTreeAutoFlush (   logger,
  flags,
  stream,
  value 
)
private
Helper function to override TreeAutoFlush from flags.

Definition at line 8 of file PoolWriteConfig.py.

def _overrideTreeAutoFlush(logger, flags, stream, value):
    """Helper function to override TreeAutoFlush from flags."""
    overrides = flags.Output.TreeAutoFlush
    # Nothing to do unless a non-empty dict of per-stream overrides is set
    if not overrides or not isinstance(overrides, dict):
        return value

    # A missing entry or an explicit None keeps the incoming value
    override = overrides.get(stream)
    if override is None:
        return value

    logger.info('Overriding TreeAutoFlush value for stream "%s" from %d to %d', stream, value, override)
    return override
22 

◆ PoolWriteCfg()

def python.PoolWriteConfig.PoolWriteCfg (   flags)
Return ComponentAccumulator configured to Write POOL files

Definition at line 41 of file PoolWriteConfig.py.

def PoolWriteCfg(flags):
    """Return ComponentAccumulator configured to Write POOL files

    Builds the list of POOL/ROOT attribute strings (compression, AutoFlush,
    split levels, buffer sizes) for every configured output stream and passes
    them to AthenaPoolCnvSvcCfg.

    Args:
        flags: the job configuration flags (AthConfigFlags)

    Returns:
        ComponentAccumulator with AthenaPoolCnvSvc configured for writing
    """
    # based on WriteAthenaPool._configureWriteAthenaPool

    from AthenaCommon.Logging import logging
    logger = logging.getLogger( 'PoolWriteCfg' )

    PoolAttributes = []
    # Switch off splitting by setting default SplitLevel to 0
    PoolAttributes += ["DEFAULT_SPLITLEVEL ='0'"]

    # Set as default the member-wise streaming, ROOT default
    PoolAttributes += ["STREAM_MEMBER_WISE = '1'"]

    # Increase default BasketSize to 32K, ROOT default (but overwritten by POOL)
    PoolAttributes += ["DEFAULT_BUFFERSIZE = '32000'"]

    # Set POOLContainerForm(DataHeaderForm) split level to 0
    PoolAttributes += ["ContainerName = 'TTree=POOLContainerForm(DataHeaderForm)'; CONTAINER_SPLITLEVEL = '0'"]
    PoolAttributes += ["TREE_BRANCH_OFFSETTAB_LEN ='100'"]

    # Kept in sync with RecoUtils.py
    from AthenaPoolCnvSvc import PoolAttributeHelper as pah

    # Defaults for common formats
    # Stream : [compression algorithm, compression level, auto flush, split level, dyn split level]
    defaults = {
        "EVNT_TR" : [2, 1, 1, 0, 0],
        "HITS" : [2, 1, 10, 0, 0],
        "RDO" : [2, 1, 10, 0, 0],
        "ESD" : [2, 1, 10, 0, 0],
        "AOD" : [2, 1, 100, 0, 0],
        "DAOD_PHYSVAL" : [5, 5, 100, 0, 1],
        "DAOD_PHYS" : [5, 5, 500, 0, 1],
        "DAOD_PHYSLITE" : [5, 5, 500, 1, 1],
        "DAOD_TRUTH3" : [5, 5, 500, 1, 1],
        "D2AOD_PHYSLITE" : [5, 5, 500, 1, 1],
    }

    # Metadata containers needed for augmentations
    OutputMetadataContainers = []

    # Loop over all streams and set the appropriate attributes
    maxAutoFlush = -1
    for stream in _getStreamsFromFlags(flags):

        # Get the file name - Guaranteed to exist at this point
        fileName = getattr(flags.Output, f"{stream}FileName")

        # Get the ROOT settings to be applied
        compAlg, compLvl, autoFlush, splitLvl, dynSplitLvl = 2, 1, 10, 0, 0 # Defaults: LZMA, Level 1, AutoFlush 10, No Splitting
        if stream in defaults:
            compAlg, compLvl, autoFlush, splitLvl, dynSplitLvl = defaults[stream]
        elif "DAOD" in stream:
            compAlg, compLvl, autoFlush, splitLvl, dynSplitLvl = 5, 5, 100, 0, 1 # Change the defaults for DAODs
        elif "D2AOD" in stream:
            compAlg, compLvl, autoFlush, splitLvl, dynSplitLvl = 5, 5, 500, 1, 1 # Change the defaults for D2AODs

        # For temporary files we always use ZLIB for compression algorithm
        compAlg = 1 if fileName.endswith('_000') or fileName.startswith('tmp.') else compAlg

        # See if the user asked for the AutoFlush to be overwritten
        autoFlush = _overrideTreeAutoFlush(logger, flags, stream, autoFlush)

        # Print some debugging information
        logger.debug(f"{fileName=} {stream=} {compAlg=} {compLvl=} {autoFlush=} {splitLvl=} {dynSplitLvl=}")

        # Set the Collection/Container prefixes (make configurable?)
        outputCollection = "POOLContainer"
        poolContainerPrefix = "CollectionTree"

        # Check to see if this stream is an augmentation
        # Only set file-level attributes for the owning stream
        isAugmentation = flags.hasFlag(f"Output.{stream}ParentStream")
        if not isAugmentation:
            # Set the Compression attributes
            PoolAttributes += [ pah.setFileCompAlg( fileName, compAlg ) ]
            PoolAttributes += [ pah.setFileCompLvl( fileName, compLvl ) ]

            # By default use a maximum basket buffer size of 128k and minimum buffer entries of 10 for (D)AODs
            if "AOD" in stream:
                PoolAttributes += [ pah.setMaxBufferSize( fileName, "131072" ) ]
                PoolAttributes += [ pah.setMinBufferEntries( fileName, "10" ) ]
        else:
            # Changes in this else block need to be coordinated w/ OutputStreamConfig!
            # Set the master index
            PoolAttributes += [ f"DatabaseName = '{fileName}'; INDEX_MASTER = 'POOLContainer(DataHeader)'" ]

            # Set the Collection/Container prefixes
            # (augmentation streams get stream-specific container names)
            outputCollection += f"_{stream}"
            poolContainerPrefix += f"_{stream}"
            OutputMetadataContainers += [f"MetaData_{stream}"]

        # Set the AutoFlush attributes
        PoolAttributes += [ pah.setTreeAutoFlush( fileName, poolContainerPrefix, autoFlush ) ]
        PoolAttributes += [ pah.setTreeAutoFlush( fileName, outputCollection, autoFlush ) ]
        PoolAttributes += [ pah.setTreeAutoFlush( fileName, "POOLContainerForm", autoFlush ) ]

        # Set the Split Level attributes
        PoolAttributes += [ pah.setContainerSplitLevel( fileName, poolContainerPrefix, splitLvl ) ]
        PoolAttributes += [ pah.setContainerSplitLevel( fileName, "Aux.", splitLvl ) ]
        PoolAttributes += [ pah.setContainerSplitLevel( fileName, "Dyn.", dynSplitLvl ) ]

        # Find the maximum AutoFlush across all formats
        maxAutoFlush = max(maxAutoFlush, autoFlush)

    # If we don't have "enough" events, disable parallelCompression if we're using SharedWriter
    # In this context, "enough" means each worker has a chance to make at least one flush to the disk
    useParallelCompression = flags.MP.UseSharedWriter and flags.MP.UseParallelCompression
    if useParallelCompression:
        # Now compute the total number of events this job will process
        requestedEvents = flags.Exec.MaxEvents
        availableEvents = flags.Input.FileNentries - flags.Exec.SkipEvents
        totalEntries = availableEvents if requestedEvents == -1 else min( availableEvents, requestedEvents )
        if ( totalEntries > 0 ) and ( maxAutoFlush > 0 ) and ( maxAutoFlush * flags.Concurrency.NumProcs > totalEntries ):
            logger.info( "Not enough events to process, disabling parallel compression for SharedWriter!" )
            logger.info( f"Processing {totalEntries} events in {flags.Concurrency.NumProcs} workers "
                         f"and a maximum (across all outputs) AutoFlush of {maxAutoFlush}")
            useParallelCompression = False

    from AthenaPoolCnvSvc.PoolCommonConfig import AthenaPoolCnvSvcCfg
    return AthenaPoolCnvSvcCfg(flags,
                               PoolAttributes=PoolAttributes,
                               ParallelCompression=useParallelCompression,
                               StorageTechnology=flags.Output.StorageTechnology.EventData,
                               OutputMetadataContainers=OutputMetadataContainers)
python.PoolWriteConfig.PoolWriteCfg
def PoolWriteCfg(flags)
Definition: PoolWriteConfig.py:41
max
#define max(a, b)
Definition: cfImp.cxx:41
min
#define min(a, b)
Definition: cfImp.cxx:40
python.PoolCommonConfig.AthenaPoolCnvSvcCfg
def AthenaPoolCnvSvcCfg(flags, **kwargs)
Definition: PoolCommonConfig.py:29
python.PoolWriteConfig._overrideTreeAutoFlush
def _overrideTreeAutoFlush(logger, flags, stream, value)
Definition: PoolWriteConfig.py:8
python.PoolWriteConfig._getStreamsFromFlags
def _getStreamsFromFlags(flags)
Definition: PoolWriteConfig.py:23