ATLAS Offline Software
Loading...
Searching...
No Matches
CreateOutputStreams.py
Go to the documentation of this file.
1# Copyright (C) 2002-2023 CERN for the benefit of the ATLAS collaboration
2
3
7
8
9from AthenaCommon import CfgMgr
10from AthenaCommon.AppMgr import ServiceMgr as svcMgr
11from AthenaServices.AthenaServicesConf import AthenaOutputStream
12from AthenaServices.AthenaServicesConf import AthenaOutputStreamTool
13
# Registry of trigger-navigation thinning services, keyed by output stream
# name.  createOutputStream() consults it when the caller does not pass a
# service explicitly.
_trigNavThinningSvcs = {}


def registerTrigNavThinningSvc(streamName, svc):
    """Remember *svc* as the trigger-navigation thinning service for a stream.

    Registering the same *streamName* again replaces the earlier entry.

    Args:
        streamName: Name of the output stream the service belongs to.
        svc: The service object to associate with that stream.
    """
    _trigNavThinningSvcs[streamName] = svc
18
def createOutputStream(streamName, fileName="", asAlg=False, noTag=False,
                       eventInfoKey="EventInfo", decisionFilter="",
                       trigNavThinningSvc=None):
    """Build an AthenaOutputStream called *streamName* and add it to AthOutSeq.

    Args:
        streamName: Name of the stream.  It is also used to derive the names
            of the writing tool, the helper tools and the metadata keys.
        fileName: Output file.  When non-empty it is set as ``OutputFile``
            and the stream-info, event-format and file-metadata helper tools
            are attached to the stream.
        asAlg: Accepted for backward compatibility only; the stream is
            appended to the ``AthOutSeq`` sequence whatever its value.
        noTag: When true, skip the ``SimpleTag`` attribute-list setup and do
            not schedule an EventInfoTagBuilder.
        eventInfoKey: Passed as ``EventInfoKey`` to the EventInfoTagBuilder
            and to the MakeEventStreamInfo tool.
        decisionFilter: Passed as ``FilterString`` to the EventInfoTagBuilder.
        trigNavThinningSvc: Service assigned to the thinning cache tool.  If
            ``None``, the one registered for *streamName* through
            registerTrigNavThinningSvc() is used, if any.

    Returns:
        The configured AthenaOutputStream instance.
    """
    if trigNavThinningSvc is None:
        trigNavThinningSvc = _trigNavThinningSvcs.get(streamName)

    # Define the athena output stream and its writing tool.
    writingTool = AthenaOutputStreamTool(streamName + "Tool")
    outputStream = AthenaOutputStream(
        streamName,
        WritingTool=writingTool,
        ItemList=["EventInfo#*"],
    )
    outputStream.ExtraOutputs.add(("DataHeader", "StoreGateSvc+" + streamName))
    outputStream.MetadataStore = svcMgr.MetaDataStore
    outputStream.MetadataItemList = [
        "EventStreamInfo#" + streamName,
        "IOVMetaDataContainer#*",
    ]

    from AthenaCommon.AlgSequence import AlgSequence, AthSequencer
    topSequence = AlgSequence()
    outSequence = AthSequencer("AthOutSeq")

    if not noTag:
        key = "SimpleTag"
        # Tell the writing tool to pick up the attribute list.
        outputStream.WritingTool.AttributeListKey = key
        members = topSequence.getProperties()['Members']
        # Only schedule one tag builder, however many streams are created.
        if 'EventInfoTagBuilder/EventInfoTagBuilder' not in members:
            # Build the event-info attribute list.
            from .OutputStreamAthenaPoolConf import EventInfoAttListTool, EventInfoTagBuilder
            tagBuilder = EventInfoTagBuilder(
                AttributeList=key,
                EventInfoKey=eventInfoKey,
                FilterString=decisionFilter,
                # NOTE(review): the listing this was recovered from is missing
                # the continuation line that closes this call.
                # EventInfoAttListTool is imported and otherwise unused, so it
                # is presumably passed here -- confirm the property name.
                Tool=EventInfoAttListTool(),
            )
            from AthenaCommon.GlobalFlags import globalflags
            if globalflags.InputFormat() == 'bytestream':
                # No event-tag input in bytestream.
                tagBuilder.PropagateInput = False
            topSequence += tagBuilder

    # The stream always goes into the output sequence; asAlg does not change
    # where it is scheduled.
    outSequence += outputStream

    # NOTE(review): the recovered listing had no indentation; the extent of
    # this branch follows the section breaks of the listing -- confirm.
    if fileName != "":
        outputStream.OutputFile = fileName
        from .OutputStreamAthenaPoolConf import MakeEventStreamInfo
        streamInfoTool = MakeEventStreamInfo(streamName + "_MakeEventStreamInfo")
        streamInfoTool.Key = streamName
        streamInfoTool.EventInfoKey = eventInfoKey

        # For xAOD access, add EventFormat to all POOL output streams.
        # Key to use for the event format on this stream:
        event_format_key = 'EventFormat{}'.format(streamName)
        event_format_tool = CfgMgr.xAODMaker__EventFormatStreamHelperTool(
            "{}_MakeEventFormat".format(streamName),
            Key=event_format_key,
        )
        outputStream.MetadataItemList += [
            "xAOD::EventFormat#{}".format(event_format_key),
        ]

        # Create a new xAOD::FileMetaData object.
        file_metadata_key = "FileMetaData"
        file_metadata_creator_tool = CfgMgr.xAODMaker__FileMetaDataCreatorTool(
            "FileMetaDataCreatorTool",
            OutputKey=file_metadata_key,
            StreamName=streamName,
        )
        outputStream.MetadataItemList += [
            "xAOD::FileMetaData#{}".format(file_metadata_key),
            "xAOD::FileMetaDataAuxInfo#{}Aux.".format(file_metadata_key),
        ]

        outputStream.HelperTools = [
            streamInfoTool,
            event_format_tool,
            file_metadata_creator_tool,
        ]

    # Support for MT thinning.
    from AthenaServices.AthenaServicesConf import Athena__ThinningCacheTool
    tct = Athena__ThinningCacheTool('ThinningCacheTool_' + streamName,
                                    StreamName=streamName)
    if trigNavThinningSvc is not None:
        tct.TrigNavigationThinningSvc = trigNavThinningSvc
    outputStream.HelperTools += [tct]

    # Set the list of transient items based on what we know is in the
    # transient store.  The output algorithm will then declare input
    # dependencies for objects which are both listed here and in the ItemList.
    # (We do it like this because ItemList is typically configured to include
    # everything which might possibly be output.  If this gets cleaned up,
    # then we can remove this.)
    # Some builds don't include RecExConfig, so don't crash in that case.
    # FIXME: Rather than using ObjKeyStore, we could scan all algorithms
    #        and look for write handles.
    try:
        from RecExConfig.ObjKeyStore import objKeyStore
        transient = objKeyStore['transient'].getProperties()
        outputStream.TransientItems += [
            typ + '#' + k
            for typ, klist in transient.items()
            for k in klist
        ]
    except ImportError:
        pass

    return outputStream
132
def createOutputConditionStream(streamName, fileName=""):
    """Create and return an OutputConditionsAlg for conditions output.

    Args:
        streamName: Name given to the algorithm.
        fileName: Passed to the algorithm as ``outputFile``.

    Returns:
        The OutputConditionsAlg instance, constructed with ``WriteIOV=False``.
    """
    from RegistrationServices.OutputConditionsAlg import OutputConditionsAlg
    return OutputConditionsAlg(
        streamName,
        outputFile=fileName,
        WriteIOV=False,
    )
141
142
# Alternative public names bound to the two factory functions of this module.
AthenaPoolOutputStream = createOutputStream
AthenaPoolOutputConditionStream = createOutputConditionStream
145
ClassName: AthSequencer.
This is the implementation of IAthenaOutputStreamTool.
Algorithm that marks data objects in SG for writing.
This class provides an algorithm to make the EventStreamInfo object and update it.
registerTrigNavThinningSvc(streamName, svc)
createOutputStream(streamName, fileName="", asAlg=False, noTag=False, eventInfoKey="EventInfo", decisionFilter="", trigNavThinningSvc=None)
createOutputConditionStream(streamName, fileName="")