ATLAS Offline Software
Loading...
Searching...
No Matches
CreateOutputStreams.py
Go to the documentation of this file.
1# Copyright (C) 2002-2023 CERN for the benefit of the ATLAS collaboration
2
3
7
8
9from AthenaCommon import CfgMgr
10from AthenaCommon.AppMgr import ServiceMgr as svcMgr
11from AthenaServices.AthenaServicesConf import AthenaOutputStream
12from AthenaServices.AthenaServicesConf import AthenaOutputStreamTool
13
14_trigNavThinningSvcs = {}
15def registerTrigNavThinningSvc (streamName, svc):
16 _trigNavThinningSvcs[streamName] = svc
17 return
18
19def createOutputStream( streamName, fileName = "", asAlg = False, noTag = False,
20 eventInfoKey = "EventInfo", decisionFilter="",
21 trigNavThinningSvc = None ):
22 if trigNavThinningSvc is None:
23 trigNavThinningSvc = _trigNavThinningSvcs.get (streamName, None)
24
25
26 # define athena output stream
27 writingTool = AthenaOutputStreamTool( streamName + "Tool",
28 TopLevelContainerName = "",
29 SubLevelBranchName = "<type>/<key>")
30 outputStream = AthenaOutputStream(
31 streamName,
32 WritingTool = writingTool,
33 ItemList = [ "EventInfo#*" ]
34 )
35 outputStream.ExtraOutputs.add(("DataHeader", "StoreGateSvc+" + streamName))
36 #outputStream.ItemList += [ "xAOD::EventInfo#*" ]
37 outputStream.MetadataStore = svcMgr.MetaDataStore
38 outputStream.MetadataItemList = [
39 "EventStreamInfo#" + streamName,
40 "IOVMetaDataContainer#*",
41 ]
42
43
44 from AthenaCommon.AlgSequence import AlgSequence
45 topSequence = AlgSequence()
46 from AthenaCommon.AlgSequence import AthSequencer
47 outSequence = AthSequencer("AthOutSeq")
48
49 doTag = not noTag
50 if doTag:
51 key = "SimpleTag"
52 # Tell tool to pick it up
53 outputStream.WritingTool.AttributeListKey=key
54 if ('EventInfoTagBuilder/EventInfoTagBuilder' not in topSequence.getProperties()['Members']):
55 # build eventinfo attribute list
56 from .OutputStreamAthenaPoolConf import EventInfoAttListTool, EventInfoTagBuilder
57 EventInfoTagBuilder = EventInfoTagBuilder(AttributeList=key, EventInfoKey=eventInfoKey, FilterString=decisionFilter,
59 from AthenaCommon.GlobalFlags import globalflags
60 if globalflags.InputFormat() == 'bytestream':
61 #No event-tag input in bytestream
62 EventInfoTagBuilder.PropagateInput=False
63 topSequence += EventInfoTagBuilder
64
65 # decide where to put outputstream in sequencing
66 if asAlg:
67 outSequence += outputStream
68 else:
69 outSequence += outputStream
70
71 if fileName != "":
72 outputStream.OutputFile = fileName
73 from .OutputStreamAthenaPoolConf import MakeEventStreamInfo
74 streamInfoTool = MakeEventStreamInfo( streamName + "_MakeEventStreamInfo" )
75 streamInfoTool.Key = streamName
76 streamInfoTool.EventInfoKey = eventInfoKey
77 # for xAOD access, add EventFormat to all POOL output streams
78 # Key to use for event format on this stream
79 event_format_key = 'EventFormat{}'.format(streamName)
80 event_format_tool = CfgMgr.xAODMaker__EventFormatStreamHelperTool(
81 "{}_MakeEventFormat".format(streamName),
82 Key=event_format_key,
83 )
84 outputStream.MetadataItemList += ["xAOD::EventFormat#{}".format(event_format_key)]
85
86 # Create a new xAOD::FileMetaData object
87 file_metadata_key = "FileMetaData"
88 file_metadata_creator_tool = CfgMgr.xAODMaker__FileMetaDataCreatorTool(
89 "FileMetaDataCreatorTool",
90 OutputKey=file_metadata_key,
91 StreamName=streamName,
92 )
93 outputStream.MetadataItemList += [
94 "xAOD::FileMetaData#{}".format(file_metadata_key),
95 "xAOD::FileMetaDataAuxInfo#{}Aux.".format(file_metadata_key),
96 ]
97
98 outputStream.HelperTools = [
99 streamInfoTool,
100 event_format_tool,
101 file_metadata_creator_tool,
102 ]
103
104
105 # Support for MT thinning.
106 from AthenaServices.AthenaServicesConf import Athena__ThinningCacheTool
107 tct = Athena__ThinningCacheTool ('ThinningCacheTool_' + streamName,
108 StreamName = streamName)
109 if trigNavThinningSvc is not None:
110 tct.TrigNavigationThinningSvc = trigNavThinningSvc
111 outputStream.HelperTools += [tct]
112
113
114 # Set the list of transient items based on what we know is in the transient
115 # store. The output algorithm will then declare input dependencies
116 # for objects which are both listed here and in the ItemList.
117 # (We do it like this because ItemList is typically configured to include
118 # everything which might possibly be output. If this gets cleaned up,
119 # then we can remove this.)
120 # Some builds don't include RecExConfig, so don't crash in that case.
121 # FIXME: Rather than using ObjKeyStore, we could scan all algorithms
122 # and look for write handles.
123 try:
124 tlist = []
125 from RecExConfig.ObjKeyStore import objKeyStore
126 for typ, klist in objKeyStore['transient'].getProperties().items():
127 for k in klist:
128 tlist.append (typ + '#' + k)
129 outputStream.TransientItems += tlist
130 except ImportError:
131 pass
132
133 return outputStream
134
135def createOutputConditionStream( streamName, fileName = "" ):
136 from RegistrationServices.OutputConditionsAlg import OutputConditionsAlg
137 conditionStream = OutputConditionsAlg(
138 streamName,
139 outputFile = fileName,
140 WriteIOV = False
141 )
142 return conditionStream
143
144
145AthenaPoolOutputStream = createOutputStream
146AthenaPoolOutputConditionStream = createOutputConditionStream
147
ClassName: AthSequencer.
This is the implementation of IAthenaOutputStreamTool.
algorithm that marks for write data objects in SG
This class provides an algorithm to make the EventStreamInfo object and update it.
registerTrigNavThinningSvc(streamName, svc)
createOutputStream(streamName, fileName="", asAlg=False, noTag=False, eventInfoKey="EventInfo", decisionFilter="", trigNavThinningSvc=None)
createOutputConditionStream(streamName, fileName="")