ATLAS Offline Software
Functions
python.OutputStreamConfig Namespace Reference

Functions

def outputStreamName (streamName)
 
def OutputStreamCfg (flags, streamName, ItemList=None, MetadataItemList=None, disableEventTag=False, trigNavThinningSvc=None, takeItemsFromInput=False, extendProvenanceRecord=True, keepProvenanceTagsRegEx=None, AcceptAlgs=None, HelperTools=None)
 
def addToESD (flags, itemOrList, **kwargs)
 
def addToAOD (flags, itemOrList, **kwargs)
 
def addToMetaData (flags, streamName, itemOrList, AcceptAlgs=[], HelperTools=[], **kwargs)
 

Function Documentation

◆ addToAOD()

def python.OutputStreamConfig.addToAOD (   flags,
  itemOrList,
**  kwargs 
)
Adds items to AOD stream

@see add addToESD

Definition at line 202 of file OutputStreamConfig.py.

202 def addToAOD(flags, itemOrList, **kwargs):
203  """
204  Adds items to AOD stream
205 
206  @see add addToESD
207  """
208  if not flags.Output.doWriteAOD:
209  return ComponentAccumulator()
210  items = [itemOrList] if isinstance(itemOrList, str) else itemOrList
211  return OutputStreamCfg(flags, "AOD", ItemList=items, **kwargs)
212 

◆ addToESD()

def python.OutputStreamConfig.addToESD (   flags,
  itemOrList,
**  kwargs 
)
Adds items to ESD stream

The argument can be either list of items or just one item
if further customisations are needed for output stream they can be passed via kwargs

returns CA to be merged i.e.: result.merge(addToESD(flags, "xAOD::CoolObject"))

Definition at line 187 of file OutputStreamConfig.py.

187 def addToESD(flags, itemOrList, **kwargs):
188  """
189  Adds items to ESD stream
190 
191  The argument can be either list of items or just one item
192  if further customisations are needed for output stream they can be passed via kwargs
193 
194  returns CA to be merged i.e.: result.merge(addToESD(flags, "xAOD::CoolObject"))
195  """
196  if not flags.Output.doWriteESD:
197  return ComponentAccumulator()
198  items = [itemOrList] if isinstance(itemOrList, str) else itemOrList
199  return OutputStreamCfg(flags, "ESD", ItemList=items, **kwargs)
200 
201 

◆ addToMetaData()

def python.OutputStreamConfig.addToMetaData (   flags,
  streamName,
  itemOrList,
  AcceptAlgs = [],
  HelperTools = [],
**  kwargs 
)
Adds Metadata items to the stream named streamName

Similar to addToESD/AOD, itemOrList can be either a list of items or just one time
The additional arguments, AcceptAlgs and HelperTools, are passed to the underlying stream
The former is needed when there are special kernels, e.g., simulation/derivation
The latter is needed primarily for the propagation of the FileMetaData tool

Returns CA to be merged

Definition at line 213 of file OutputStreamConfig.py.

213 def addToMetaData(flags, streamName, itemOrList, AcceptAlgs=[], HelperTools=[], **kwargs):
214  """
215  Adds Metadata items to the stream named streamName
216 
217  Similar to addToESD/AOD, itemOrList can be either a list of items or just one time
218  The additional arguments, AcceptAlgs and HelperTools, are passed to the underlying stream
219  The former is needed when there are special kernels, e.g., simulation/derivation
220  The latter is needed primarily for the propagation of the FileMetaData tool
221 
222  Returns CA to be merged
223  """
224  flagName = f"Output.doWrite{streamName}"
225  if not flags.hasFlag(flagName):
226  return ComponentAccumulator()
227  items = [itemOrList] if isinstance(itemOrList, str) else itemOrList
228  return OutputStreamCfg(flags, streamName, MetadataItemList=items,
229  AcceptAlgs=AcceptAlgs, HelperTools=HelperTools, **kwargs)

◆ OutputStreamCfg()

def python.OutputStreamConfig.OutputStreamCfg (   flags,
  streamName,
  ItemList = None,
  MetadataItemList = None,
  disableEventTag = False,
  trigNavThinningSvc = None,
  takeItemsFromInput = False,
  extendProvenanceRecord = True,
  keepProvenanceTagsRegEx = None,
  AcceptAlgs = None,
  HelperTools = None 
)
Configure an output stream for writing data to POOL files.

Args:
    flags: Configuration flags object
    streamName: Name of the output stream (e.g., 'ESD', 'AOD', 'DAOD_PHYS')
    ItemList: List of data objects to write to the stream
    MetadataItemList: List of metadata objects to write
    disableEventTag: If True, disable event tagging
    trigNavThinningSvc: Trigger navigation thinning service
    takeItemsFromInput: If True, take items from input file
    extendProvenanceRecord: If True, extend provenance record with processing tags
    keepProvenanceTagsRegEx: RegEx string to match processing tags in the Event provenance.
                            Only matching tags will be copied to the new DataHeader.
                            Empty string rejects all tags. Direct provenance is not affected
                            (see extendProvenanceRecord).
    AcceptAlgs: List of algorithms that must accept the event for it to be written
    HelperTools: List of helper tools to attach to the stream

Returns:
    ComponentAccumulator: Configured output stream and associated services

Definition at line 13 of file OutputStreamConfig.py.

13 def OutputStreamCfg(
14  flags,
15  streamName,
16  ItemList=None,
17  MetadataItemList=None,
18  disableEventTag=False,
19  trigNavThinningSvc=None,
20  takeItemsFromInput=False,
21  extendProvenanceRecord=True,
22  keepProvenanceTagsRegEx=None,
23  AcceptAlgs=None,
24  HelperTools=None,
25 ):
26  """Configure an output stream for writing data to POOL files.
27 
28  Args:
29  flags: Configuration flags object
30  streamName: Name of the output stream (e.g., 'ESD', 'AOD', 'DAOD_PHYS')
31  ItemList: List of data objects to write to the stream
32  MetadataItemList: List of metadata objects to write
33  disableEventTag: If True, disable event tagging
34  trigNavThinningSvc: Trigger navigation thinning service
35  takeItemsFromInput: If True, take items from input file
36  extendProvenanceRecord: If True, extend provenance record with processing tags
37  keepProvenanceTagsRegEx: RegEx string to match processing tags in the Event provenance.
38  Only matching tags will be copied to the new DataHeader.
39  Empty string rejects all tags. Direct provenance is not affected
40  (see extendProvenanceRecord).
41  AcceptAlgs: List of algorithms that must accept the event for it to be written
42  HelperTools: List of helper tools to attach to the stream
43 
44  Returns:
45  ComponentAccumulator: Configured output stream and associated services
46  """
47  # Handle mutable default arguments
48  if ItemList is None:
49  ItemList = []
50  if MetadataItemList is None:
51  MetadataItemList = []
52  if AcceptAlgs is None:
53  AcceptAlgs = []
54  if HelperTools is None:
55  HelperTools = []
56 
57  eventInfoKey = "EventInfo"
58  if flags.Common.ProductionStep in [ProductionStep.PileUpPresampling, ProductionStep.PileUpPretracking, ProductionStep.MinbiasPreprocessing]:
59  eventInfoKey = f"{flags.Overlay.BkgPrefix}EventInfo"
60 
61  msg = logging.getLogger("OutputStreamCfg")
62  flagName = f"Output.{streamName}FileName"
63  if flags.hasFlag(flagName):
64  fileName = flags._get(flagName)
65  else:
66  fileName = f"my{streamName}.pool.root"
67  msg.info("No file name predefined for stream %s. Using %s", streamName, fileName)
68 
69  if fileName in flags.Input.Files:
70  raise ConfigurationError("Same name for input and output file %s" % fileName)
71 
72  result = ComponentAccumulator(sequence = CompFactory.AthSequencer("AthOutSeq", StopOverride=True))
73 
74  # Set up AthenaPoolCnvSvc through PoolWriteCfg
75  from AthenaPoolCnvSvc.PoolWriteConfig import PoolWriteCfg
76  result.merge(PoolWriteCfg(flags))
77 
78  # Extract the metadata storage technology for this file
79  # First try exact filename match, then wildcard
80  # If not found, default to the same technology as EventData for this file
81  metaDataTech = flags.Output.StorageTechnology.MetaData.get(fileName)
82  if metaDataTech is None:
83  metaDataTech = flags.Output.StorageTechnology.MetaData.get('*')
84  if metaDataTech is None:
85  # Fall back to EventData technology for this file
86  eventDataTech = flags.Output.StorageTechnology.EventData.get(
87  fileName,
88  flags.Output.StorageTechnology.EventData.get('*', 'ROOTTREEINDEX')
89  )
90  metaDataTech = eventDataTech
91 
92  # define athena output stream
93  writingTool = CompFactory.AthenaOutputStreamTool(
94  f"{outputStreamName(streamName)}Tool",
95  DataHeaderKey=outputStreamName(streamName),
96  MetaDataPoolContainerPrefix=f"{metaDataTech}:MetaData",
97  MetaDataOutputCollection=f"{metaDataTech}:MetaDataHdr",
98  ConversionService="AthenaPoolSharedIOCnvSvc" if flags.MP.UseSharedReader or flags.MP.UseSharedWriter else "AthenaPoolCnvSvc",
99  )
100 
101  # If we're running in augmentation mode, configure the writing tool accordingly
102  parentStream = f"Output.{streamName}ParentStream"
103  childStream = f"Output.{streamName}ChildStream"
104  if flags.hasFlag(childStream):
105  writingTool.SaveDecisions = True
106  elif flags.hasFlag(parentStream):
107  disableEventTag = True
108  writingTool.OutputCollection = f"POOLContainer_{streamName}"
109  writingTool.PoolContainerPrefix = f"CollectionTree_{streamName}"
110  writingTool.MetaDataOutputCollection = f"MetaDataHdr_{streamName}"
111  writingTool.MetaDataPoolContainerPrefix = f"MetaData_{streamName}"
112  msg.info("Stream %s running in augmentation mode with %s as parent", streamName, flags._get(parentStream))
113 
114  # In DAOD production the EventInfo is prepared specially by the SlimmingHelper to ensure it is written in AuxDyn form
115  # So for derivations the ItemList from the SlimmingHelper alone is used without the extra EventInfo items
116  finalItemList = []
117  if any(name in streamName for name in {"DAOD_", "D2AOD_"}):
118  finalItemList = ItemList
119  else:
120  finalItemList = [f"xAOD::EventInfo#{eventInfoKey}", f"xAOD::EventAuxInfo#{eventInfoKey}Aux."] + ItemList
121 
122  outputStream = CompFactory.AthenaOutputStream(
123  outputStreamName(streamName),
124  StreamName=outputStreamName(streamName),
125  WritingTool=writingTool,
126  ItemList=finalItemList,
127  MetadataItemList=MetadataItemList,
128  OutputFile=fileName,
129  HelperTools=HelperTools,
130  )
131  if takeItemsFromInput:
132  # Avoid explicitly setting the property unless we want to set it
133  # to True (default in C++ is False). This avoids CA merge
134  # conflicts.
135  outputStream.TakeItemsFromInput = True
136  if not extendProvenanceRecord:
137  # Treat this similar to takeItemsFromInput
138  # (C++ default in this case is True)
139  outputStream.ExtendProvenanceRecord = False
140  if keepProvenanceTagsRegEx is not None:
141  # C++ defaults to '.*' which means all. Overwrite only on request.
142  outputStream.KeepProvenanceTagsRegEx = keepProvenanceTagsRegEx
143  outputStream.AcceptAlgs += AcceptAlgs
144  outputStream.ExtraOutputs.add(("DataHeader", f"StoreGateSvc+{outputStreamName(streamName)}"))
145  if flags.Scheduler.CheckOutputUsage and flags.Concurrency.NumThreads > 0:
146  outputStream.ExtraInputs = {tuple(l.split('#')) for l in finalItemList if '*' not in l and 'Aux' not in l}
147  # Ignore dependencies
148  from AthenaConfiguration.MainServicesConfig import OutputUsageIgnoreCfg
149  result.merge(OutputUsageIgnoreCfg(flags, outputStream.name))
150 
151  result.addService(CompFactory.StoreGateSvc("MetaDataStore"))
152  outputStream.MetadataStore = result.getService("MetaDataStore")
153 
154  # Support for MT thinning.
155  thinningCacheTool = CompFactory.Athena.ThinningCacheTool(f"ThinningCacheTool_Stream{streamName}",
156  StreamName=outputStreamName(streamName))
157  if trigNavThinningSvc is not None:
158  thinningCacheTool.TrigNavigationThinningSvc = trigNavThinningSvc
159  outputStream.HelperTools.append(thinningCacheTool)
160 
161  # Event Tag
162  if not disableEventTag:
163  key = "SimpleTag"
164  outputStream.WritingTool.AttributeListKey=key
165 
166  propagateInputAttributeList = False
167  if "AthenaAttributeList#Input" in flags.Input.TypedCollections:
168  from SGComps.SGInputLoaderConfig import SGInputLoaderCfg
169  result.merge(SGInputLoaderCfg(flags, ["AthenaAttributeList#Input"]))
170  propagateInputAttributeList = True
171 
172  # build eventinfo attribute list
173  tagBuilder = CompFactory.EventInfoTagBuilder(AttributeList=key,
174  Tool=CompFactory.EventInfoAttListTool(),
175  EventInfoKey=eventInfoKey,
176  PropagateInput=propagateInputAttributeList)
177  result.addEventAlgo(tagBuilder)
178 
179  # For xAOD output
180  if "AOD" in streamName:
181  outputStream.WritingTool.SubLevelBranchName = "<key>"
182 
183  result.addEventAlgo(outputStream, domain='IO')
184  return result
185 
186 

◆ outputStreamName()

def python.OutputStreamConfig.outputStreamName (   streamName)

Definition at line 9 of file OutputStreamConfig.py.

9 def outputStreamName(streamName):
10  return f"Stream{streamName}"
11 
12 
python.OutputStreamConfig.OutputStreamCfg
def OutputStreamCfg(flags, streamName, ItemList=None, MetadataItemList=None, disableEventTag=False, trigNavThinningSvc=None, takeItemsFromInput=False, extendProvenanceRecord=True, keepProvenanceTagsRegEx=None, AcceptAlgs=None, HelperTools=None)
Definition: OutputStreamConfig.py:13
python.OutputStreamConfig.addToMetaData
def addToMetaData(flags, streamName, itemOrList, AcceptAlgs=[], HelperTools=[], **kwargs)
Definition: OutputStreamConfig.py:213
python.PoolWriteConfig.PoolWriteCfg
def PoolWriteCfg(flags)
Definition: PoolWriteConfig.py:41
python.JetAnalysisCommon.ComponentAccumulator
ComponentAccumulator
Definition: JetAnalysisCommon.py:302
python.OutputStreamConfig.outputStreamName
def outputStreamName(streamName)
Definition: OutputStreamConfig.py:9
python.MainServicesConfig.OutputUsageIgnoreCfg
def OutputUsageIgnoreCfg(flags, algorithm)
Definition: MainServicesConfig.py:57
SGInputLoaderConfig.SGInputLoaderCfg
def SGInputLoaderCfg(flags, Load=None, **kwargs)
Definition: SGInputLoaderConfig.py:7
python.OutputStreamConfig.addToESD
def addToESD(flags, itemOrList, **kwargs)
Definition: OutputStreamConfig.py:187
python.OutputStreamConfig.addToAOD
def addToAOD(flags, itemOrList, **kwargs)
Definition: OutputStreamConfig.py:202