ATLAS Offline Software
Loading...
Searching...
No Matches
DataModelTestConfig.py
Go to the documentation of this file.
2# Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration.
3#
4#
5# File: DataModelRunTests/python/DataModelTestConfig.py
6# Author: snyder@bnl.gov
7# Date: Nov 2023
8# Purpose: Helpers for configuration tests.
9#
10
11from AthenaConfiguration.ComponentAccumulator import ComponentAccumulator
12from AthenaConfiguration.AllConfigFlags import initConfigFlags
13from AthenaConfiguration.ComponentFactory import CompFactory
14from AthenaPython.PyAthenaComps import Alg, StatusCode
15from AthenaCommon.Constants import INFO
16
17
18#
19# Common configuration flag settings.
20# Takes an optional input file name and event count.
21# Remaining keyword argument names are interpreted as stream names. Example:
22# flags = DataModelTestFlags (infile = 'SimplePoolFile.root',
23# Stream1 = 'SimplePoolFile2.root')
24#
def DataModelTestFlags (infile = None, evtMax = 20, **kw):
    """Create the configuration flags shared by the data model tests.

    infile -- optional input file name; when given, flags.Input.Files
              is set to a one-element list holding it.
    evtMax -- value assigned to flags.Exec.MaxEvents.
    kw     -- each keyword is taken as a stream name and its value as the
              output file; a flag Output.<stream>FileName is added for it.

    An extra boolean flag 'rntuple' (default False) is registered before
    fillFromArgs() is called.  When it ends up true, the default output
    container type is switched to 'ROOTRNTUPLE' and every input and
    output file name has '.root' replaced by '.rntup.root'.

    Returns the flags object.
    """
    flags = initConfigFlags()
    flags.addFlag ('rntuple', False)
    flags.Exec.MaxEvents = evtMax
    flags.Exec.OutputLevel = INFO
    flags.Common.MsgSourceLength = 18

    # Disable FPE auditing.
    flags.Exec.FPE = -2

    # Set input/output files.
    if infile:
        flags.Input.Files = [infile]
    for streamName, fileName in kw.items():
        flags.addFlag ('Output.' + streamName + 'FileName', fileName)

    # Block input file peeking.
    from Campaigns.Utils import Campaign
    flags.Input.RunNumbers = [0]
    flags.Input.TimeStamps = [0]
    flags.Input.ProcessingTags = []
    flags.Input.TypedCollections = []
    flags.Input.isMC = True
    flags.IOVDb.GlobalTag = ''
    flags.Input.MCCampaign = Campaign.Unknown
    # Run3 for now
    from AthenaConfiguration.TestDefaults import defaultGeometryTags
    flags.GeoModel.AtlasVersion = defaultGeometryTags.RUN3
    flags.fillFromArgs()

    # Nothing more to adjust unless RNTuple was requested.
    if not flags.rntuple:
        return flags

    def _rntupleName (fileName):
        return fileName.replace ('.root', '.rntup.root')

    flags.Output.DefaultContainerType = 'ROOTRNTUPLE'
    flags.Input.Files = [_rntupleName (name) for name in flags.Input.Files]
    for streamName, fileName in kw.items():
        setattr (flags.Output, f'{streamName}FileName', _rntupleName (fileName))

    return flags
64
65
66#
67# Common configuration for tests.
68#
def DataModelTestCfg (flags, testName,
                      loadReadDicts = False,
                      loadWriteDicts = False,
                      EventsPerLB = None,
                      TimeStampInterval = None,
                      readCatalog = None):
    """Build the ComponentAccumulator common to the data model tests.

    flags             -- configuration flags.
    testName          -- used to form the name of the file catalog written
                         by this job (<testName>_catalog.xml, with '_rntup'
                         appended to the test name for RNTuple output).
    loadReadDicts     -- if true, merge LoadReadDictsCfg.
    loadWriteDicts    -- if true, merge LoadWriteDictsCfg.
    EventsPerLB       -- forwarded to McEventSelectorCfg when not None.
    TimeStampInterval -- forwarded to McEventSelectorCfg when not None.
    readCatalog       -- optional catalog file to pass as ReadCatalog.

    EventsPerLB and TimeStampInterval are only used when there is no
    input file.  Returns the accumulator.
    """
    import os
    from AthenaConfiguration.MainServicesConfig import \
        MainServicesCfg, MessageSvcCfg

    cfg = MainServicesCfg (flags)
    cfg.merge (MessageSvcCfg (flags))
    cfg.getService ("MessageSvc").debugLimit = 10000
    cfg.addService (CompFactory.ClassIDSvc (OutputLevel = INFO))
    chronoSvc = CompFactory.ChronoStatSvc (ChronoPrintOutTable = False,
                                           PrintUserTime = False,
                                           StatPrintOutTable = False)
    cfg.addService (chronoSvc)
    cfg.addService (CompFactory.DataModelCompatSvc(), create = True)

    if flags.Input.Files == ['_ATHENA_GENERIC_INPUTFILE_NAME_']:
        # No input file --- configure like an event generator,
        # and make an xAODEventInfo.
        from McEventSelector.McEventSelectorConfig import McEventSelectorCfg
        candidates = (('EventsPerLB', EventsPerLB),
                      ('TimeStampInterval', TimeStampInterval))
        # Only pass along the settings that were explicitly given.
        selectorArgs = {key : val for key, val in candidates
                        if val is not None}
        cfg.merge (McEventSelectorCfg (flags, **selectorArgs))

        from xAODEventInfoCnv.xAODEventInfoCnvConfig import EventInfoCnvAlgCfg
        cfg.merge (EventInfoCnvAlgCfg (flags, disableBeamSpot = True))
    elif not flags.Input.Files[0].endswith ('.bs'):
        # Configure reading.  (Nothing is set up here for '.bs' input.)
        from AthenaPoolCnvSvc.PoolReadConfig import PoolReadCfg
        cfg.merge (PoolReadCfg (flags))

    # Load dictionaries if requested.
    if loadWriteDicts:
        cfg.merge (LoadWriteDictsCfg (flags))
    if loadReadDicts:
        cfg.merge (LoadReadDictsCfg (flags))

    # Prevent races when we run tests in parallel in the same directory:
    # each test (and each storage flavor) gets its own catalog file.
    isRNTuple = 'ROOTRNTUPLE' in flags.Output.DefaultContainerType
    catalogStem = testName + '_rntup' if isRNTuple else testName
    fileCatalog = catalogStem + '_catalog.xml'

    from AthenaPoolCnvSvc.PoolCommonConfig import PoolSvcCfg
    poolArgs = {'WriteCatalog' : 'file:' + fileCatalog}
    if readCatalog:
        poolArgs['ReadCatalog'] = ['file:' + readCatalog]
    cfg.merge (PoolSvcCfg (flags, **poolArgs))

    # Remove any existing catalog of that name; failure to remove it
    # (e.g. because it does not exist) is ignored.
    try:
        os.remove (fileCatalog)
    except OSError:
        pass

    return cfg
127
128
129#
130# Configure an output stream.
131#
def TestOutputCfg (flags, stream, itemList, typeNames = [], metaItemList = []):
    """Configure an output stream for a test.

    flags        -- configuration flags.
    stream       -- name of the output stream.
    itemList     -- list of items to write; the xAOD EventInfo and its
                    aux store are always prepended.
    typeNames    -- if non-empty, an EventFormatStreamHelperTool is added
                    with these TypeNames, the stream's EventFormat object
                    is added to the metadata items, and the writing
                    tool's SubLevelBranchName is set to '<key>'.
    metaItemList -- additional metadata items to write, appended after
                    the IOVMetaDataContainer entry that is always written.

    The list defaults are never modified in place; new lists are built
    by concatenation.

    Returns a ComponentAccumulator holding the stream configuration.
    """
    from OutputStreamAthenaPool.OutputStreamConfig import OutputStreamCfg, outputStreamName
    acc = ComponentAccumulator()
    itemList = ['xAOD::EventInfo#EventInfo',
                'xAOD::EventAuxInfo#EventInfoAux.'] + itemList
    helperTools = []
    # Always write the IOV metadata, followed by whatever the caller
    # asked for.  (Assigning the bare literal here would discard the
    # caller's metaItemList argument.)
    metaItemList = ["IOVMetaDataContainer#*"] + list (metaItemList)
    if typeNames:
        helperTools = [ CompFactory.xAODMaker.EventFormatStreamHelperTool(
            f'{stream}_EventFormatStreamHelperTool',
            Key = f'EventFormat{stream}',
            TypeNames = typeNames,
            DataHeaderKey = f'Stream{stream}') ]
        metaItemList = [ f'xAOD::EventFormat#EventFormat{stream}' ] + metaItemList
    acc.merge (OutputStreamCfg (flags, stream,
                                ItemList = itemList,
                                HelperTools = helperTools,
                                MetadataItemList = metaItemList))
    if typeNames:
        alg = acc.getEventAlgo (outputStreamName(stream))
        alg.WritingTool.SubLevelBranchName = '<key>'
    acc.getService ('AthenaPoolCnvSvc').PoolAttributes += ["DEFAULT_SPLITLEVEL='1'"]
    return acc
155
156
157
158
159# Arrange to get dictionaries loaded for write tests.
160# Do this as an algorithm so we can defer it to initialize().
161# In some cases, loading DSOs during initial python processing
162# can cause component loading to fail.
class LoadWriteDicts (Alg):
    """Algorithm that loads the dictionaries needed by the write tests.

    The loading is done in initialize() rather than while the
    configuration is being built.
    """

    def __init__ (self, name = 'LoadWriteDicts', **kw):
        super().__init__ (name = name, **kw)

    def initialize (self):
        """Load the write-side dictionary libraries and register the
        converter libraries.  Returns StatusCode.Success."""
        import ROOT
        ROOT.gROOT.SetBatch (True)
        import cppyy
        dictLibs = ("libDataModelTestDataCommonDict",
                    "libDataModelTestDataWriteDict",
                    "libDataModelTestDataWriteCnvDict")
        for libName in dictLibs:
            cppyy.load_library (libName)
        # Bare attribute access, kept for its side effect
        # (presumably forces the DMTest dictionary to load -- confirm).
        ROOT.DMTest.B
        ROOT.DMTest.setConverterLibrary ('libDataModelTestDataWriteCnvPoolCnv.so')
        ROOT.DMTest.setTrigConverterLibrary ('libDataModelTestDataWriteSerCnv.so')
        return StatusCode.Success
177
178
def LoadWriteDictsCfg (flags):
    """Return a ComponentAccumulator containing a LoadWriteDicts
    event algorithm.

    flags -- configuration flags; not read by this function.
    """
    acc = ComponentAccumulator()
    acc.addEventAlgo (LoadWriteDicts())
    return acc
183
184
185# Arrange to get dictionaries loaded for read tests.
186# Do this as an algorithm so we can defer it to initialize().
187# In some cases, loading DSOs during initial python processing
188# can cause component loading to fail.
class LoadReadDicts (Alg):
    """Algorithm that loads the dictionaries needed by the read tests.

    The loading is done in initialize() rather than while the
    configuration is being built.
    """

    def __init__ (self, name = 'LoadReadDicts', **kw):
        super().__init__ (name = name, **kw)

    def initialize (self):
        """Load the read-side dictionary libraries, look up a few
        classes by name, and register the converter libraries.
        Returns StatusCode.Success."""
        import ROOT
        ROOT.gROOT.SetBatch (True)
        import cppyy
        for libName in ("libDataModelTestDataCommonDict",
                        "libDataModelTestDataReadDict"):
            cppyy.load_library (libName)
        # Bare attribute access, kept for its side effect
        # (presumably forces the DMTest dictionary to load -- confirm).
        ROOT.DMTest.B
        # Look these classes up explicitly; the results are not used.
        for className in ('DMTest::HAuxContainer_v1',
                          'DataVector<DMTest::H_v1>',
                          'DMTest::HView_v1'):
            ROOT.gROOT.GetClass (className)
        ROOT.DMTest.setConverterLibrary ('libDataModelTestDataReadCnvPoolCnv.so')
        ROOT.DMTest.setTrigConverterLibrary ('libDataModelTestDataReadSerCnv.so')
        return StatusCode.Success
205
def LoadReadDictsCfg (flags):
    """Return a ComponentAccumulator containing a LoadReadDicts
    event algorithm.

    flags -- configuration flags; not read by this function.
    """
    acc = ComponentAccumulator()
    acc.addEventAlgo (LoadReadDicts())
    return acc
210
211
def rnt (flags):
    """Report whether the default output container type is RNTuple.

    Returns a pair (is_rntuple, f).  When
    flags.Output.DefaultContainerType contains 'ROOTRNTUPLE', the pair
    is True and a function mapping any argument to the empty string;
    otherwise it is False and the identity function.
    """
    if 'ROOTRNTUPLE' not in flags.Output.DefaultContainerType:
        return False, lambda k: k
    return True, lambda k: ''
217
__init__(self, name='LoadReadDicts', **kw)
__init__(self, name='LoadWriteDicts', **kw)
TestOutputCfg(flags, stream, itemList, typeNames=[], metaItemList=[])
DataModelTestFlags(infile=None, evtMax=20, **kw)
DataModelTestCfg(flags, testName, loadReadDicts=False, loadWriteDicts=False, EventsPerLB=None, TimeStampInterval=None, readCatalog=None)
void initialize()