ATLAS Offline Software
Classes | Functions
DataModelTestConfig Namespace Reference

Classes

class  LoadReadDicts
 
class  LoadWriteDicts
 

Functions

def DataModelTestFlags (infile=None, evtMax=20, **kw)
 
def DataModelTestCfg (flags, testName, loadReadDicts=False, loadWriteDicts=False, EventsPerLB=None, TimeStampInterval=None, readCatalog=None)
 
def TestOutputCfg (flags, stream, itemList, typeNames=[], metaItemList=[])
 
def LoadWriteDictsCfg (flags)
 
def LoadReadDictsCfg (flags)
 
def rnt (flags)
 

Function Documentation

◆ DataModelTestCfg()

def DataModelTestConfig.DataModelTestCfg (   flags,
  testName,
  loadReadDicts = False,
  loadWriteDicts = False,
  EventsPerLB = None,
  TimeStampInterval = None,
  readCatalog = None 
)

Definition at line 65 of file DataModelTestConfig.py.

65 def DataModelTestCfg (flags, testName,
66  loadReadDicts = False,
67  loadWriteDicts = False,
68  EventsPerLB = None,
69  TimeStampInterval = None,
70  readCatalog = None):
71  from AthenaConfiguration.MainServicesConfig import \
72  MainServicesCfg, MessageSvcCfg
73  cfg = MainServicesCfg (flags)
74  cfg.merge (MessageSvcCfg (flags))
75  cfg.getService("MessageSvc").debugLimit = 10000
76  cfg.addService (CompFactory.ClassIDSvc (OutputLevel = INFO))
77  cfg.addService (CompFactory.ChronoStatSvc (ChronoPrintOutTable = False,
78  PrintUserTime = False,
79  StatPrintOutTable = False))
80 
81  if flags.Input.Files == ['_ATHENA_GENERIC_INPUTFILE_NAME_']:
82  # No input file --- configure like an event generator,
83  # and make an xAODEventInfo.
84  from McEventSelector.McEventSelectorConfig import McEventSelectorCfg
85  mckw = {}
86  if EventsPerLB is not None:
87  mckw['EventsPerLB'] = EventsPerLB
88  if TimeStampInterval is not None:
89  mckw['TimeStampInterval'] = TimeStampInterval
90  cfg.merge (McEventSelectorCfg (flags, **mckw))
91 
92  from xAODEventInfoCnv.xAODEventInfoCnvConfig import EventInfoCnvAlgCfg
93  cfg.merge (EventInfoCnvAlgCfg (flags, disableBeamSpot = True))
94  elif not flags.Input.Files[0].endswith ('.bs'):
95  # Configure reading.
96  from AthenaPoolCnvSvc.PoolReadConfig import PoolReadCfg
97  cfg.merge (PoolReadCfg (flags))
98 
99  # Load dictionaries if requested.
100  if loadWriteDicts:
101  cfg.merge (LoadWriteDictsCfg (flags))
102  if loadReadDicts:
103  cfg.merge (LoadReadDictsCfg (flags))
104 
105  # Prevent races when we run tests in parallel in the same directory.
106  if flags.Output.StorageTechnology.EventData == 'ROOTRNTUPLE':
107  testName = testName + '_rntup'
108  fileCatalog = testName + '_catalog.xml'
109  from AthenaPoolCnvSvc.PoolCommonConfig import PoolSvcCfg
110  kw = {'WriteCatalog' : 'file:' + fileCatalog}
111  if readCatalog:
112  kw['ReadCatalog'] = ['file:' + readCatalog]
113  cfg.merge (PoolSvcCfg (flags, **kw))
114  import os
115  try:
116  os.remove (fileCatalog)
117  except OSError:
118  pass
119 
120 
121  return cfg
122 
123 
124 #
125 # Configure an output stream.
126 #

◆ DataModelTestFlags()

def DataModelTestConfig.DataModelTestFlags (   infile = None,
  evtMax = 20,
**  kw 
)

Definition at line 25 of file DataModelTestConfig.py.

25 def DataModelTestFlags (infile = None, evtMax = 20, **kw):
26  flags = initConfigFlags()
27  flags.Exec.MaxEvents = evtMax
28  flags.Exec.OutputLevel = INFO
29  flags.Common.MsgSourceLength = 18
30 
31  # Disable FPE auditing.
32  flags.Exec.FPE = -2
33 
34  # Set input/output files.
35  if infile:
36  flags.Input.Files = [infile]
37  for stream, outfile in kw.items():
38  flags.addFlag (f'Output.{stream}FileName', outfile)
39 
40  # Block input file peeking.
41  from Campaigns.Utils import Campaign
42  flags.Input.RunNumbers = [0]
43  flags.Input.TimeStamps = [0]
44  flags.Input.ProcessingTags = []
45  flags.Input.TypedCollections = []
46  flags.Input.isMC = True
47  flags.IOVDb.GlobalTag = ''
48  flags.Input.MCCampaign = Campaign.Unknown
49  flags.fillFromArgs()
50 
51  if flags.Output.StorageTechnology.EventData == 'ROOTRNTUPLE':
52  def to_rntup (s):
53  return s.replace ('.root', '.rntup.root')
54  flags.Input.Files = [to_rntup(f) for f in flags.Input.Files]
55  for k, v in flags.Output.asdict().items():
56  if k.endswith ('FileName'):
57  setattr (flags.Output, k, to_rntup (v))
58 
59  return flags
60 
61 
62 #
63 # Common configuration for tests.
64 #

◆ LoadReadDictsCfg()

def DataModelTestConfig.LoadReadDictsCfg (   flags)

Definition at line 201 of file DataModelTestConfig.py.

201 def LoadReadDictsCfg (flags):
202  acc = ComponentAccumulator()
203  acc.addEventAlgo (LoadReadDicts())
204  return acc
205 
206 

◆ LoadWriteDictsCfg()

def DataModelTestConfig.LoadWriteDictsCfg (   flags)

Definition at line 174 of file DataModelTestConfig.py.

174 def LoadWriteDictsCfg (flags):
175  acc = ComponentAccumulator()
176  acc.addEventAlgo (LoadWriteDicts())
177  return acc
178 
179 
180 # Arrange to get dictionaries loaded for read tests.
181 # Do this as an algorithm so we can defer it to initialize().
182 # In some cases, loading DSOs during initial python processing
183 # can cause component loading to fail.

◆ rnt()

def DataModelTestConfig.rnt (   flags)

Definition at line 207 of file DataModelTestConfig.py.

207 def rnt (flags):
208  is_rntuple = flags.Output.StorageTechnology.EventData == 'ROOTRNTUPLE'
209  if is_rntuple:
210  return True, lambda k: ''
211  return False, lambda k:k
212 

◆ TestOutputCfg()

def DataModelTestConfig.TestOutputCfg (   flags,
  stream,
  itemList,
  typeNames = [],
  metaItemList = [] 
)

Definition at line 127 of file DataModelTestConfig.py.

127 def TestOutputCfg (flags, stream, itemList, typeNames = [], metaItemList = []):
128  from OutputStreamAthenaPool.OutputStreamConfig import OutputStreamCfg, outputStreamName
129  acc = ComponentAccumulator()
130  itemList = ['xAOD::EventInfo#EventInfo',
131  'xAOD::EventAuxInfo#EventInfoAux.'] + itemList
132  helperTools = []
133  metaItemList = ["IOVMetaDataContainer#*"]
134  if typeNames:
135  helperTools = [ CompFactory.xAODMaker.EventFormatStreamHelperTool(
136  f'{stream}_EventFormatStreamHelperTool',
137  Key = f'EventFormat{stream}',
138  TypeNames = typeNames,
139  DataHeaderKey = f'Stream{stream}') ]
140  metaItemList = [ f'xAOD::EventFormat#EventFormat{stream}' ] + metaItemList
141  acc.merge (OutputStreamCfg (flags, stream,
142  ItemList = itemList,
143  HelperTools = helperTools,
144  MetadataItemList = metaItemList))
145  if typeNames:
146  alg = acc.getEventAlgo (outputStreamName(stream))
147  alg.WritingTool.SubLevelBranchName = '<key>'
148  acc.getService ('AthenaPoolCnvSvc').PoolAttributes += ["DEFAULT_SPLITLEVEL='1'"]
149  return acc
150 
151 
152 
153 
154 # Arrange to get dictionaries loaded for write tests.
155 # Do this as an algorithm so we can defer it to initialize().
156 # In some cases, loading DSOs during initial python processing
157 # can cause component loading to fail.
DataModelTestConfig.DataModelTestCfg
def DataModelTestCfg(flags, testName, loadReadDicts=False, loadWriteDicts=False, EventsPerLB=None, TimeStampInterval=None, readCatalog=None)
Definition: DataModelTestConfig.py:65
AthenaPoolExample_WriteCond.outputStreamName
string outputStreamName
Definition: AthenaPoolExample_WriteCond.py:21
python.JetAnalysisCommon.ComponentAccumulator
ComponentAccumulator
Definition: JetAnalysisCommon.py:302
DataModelTestConfig.LoadReadDictsCfg
def LoadReadDictsCfg(flags)
Definition: DataModelTestConfig.py:201
DataModelTestConfig.DataModelTestFlags
def DataModelTestFlags(infile=None, evtMax=20, **kw)
Definition: DataModelTestConfig.py:25
DataModelTestConfig.TestOutputCfg
def TestOutputCfg(flags, stream, itemList, typeNames=[], metaItemList=[])
Definition: DataModelTestConfig.py:127
DataModelTestConfig.LoadWriteDictsCfg
def LoadWriteDictsCfg(flags)
Definition: DataModelTestConfig.py:174
DataModelTestConfig.rnt
def rnt(flags)
Definition: DataModelTestConfig.py:207
TrigJetMonitorAlgorithm.items
items
Definition: TrigJetMonitorAlgorithm.py:79
python.AllConfigFlags.initConfigFlags
def initConfigFlags()
Definition: AllConfigFlags.py:19