ATLAS Offline Software
Functions
python.OutputStreamConfig Namespace Reference

Functions

def outputStreamName (streamName)
 
def OutputStreamCfg (flags, streamName, ItemList=[], MetadataItemList=[], disableEventTag=False, trigNavThinningSvc=None, takeItemsFromInput=False, extendProvenanceRecord=True, AcceptAlgs=[], HelperTools=[])
 
def addToESD (flags, itemOrList, **kwargs)
 
def addToAOD (flags, itemOrList, **kwargs)
 
def addToMetaData (flags, streamName, itemOrList, AcceptAlgs=[], HelperTools=[], **kwargs)
 

Function Documentation

◆ addToAOD()

def python.OutputStreamConfig.addToAOD (   flags,
  itemOrList,
**  kwargs 
)
Adds items to AOD stream

@see addToESD

Definition at line 142 of file OutputStreamConfig.py.

def addToAOD(flags, itemOrList, **kwargs):
    """
    Add one item, or a collection of items, to the AOD output stream.

    A single string is wrapped into a one-element list; any other value is
    forwarded unchanged as the stream's ItemList. Additional keyword
    arguments are handed straight to OutputStreamCfg.

    Returns a ComponentAccumulator to be merged by the caller. The
    accumulator is empty when flags.Output.doWriteAOD is not set.

    @see addToESD
    """
    if flags.Output.doWriteAOD:
        if isinstance(itemOrList, str):
            streamItems = [itemOrList]
        else:
            streamItems = itemOrList
        return OutputStreamCfg(flags, "AOD", ItemList=streamItems, **kwargs)
    # AOD writing is switched off: nothing to configure
    return ComponentAccumulator()
152 

◆ addToESD()

def python.OutputStreamConfig.addToESD (   flags,
  itemOrList,
**  kwargs 
)
Adds items to ESD stream

The argument can be either a list of items or a single item.
If further customisation of the output stream is needed, it can be passed via kwargs.

returns CA to be merged i.e.: result.merge(addToESD(flags, "xAOD::CoolObject"))

Definition at line 127 of file OutputStreamConfig.py.

def addToESD(flags, itemOrList, **kwargs):
    """
    Add one item, or a collection of items, to the ESD output stream.

    itemOrList may be a single string or a list of items; a single string
    is wrapped into a one-element list. Any further customisation of the
    output stream can be supplied through kwargs, which are forwarded to
    OutputStreamCfg.

    Returns a ComponentAccumulator to be merged, e.g.:
        result.merge(addToESD(flags, "xAOD::CoolObject"))
    The accumulator is empty when flags.Output.doWriteESD is not set.
    """
    if not flags.Output.doWriteESD:
        # ESD writing is switched off: nothing to configure
        return ComponentAccumulator()

    esdItems = itemOrList
    if isinstance(esdItems, str):
        esdItems = [esdItems]
    return OutputStreamCfg(flags, "ESD", ItemList=esdItems, **kwargs)
140 
141 

◆ addToMetaData()

def python.OutputStreamConfig.addToMetaData (   flags,
  streamName,
  itemOrList,
  AcceptAlgs = [],
  HelperTools = [],
**  kwargs 
)
Adds Metadata items to the stream named streamName

Similar to addToESD/AOD, itemOrList can be either a list of items or just one item
The additional arguments, AcceptAlgs and HelperTools, are passed to the underlying stream
The former is needed when there are special kernels, e.g., simulation/derivation
The latter is needed primarily for the propagation of the FileMetaData tool

Returns CA to be merged

Definition at line 153 of file OutputStreamConfig.py.

def addToMetaData(flags, streamName, itemOrList, AcceptAlgs=[], HelperTools=[], **kwargs):
    """
    Add metadata items to the output stream named streamName.

    Similar to addToESD/addToAOD, itemOrList can be either a list of items
    or just one item (a single string is wrapped into a one-element list).
    The additional arguments, AcceptAlgs and HelperTools, are passed to the
    underlying stream:
      - AcceptAlgs is needed when there are special kernels,
        e.g. simulation/derivation;
      - HelperTools is needed primarily for the propagation of the
        FileMetaData tool.
    Any remaining kwargs are forwarded to OutputStreamCfg.

    Returns a ComponentAccumulator to be merged. It is empty when the
    flag Output.doWrite<streamName> does not exist or is disabled.
    """
    flagName = f"Output.doWrite{streamName}"
    # The stream must be both known and switched on. Checking only for the
    # flag's existence would configure a stream whose doWrite flag is False,
    # unlike addToESD/addToAOD which test the flag's value.
    if not flags.hasFlag(flagName) or not flags._get(flagName):
        return ComponentAccumulator()
    items = [itemOrList] if isinstance(itemOrList, str) else itemOrList
    # The list defaults in the signature are only read and forwarded here,
    # never mutated.
    return OutputStreamCfg(flags, streamName, MetadataItemList=items,
                           AcceptAlgs=AcceptAlgs, HelperTools=HelperTools, **kwargs)

◆ OutputStreamCfg()

def python.OutputStreamConfig.OutputStreamCfg (   flags,
  streamName,
  ItemList = [],
  MetadataItemList = [],
  disableEventTag = False,
  trigNavThinningSvc = None,
  takeItemsFromInput = False,
  extendProvenanceRecord = True,
  AcceptAlgs = [],
  HelperTools = [] 
)

Definition at line 12 of file OutputStreamConfig.py.

def OutputStreamCfg(flags, streamName, ItemList=[], MetadataItemList=[],
                    disableEventTag=False, trigNavThinningSvc=None, takeItemsFromInput=False,
                    extendProvenanceRecord=True, AcceptAlgs=[], HelperTools=[]):
    """
    Configure the AthenaOutputStream for the stream named streamName.

    Arguments:
      flags                  -- configuration flags
      streamName             -- short stream name (e.g. "ESD", "AOD"); the
                                algorithm is named via outputStreamName()
      ItemList               -- event-data items to write; for non-derivation
                                streams the EventInfo items are prepended
      MetadataItemList       -- metadata items to write
      disableEventTag        -- if True, no event tag (attribute list) is built;
                                forced to True for streams with a parent stream
      trigNavThinningSvc     -- optional service set on the ThinningCacheTool
      takeItemsFromInput     -- set TakeItemsFromInput on the stream (only when True)
      extendProvenanceRecord -- set ExtendProvenanceRecord to False (only when False)
      AcceptAlgs             -- algorithms appended to the stream's AcceptAlgs
      HelperTools            -- helper tools attached to the stream

    The output file name is taken from the flag Output.<streamName>FileName
    if it exists, otherwise "my<streamName>.pool.root" is used.

    Returns a ComponentAccumulator (sequence "AthOutSeq") to be merged.
    Raises ConfigurationError if the output file is also one of the input files.

    The list arguments are copied before being handed to the components, so
    neither the caller's lists nor the shared default lists in the signature
    can be modified through the configured stream.
    """
    eventInfoKey = "EventInfo"
    if flags.Common.ProductionStep in [ProductionStep.PileUpPresampling, ProductionStep.PileUpPretracking]:
        eventInfoKey = f"{flags.Overlay.BkgPrefix}EventInfo"

    msg = logging.getLogger("OutputStreamCfg")
    flagName = f"Output.{streamName}FileName"
    if flags.hasFlag(flagName):
        fileName = flags._get(flagName)
    else:
        fileName = f"my{streamName}.pool.root"
        msg.info("No file name predefined for stream %s. Using %s", streamName, fileName)

    if fileName in flags.Input.Files:
        raise ConfigurationError("Same name for input and output file %s" % fileName)

    result = ComponentAccumulator(sequence = CompFactory.AthSequencer("AthOutSeq", StopOverride=True))

    # Set up AthenaPoolCnvSvc through PoolWriteCfg
    from AthenaPoolCnvSvc.PoolWriteConfig import PoolWriteCfg
    result.merge(PoolWriteCfg(flags))

    # define athena output stream
    writingTool = CompFactory.AthenaOutputStreamTool(
        f"{outputStreamName(streamName)}Tool",
        DataHeaderKey=outputStreamName(streamName),
        MetaDataPoolContainerPrefix=f"{flags.Output.StorageTechnology.MetaData}:MetaData",
        MetaDataOutputCollection=f"{flags.Output.StorageTechnology.MetaData}:MetaDataHdr",
    )

    # If we're running in augmentation mode, configure the writing tool accordingly
    parentStream = f"Output.{streamName}ParentStream"
    childStream = f"Output.{streamName}ChildStream"
    if flags.hasFlag(childStream):
        writingTool.SaveDecisions = True
    elif flags.hasFlag(parentStream):
        disableEventTag = True
        writingTool.OutputCollection = f"POOLContainer_{streamName}"
        writingTool.PoolContainerPrefix = f"CollectionTree_{streamName}"
        writingTool.MetaDataOutputCollection = f"MetaDataHdr_{streamName}"
        writingTool.MetaDataPoolContainerPrefix = f"MetaData_{streamName}"
        msg.info("Stream %s running in augmentation mode with %s as parent", streamName, flags._get(parentStream))

    # In DAOD production the EventInfo is prepared specially by the SlimmingHelper to ensure it is written in AuxDyn form
    # So for derivations the ItemList from the SlimmingHelper alone is used without the extra EventInfo items
    if any(name in streamName for name in {"DAOD_", "D2AOD_"}):
        # Copy so the stream never aliases the caller's (or the default) list
        finalItemList = list(ItemList)
    else:
        finalItemList = [f"xAOD::EventInfo#{eventInfoKey}", f"xAOD::EventAuxInfo#{eventInfoKey}Aux."] + list(ItemList)

    outputStream = CompFactory.AthenaOutputStream(
        outputStreamName(streamName),
        StreamName=outputStreamName(streamName),
        WritingTool=writingTool,
        ItemList=finalItemList,
        MetadataItemList=MetadataItemList,
        OutputFile=fileName,
        # Copy: a tool is appended to outputStream.HelperTools further down,
        # which must not leak into the caller's list or the shared default.
        HelperTools=list(HelperTools),
    )
    if takeItemsFromInput:
        # Avoid explicitly setting the property unless we want to set it
        # to True (default in C++ is False). This avoids CA merge
        # conflicts.
        outputStream.TakeItemsFromInput = True
    if not extendProvenanceRecord:
        # Treat this similar to takeItemsFromInput
        # (C++ default in this case is True)
        outputStream.ExtendProvenanceRecord = False
    outputStream.AcceptAlgs += AcceptAlgs
    outputStream.ExtraOutputs.add(("DataHeader", f"StoreGateSvc+{outputStreamName(streamName)}"))
    if flags.Scheduler.CheckOutputUsage and flags.Concurrency.NumThreads > 0:
        # Declare every explicit (non-wildcard, non-Aux) item as an input
        # dependency, as a (type, key) tuple split on '#'
        outputStream.ExtraInputs = {
            tuple(item.split('#'))
            for item in finalItemList
            if '*' not in item and 'Aux' not in item
        }
        # Ignore dependencies
        from AthenaConfiguration.MainServicesConfig import OutputUsageIgnoreCfg
        result.merge(OutputUsageIgnoreCfg(flags, outputStream.name))

    result.addService(CompFactory.StoreGateSvc("MetaDataStore"))
    outputStream.MetadataStore = result.getService("MetaDataStore")

    # Support for MT thinning.
    thinningCacheTool = CompFactory.Athena.ThinningCacheTool(f"ThinningCacheTool_Stream{streamName}",
                                                             StreamName=outputStreamName(streamName))
    if trigNavThinningSvc is not None:
        thinningCacheTool.TrigNavigationThinningSvc = trigNavThinningSvc
    outputStream.HelperTools.append(thinningCacheTool)

    # Event Tag
    if not disableEventTag:
        key = "SimpleTag"
        outputStream.WritingTool.AttributeListKey=key

        propagateInputAttributeList = False
        if "AthenaAttributeList#Input" in flags.Input.TypedCollections:
            from SGComps.SGInputLoaderConfig import SGInputLoaderCfg
            result.merge(SGInputLoaderCfg(flags, ["AthenaAttributeList#Input"]))
            propagateInputAttributeList = True

        # build eventinfo attribute list
        tagBuilder = CompFactory.EventInfoTagBuilder(AttributeList=key,
                                                     Tool=CompFactory.EventInfoAttListTool(),
                                                     EventInfoKey=eventInfoKey,
                                                     PropagateInput=propagateInputAttributeList)
        result.addEventAlgo(tagBuilder)

    # For xAOD output
    if "AOD" in streamName:
        outputStream.WritingTool.SubLevelBranchName = "<key>"

    result.addEventAlgo(outputStream, domain='IO')
    return result
125 
126 

◆ outputStreamName()

def python.OutputStreamConfig.outputStreamName (   streamName)

Definition at line 9 of file OutputStreamConfig.py.

def outputStreamName(streamName):
    """Return the output stream algorithm name: streamName prefixed with "Stream"."""
    return "Stream{}".format(streamName)
11 
python.OutputStreamConfig.addToMetaData
def addToMetaData(flags, streamName, itemOrList, AcceptAlgs=[], HelperTools=[], **kwargs)
Definition: OutputStreamConfig.py:153
python.PoolWriteConfig.PoolWriteCfg
def PoolWriteCfg(flags)
Definition: PoolWriteConfig.py:41
python.JetAnalysisCommon.ComponentAccumulator
ComponentAccumulator
Definition: JetAnalysisCommon.py:302
python.OutputStreamConfig.outputStreamName
def outputStreamName(streamName)
Definition: OutputStreamConfig.py:9
python.OutputStreamConfig.OutputStreamCfg
def OutputStreamCfg(flags, streamName, ItemList=[], MetadataItemList=[], disableEventTag=False, trigNavThinningSvc=None, takeItemsFromInput=False, extendProvenanceRecord=True, AcceptAlgs=[], HelperTools=[])
Definition: OutputStreamConfig.py:12
python.MainServicesConfig.OutputUsageIgnoreCfg
def OutputUsageIgnoreCfg(flags, algorithm)
Definition: MainServicesConfig.py:53
SGInputLoaderConfig.SGInputLoaderCfg
def SGInputLoaderCfg(flags, Load=None, **kwargs)
Definition: SGInputLoaderConfig.py:7
python.OutputStreamConfig.addToESD
def addToESD(flags, itemOrList, **kwargs)
Definition: OutputStreamConfig.py:127
python.OutputStreamConfig.addToAOD
def addToAOD(flags, itemOrList, **kwargs)
Definition: OutputStreamConfig.py:142