Loading [MathJax]/extensions/tex2jax.js
ATLAS Offline Software
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Modules Pages
Classes | Functions
DataModelTestConfig Namespace Reference

Classes

class  LoadReadDicts
 
class  LoadWriteDicts
 

Functions

def DataModelTestFlags (infile=None, evtMax=20, **kw)
 
def DataModelTestCfg (flags, testName, loadReadDicts=False, loadWriteDicts=False, EventsPerLB=None, TimeStampInterval=None, readCatalog=None)
 
def TestOutputCfg (flags, stream, itemList, typeNames=[], metaItemList=[])
 
def LoadWriteDictsCfg (flags)
 
def LoadReadDictsCfg (flags)
 
def rnt (flags)
 

Function Documentation

◆ DataModelTestCfg()

def DataModelTestConfig.DataModelTestCfg (   flags,
  testName,
  loadReadDicts = False,
  loadWriteDicts = False,
  EventsPerLB = None,
  TimeStampInterval = None,
  readCatalog = None 
)

Definition at line 64 of file DataModelTestConfig.py.

64 def DataModelTestCfg (flags, testName,
65  loadReadDicts = False,
66  loadWriteDicts = False,
67  EventsPerLB = None,
68  TimeStampInterval = None,
69  readCatalog = None):
70  from AthenaConfiguration.MainServicesConfig import \
71  MainServicesCfg, MessageSvcCfg
72  cfg = MainServicesCfg (flags)
73  cfg.merge (MessageSvcCfg (flags))
74  cfg.getService("MessageSvc").debugLimit = 10000
75  cfg.addService (CompFactory.ClassIDSvc (OutputLevel = INFO))
76  cfg.addService (CompFactory.ChronoStatSvc (ChronoPrintOutTable = False,
77  PrintUserTime = False,
78  StatPrintOutTable = False))
79 
80  if flags.Input.Files == ['_ATHENA_GENERIC_INPUTFILE_NAME_']:
81  # No input file --- configure like an event generator,
82  # and make an xAODEventInfo.
83  from McEventSelector.McEventSelectorConfig import McEventSelectorCfg
84  mckw = {}
85  if EventsPerLB is not None:
86  mckw['EventsPerLB'] = EventsPerLB
87  if TimeStampInterval is not None:
88  mckw['TimeStampInterval'] = TimeStampInterval
89  cfg.merge (McEventSelectorCfg (flags, **mckw))
90 
91  from xAODEventInfoCnv.xAODEventInfoCnvConfig import EventInfoCnvAlgCfg
92  cfg.merge (EventInfoCnvAlgCfg (flags, disableBeamSpot = True))
93  elif not flags.Input.Files[0].endswith ('.bs'):
94  # Configure reading.
95  from AthenaPoolCnvSvc.PoolReadConfig import PoolReadCfg
96  cfg.merge (PoolReadCfg (flags))
97 
98  # Load dictionaries if requested.
99  if loadWriteDicts:
100  cfg.merge (LoadWriteDictsCfg (flags))
101  if loadReadDicts:
102  cfg.merge (LoadReadDictsCfg (flags))
103 
104  # Prevent races when we run tests in parallel in the same directory.
105  if flags.Output.StorageTechnology.EventData == 'ROOTRNTUPLE':
106  testName = testName + '_rntup'
107  fileCatalog = testName + '_catalog.xml'
108  from AthenaPoolCnvSvc.PoolCommonConfig import PoolSvcCfg
109  kw = {'WriteCatalog' : 'file:' + fileCatalog}
110  if readCatalog:
111  kw['ReadCatalog'] = ['file:' + readCatalog]
112  cfg.merge (PoolSvcCfg (flags, **kw))
113  import os
114  try:
115  os.remove (fileCatalog)
116  except OSError:
117  pass
118 
119 
120  return cfg
121 
122 
123 #
124 # Configure an output stream.
125 #

◆ DataModelTestFlags()

def DataModelTestConfig.DataModelTestFlags (   infile = None,
  evtMax = 20,
**  kw 
)

Definition at line 25 of file DataModelTestConfig.py.

25 def DataModelTestFlags (infile = None, evtMax = 20, **kw):
26  flags = initConfigFlags()
27  flags.Exec.MaxEvents = evtMax
28  flags.Exec.OutputLevel = INFO
29  flags.Common.MsgSourceLength = 18
30 
31  # Disable FPE auditing.
32  flags.Exec.FPE = -2
33 
34  # Set input/output files.
35  if infile:
36  flags.Input.Files = [infile]
37  for stream, outfile in kw.items():
38  flags.addFlag (f'Output.{stream}FileName', outfile)
39 
40  # Block input file peeking.
41  from Campaigns.Utils import Campaign
42  flags.Input.RunNumbers = [0]
43  flags.Input.TimeStamps = [0]
44  flags.Input.ProcessingTags = []
45  flags.Input.TypedCollections = []
46  flags.Input.isMC = True
47  flags.IOVDb.GlobalTag = ''
48  flags.Input.MCCampaign = Campaign.Unknown
49  flags.fillFromArgs()
50 
51  if flags.Output.StorageTechnology.EventData == 'ROOTRNTUPLE':
52  def to_rntup (s):
53  return s.replace ('.root', '.rntup.root')
54  flags.Input.Files = [to_rntup(f) for f in flags.Input.Files]
55  for stream, outfile in kw.items():
56  setattr (flags.Output, stream+'FileName', to_rntup (outfile))
57 
58  return flags
59 
60 
61 #
62 # Common configuration for tests.
63 #

◆ LoadReadDictsCfg()

def DataModelTestConfig.LoadReadDictsCfg (   flags)

Definition at line 200 of file DataModelTestConfig.py.

200 def LoadReadDictsCfg (flags):
201  acc = ComponentAccumulator()
202  acc.addEventAlgo (LoadReadDicts())
203  return acc
204 
205 

◆ LoadWriteDictsCfg()

def DataModelTestConfig.LoadWriteDictsCfg (   flags)

Definition at line 173 of file DataModelTestConfig.py.

173 def LoadWriteDictsCfg (flags):
174  acc = ComponentAccumulator()
175  acc.addEventAlgo (LoadWriteDicts())
176  return acc
177 
178 
179 # Arrange to get dictionaries loaded for read tests.
180 # Do this as an algorithm so we can defer it to initialize().
181 # In some cases, loading DSOs during initial python processing
182 # can cause component loading to fail.

◆ rnt()

def DataModelTestConfig.rnt (   flags)

Definition at line 206 of file DataModelTestConfig.py.

206 def rnt (flags):
207  is_rntuple = flags.Output.StorageTechnology.EventData == 'ROOTRNTUPLE'
208  if is_rntuple:
209  return True, lambda k: ''
210  return False, lambda k:k
211 

◆ TestOutputCfg()

def DataModelTestConfig.TestOutputCfg (   flags,
  stream,
  itemList,
  typeNames = [],
  metaItemList = [] 
)

Definition at line 126 of file DataModelTestConfig.py.

126 def TestOutputCfg (flags, stream, itemList, typeNames = [], metaItemList = []):
127  from OutputStreamAthenaPool.OutputStreamConfig import OutputStreamCfg, outputStreamName
128  acc = ComponentAccumulator()
129  itemList = ['xAOD::EventInfo#EventInfo',
130  'xAOD::EventAuxInfo#EventInfoAux.'] + itemList
131  helperTools = []
132  metaItemList = ["IOVMetaDataContainer#*"]
133  if typeNames:
134  helperTools = [ CompFactory.xAODMaker.EventFormatStreamHelperTool(
135  f'{stream}_EventFormatStreamHelperTool',
136  Key = f'EventFormat{stream}',
137  TypeNames = typeNames,
138  DataHeaderKey = f'Stream{stream}') ]
139  metaItemList = [ f'xAOD::EventFormat#EventFormat{stream}' ] + metaItemList
140  acc.merge (OutputStreamCfg (flags, stream,
141  ItemList = itemList,
142  HelperTools = helperTools,
143  MetadataItemList = metaItemList))
144  if typeNames:
145  alg = acc.getEventAlgo (outputStreamName(stream))
146  alg.WritingTool.SubLevelBranchName = '<key>'
147  acc.getService ('AthenaPoolCnvSvc').PoolAttributes += ["DEFAULT_SPLITLEVEL='1'"]
148  return acc
149 
150 
151 
152 
153 # Arrange to get dictionaries loaded for write tests.
154 # Do this as an algorithm so we can defer it to initialize().
155 # In some cases, loading DSOs during initial python processing
156 # can cause component loading to fail.
DataModelTestConfig.DataModelTestCfg
def DataModelTestCfg(flags, testName, loadReadDicts=False, loadWriteDicts=False, EventsPerLB=None, TimeStampInterval=None, readCatalog=None)
Definition: DataModelTestConfig.py:64
AthenaPoolExample_WriteCond.outputStreamName
string outputStreamName
Definition: AthenaPoolExample_WriteCond.py:21
python.JetAnalysisCommon.ComponentAccumulator
ComponentAccumulator
Definition: JetAnalysisCommon.py:302
DataModelTestConfig.LoadReadDictsCfg
def LoadReadDictsCfg(flags)
Definition: DataModelTestConfig.py:200
DataModelTestConfig.DataModelTestFlags
def DataModelTestFlags(infile=None, evtMax=20, **kw)
Definition: DataModelTestConfig.py:25
DataModelTestConfig.TestOutputCfg
def TestOutputCfg(flags, stream, itemList, typeNames=[], metaItemList=[])
Definition: DataModelTestConfig.py:126
DataModelTestConfig.LoadWriteDictsCfg
def LoadWriteDictsCfg(flags)
Definition: DataModelTestConfig.py:173
DataModelTestConfig.rnt
def rnt(flags)
Definition: DataModelTestConfig.py:206
python.AllConfigFlags.initConfigFlags
def initConfigFlags()
Definition: AllConfigFlags.py:19