ATLAS Offline Software
MainServicesConfig.py
Go to the documentation of this file.
1 # Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration
2 from AthenaConfiguration.ComponentAccumulator import ComponentAccumulator
3 from AthenaConfiguration.ComponentFactory import CompFactory
4 from AthenaCommon.Constants import INFO
5 
6 
7 def MainServicesMiniCfg(flags, loopMgr='AthenaEventLoopMgr', masterSequence='AthAlgSeq'):
8  """Mininmal basic config, just good enough for HelloWorld and alike"""
9  cfg = ComponentAccumulator(CompFactory.AthSequencer(masterSequence,
10  Sequential=True,
11  TimeOut=flags.Exec.EventTimeOut))
12  cfg.setAsTopLevel()
13  cfg.setAppProperty('TopAlg',['AthSequencer/'+masterSequence])
14  cfg.setAppProperty('MessageSvcType', 'MessageSvc')
15  cfg.setAppProperty('EventLoop', loopMgr)
16  cfg.setAppProperty('ExtSvcCreates', 'False')
17  cfg.setAppProperty('JobOptionsSvcType', 'JobOptionsSvc')
18  cfg.setAppProperty('JobOptionsType', 'NONE')
19  cfg.setAppProperty('JobOptionsPostAction', '')
20  cfg.setAppProperty('JobOptionsPreAction', '')
21  cfg.setAppProperty('PrintAlgsSequence', flags.Exec.PrintAlgsSequence)
22  return cfg
23 
24 
25 def AvalancheSchedulerSvcCfg(flags, **kwargs):
26  kwargs.setdefault("CheckDependencies", flags.Scheduler.CheckDependencies)
27  kwargs.setdefault("CheckOutputUsage", flags.Scheduler.CheckOutputUsage)
28  kwargs.setdefault("ShowDataDependencies", flags.Scheduler.ShowDataDeps)
29  kwargs.setdefault("ShowDataFlow", flags.Scheduler.ShowDataFlow)
30  kwargs.setdefault("ShowControlFlow", flags.Scheduler.ShowControlFlow)
31  kwargs.setdefault("VerboseSubSlots", flags.Scheduler.EnableVerboseViews)
32  kwargs.setdefault("ThreadPoolSize", flags.Concurrency.NumThreads)
33  kwargs.setdefault("DataDepsGraphFile", flags.Scheduler.DataDepsGraphFile)
34  kwargs.setdefault("DataDepsGraphAlgPattern", flags.Scheduler.DataDepsGraphAlgPattern)
35  kwargs.setdefault("DataDepsGraphObjectPattern", flags.Scheduler.DataDepsGraphObjectPattern)
36  kwargs.setdefault("NumOffloadThreads", flags.Concurrency.NumOffloadThreads)
37 
38  cfg = ComponentAccumulator()
39  scheduler = CompFactory.AvalancheSchedulerSvc(**kwargs)
40  cfg.addService(scheduler, primary=True)
41 
42  from SGComps.SGInputLoaderConfig import SGInputLoaderCfg
43  # FailIfNoProxy=False makes it a warning, not an error, if unmet data
44  # dependencies are not found in the store. It should probably be changed
45  # to True eventually.
46  inputloader_ca = SGInputLoaderCfg(flags, FailIfNoProxy=flags.Input.FailOnUnknownCollections)
47  cfg.merge(inputloader_ca, sequenceName="AthAlgSeq")
48 
49  # Specifying DataLoaderAlg makes the Scheduler automatically assign
50  # all unmet data dependencies to that algorithm.
51  if flags.Scheduler.AutoLoadUnmetDependencies:
52  scheduler.DataLoaderAlg = inputloader_ca.getPrimary().getName()
53 
54  return cfg
55 
56 
57 def OutputUsageIgnoreCfg(flags, algorithm):
58  cfg = ComponentAccumulator()
59  if flags.Concurrency.NumThreads > 0 and flags.Scheduler.CheckOutputUsage:
60  cfg.merge(AvalancheSchedulerSvcCfg(flags, CheckOutputUsageIgnoreList=[algorithm]))
61  return cfg
62 
63 
65  cfg = ComponentAccumulator()
66  elmgr = CompFactory.AthenaEventLoopMgr(EventPrintoutInterval = flags.Exec.EventPrintoutInterval)
67  if flags.Input.OverrideRunNumber:
68  from AthenaKernel.EventIdOverrideConfig import EvtIdModifierSvcCfg
69  elmgr.EvtIdModifierSvc = cfg.getPrimaryAndMerge( EvtIdModifierSvcCfg(flags) ).name
70 
71  if flags.Common.isOverlay:
72  if not flags.Overlay.DataOverlay:
73  elmgr.RequireInputAttributeList = True
74  elmgr.UseSecondaryEventNumber = True
75 
76  cfg.addService( elmgr )
77 
78  return cfg
79 
80 
82  cfg = ComponentAccumulator()
83  hivesvc = CompFactory.SG.HiveMgrSvc("EventDataSvc",
84  NSlots = flags.Concurrency.NumConcurrentEvents)
85  cfg.addService( hivesvc )
86 
87  arp = CompFactory.AlgResourcePool(TopAlg = ["AthMasterSeq"]) #this should enable control flow
88  cfg.addService( arp )
89 
90  scheduler = cfg.getPrimaryAndMerge(AvalancheSchedulerSvcCfg(flags))
91 
92  elmgr = CompFactory.AthenaHiveEventLoopMgr(
93  WhiteboardSvc = "EventDataSvc",
94  SchedulerSvc = scheduler.getName(),
95  EventPrintoutInterval = flags.Exec.EventPrintoutInterval)
96 
97  if flags.Input.OverrideRunNumber:
98  from AthenaKernel.EventIdOverrideConfig import EvtIdModifierSvcCfg
99  elmgr.EvtIdModifierSvc = cfg.getPrimaryAndMerge(EvtIdModifierSvcCfg(flags)).name
100 
101  if flags.Common.isOverlay and not flags.Overlay.DataOverlay:
102  elmgr.RequireInputAttributeList = True
103  elmgr.UseSecondaryEventNumber = True
104 
105  cfg.addService( elmgr )
106 
107  return cfg
108 
110  cfg = ComponentAccumulator()
111  if flags.Common.isOverlay and not flags.Overlay.DataOverlay:
112  elmgr = CompFactory.AthenaEventLoopMgr(EventPrintoutInterval = flags.Exec.EventPrintoutInterval)
113  elmgr.RequireInputAttributeList = True
114  elmgr.UseSecondaryEventNumber = True
115  cfg.addService( elmgr )
116 
117  from AthenaMP.AthenaMPConfig import AthenaMPCfg
118  mploop = AthenaMPCfg(flags)
119  cfg.merge( mploop )
120 
121  return cfg
122 
123 
124 def AthenaMtesEventLoopMgrCfg(flags, mtEs=False, channel=''):
125  cfg = ComponentAccumulator()
126 
127  hivesvc = CompFactory.SG.HiveMgrSvc("EventDataSvc",
128  NSlots = flags.Concurrency.NumConcurrentEvents)
129  cfg.addService( hivesvc )
130 
131  arp = CompFactory.AlgResourcePool(TopAlg = ["AthMasterSeq"]) #this should enable control flow
132  cfg.addService( arp )
133 
134  scheduler = cfg.getPrimaryAndMerge(AvalancheSchedulerSvcCfg(flags))
135 
136  elmgr = CompFactory.AthenaMtesEventLoopMgr(
137  WhiteboardSvc = "EventDataSvc",
138  SchedulerSvc = scheduler.getName(),
139  EventRangeChannel = channel,
140  EventPrintoutInterval = flags.Exec.EventPrintoutInterval)
141 
142  if flags.Input.OverrideRunNumber:
143  from AthenaKernel.EventIdOverrideConfig import EvtIdModifierSvcCfg
144  elmgr.EvtIdModifierSvc = cfg.getPrimaryAndMerge(EvtIdModifierSvcCfg(flags)).name
145 
146  if flags.Common.isOverlay and not flags.Overlay.DataOverlay:
147  elmgr.RequireInputAttributeList = True
148  elmgr.UseSecondaryEventNumber = True
149 
150  if mtEs:
151  from AthenaServices.OutputStreamSequencerSvcConfig import OutputStreamSequencerSvcCfg
152  cfg.merge(OutputStreamSequencerSvcCfg(flags,
153  incidentName="NextEventRange",
154  reportingOn = True))
155 
156  cfg.addService( elmgr )
157 
158  return cfg
159 
160 
161 def MessageSvcCfg(flags):
162  cfg = ComponentAccumulator()
163  msgsvc = CompFactory.MessageSvc()
164  msgsvc.OutputLevel = flags.Exec.OutputLevel
165  msgsvc.Format = "% F%{:d}W%C%7W%R%T %0W%M".format(flags.Common.MsgSourceLength)
166  msgsvc.enableSuppression = flags.Common.MsgSuppression
167  if flags.Common.ShowMsgStats:
168  msgsvc.showStats = True
169  from AthenaCommon.Constants import WARNING
170  msgsvc.statLevel = WARNING
171 
172  from AthenaConfiguration.Enums import ProductionStep
173  if flags.Common.ProductionStep not in [ProductionStep.Default, ProductionStep.Reconstruction, ProductionStep.Derivation]:
174  msgsvc.Format = "% F%18W%S%7W%R%T %0W%M" # Temporary to match legacy configuration for serial simulation/digitization/overlay jobs
175  if flags.Concurrency.NumThreads>0:
176  msgsvc.Format = "% F%{:d}W%C%6W%R%e%s%8W%R%T %0W%M".format(flags.Common.MsgSourceLength)
177  if flags.Exec.VerboseMessageComponents:
178  msgsvc.verboseLimit=0
179  if flags.Exec.DebugMessageComponents:
180  msgsvc.debugLimit=0
181  if flags.Exec.InfoMessageComponents:
182  msgsvc.infoLimit=0
183  if flags.Exec.WarningMessageComponents:
184  msgsvc.warningLimit=0
185  if flags.Exec.ErrorMessageComponents:
186  msgsvc.errorLimit=0
187 
188  cfg.addService(msgsvc)
189  return cfg
190 
191 
192 def addMainSequences(flags, cfg):
193  """Add the standard sequences to cfg"""
194 
195  # Build standard sequences:
196  AthSequencer = CompFactory.AthSequencer
197  cfg.addSequence(AthSequencer('AthAlgEvtSeq', Sequential=True, StopOverride=True), parentName='AthMasterSeq')
198  cfg.addSequence(AthSequencer('AthOutSeq', StopOverride=True), parentName='AthMasterSeq')
199 
200  cfg.addSequence(AthSequencer('AthBeginSeq', Sequential=True), parentName='AthAlgEvtSeq')
201  cfg.addSequence(AthSequencer('AthAllAlgSeq', StopOverride=True), parentName='AthAlgEvtSeq')
202 
203  athAlgSeq = AthSequencer('AthAlgSeq', IgnoreFilterPassed=True, StopOverride=True, ProcessDynamicDataDependencies=True, ExtraDataForDynamicConsumers=[])
204  athCondSeq = AthSequencer('AthCondSeq',StopOverride=True)
205 
206  if flags.Concurrency.NumThreads==0:
207  # For serial execution, we need the CondAlgs to execute first.
208  cfg.addSequence(athCondSeq, parentName='AthAllAlgSeq')
209  cfg.addSequence(athAlgSeq, parentName='AthAllAlgSeq')
210  else:
211  # In MT, the order of execution is irrelevant (determined by data deps).
212  # We add the conditions sequence later such that the CondInputLoader gets
213  # initialized after all other user Algorithms for MT, so the base classes
214  # of data deps can be correctly determined.
215  cfg.addSequence(athAlgSeq, parentName='AthAllAlgSeq')
216  cfg.addSequence(athCondSeq, parentName='AthAllAlgSeq')
217 
218  cfg.addSequence(AthSequencer('AthEndSeq', Sequential=True), parentName='AthAlgEvtSeq')
219 
220  # Set up incident firing:
221  AthIncFirerAlg = CompFactory.AthIncFirerAlg
222  IncidentProcAlg = CompFactory.IncidentProcAlg
223 
224  previousPerfmonDomain = cfg.getCurrentPerfmonDomain()
225  cfg.flagPerfmonDomain('Incidents')
226 
227  cfg.addEventAlgo(AthIncFirerAlg("BeginIncFiringAlg", FireSerial=False, Incidents=['BeginEvent']),
228  sequenceName='AthBeginSeq')
229 
230  cfg.addEventAlgo(IncidentProcAlg('IncidentProcAlg1'),
231  sequenceName='AthBeginSeq')
232 
233  cfg.addEventAlgo(AthIncFirerAlg('EndIncFiringAlg', FireSerial=False, Incidents=['EndEvent']),
234  sequenceName="AthEndSeq")
235 
236  cfg.addEventAlgo(IncidentProcAlg('IncidentProcAlg2'),
237  sequenceName="AthEndSeq")
238 
239  # Should be after all other algorithms:
240  cfg.addEventAlgo(AthIncFirerAlg('EndAlgorithmsFiringAlg', FireSerial=False, Incidents=['EndAlgorithms']),
241  sequenceName="AthMasterSeq")
242 
243  cfg.addEventAlgo(IncidentProcAlg('IncidentProcAlg3'),
244  sequenceName="AthMasterSeq")
245 
246  cfg.flagPerfmonDomain(previousPerfmonDomain)
247 
248 
249 def addEvgenSequences(flags, cfg):
250  from GeneratorConfig.Sequences import EvgenSequence, EvgenSequenceFactory
251  cfg.addSequence(EvgenSequenceFactory(EvgenSequence.Generator), parentName="AthAlgSeq")
252  cfg.addSequence(EvgenSequenceFactory(EvgenSequence.Fix), parentName="AthAlgSeq")
253  cfg.addSequence(EvgenSequenceFactory(EvgenSequence.PreFilter), parentName="AthAlgSeq")
254  cfg.addSequence(EvgenSequenceFactory(EvgenSequence.Test), parentName="AthAlgSeq")
255  # TODO: needs to setup proper filtering sequence
256  cfg.addSequence(EvgenSequenceFactory(EvgenSequence.Filter), parentName="AthAlgSeq")
257  cfg.addSequence(EvgenSequenceFactory(EvgenSequence.Post), parentName="AthAlgSeq")
258 
259 
260 def MainServicesCfg(flags, LoopMgr='AthenaEventLoopMgr'):
261  # Set the Python OutputLevel on the root logger
262  from AthenaCommon.Logging import log
263  log.setLevel(flags.Exec.OutputLevel)
264 
265  if flags.Exec.Interactive == "run":
266  LoopMgr="PyAthenaEventLoopMgr"
267  log.info("Interactive mode, switching to %s", LoopMgr)
268  else:
269  # Run a serial job for threads=0
270  if flags.Concurrency.NumThreads > 0:
271  if flags.Concurrency.NumConcurrentEvents==0:
272  raise Exception("Requested Concurrency.NumThreads>0 and Concurrency.NumConcurrentEvents==0, "
273  "which will not process events!")
274  if flags.Exec.MTEventService:
275  LoopMgr = "AthenaMtesEventLoopMgr"
276  else:
277  LoopMgr = "AthenaHiveEventLoopMgr"
278 
279  if flags.Concurrency.NumProcs > 0:
280  LoopMgr = "AthMpEvtLoopMgr"
281 
282  # Core components needed for serial and threaded jobs:
283  cfg = MainServicesMiniCfg(flags, loopMgr=LoopMgr, masterSequence='AthMasterSeq')
284 
285  # Main sequences and incident handling:
286  addMainSequences(flags, cfg)
287 
288  # Basic services:
289  cfg.addService(CompFactory.ClassIDSvc(CLIDDBFiles = ['clid.db','Gaudi_clid.db']))
290 
291  cfg.addService(CompFactory.AlgContextSvc(BypassIncidents=True))
292  cfg.addAuditor(CompFactory.AlgContextAuditor())
293 
294  cfg.addService(CompFactory.StoreGateSvc(Dump=flags.Debug.DumpEvtStore))
295  cfg.addService(CompFactory.StoreGateSvc("DetectorStore",Dump=flags.Debug.DumpDetStore))
296  cfg.addService(CompFactory.StoreGateSvc("HistoryStore"))
297  cfg.addService(CompFactory.StoreGateSvc("ConditionStore",Dump=flags.Debug.DumpCondStore))
298 
299  cfg.merge(MessageSvcCfg(flags))
300 
301  from AthenaConfiguration.FPEAndCoreDumpConfig import FPEAndCoreDumpCfg
302  cfg.merge(FPEAndCoreDumpCfg(flags))
303 
304  # Avoid stack traces to the exception handler. These traces
305  # aren't very useful since they just point to the handler, not
306  # the original bug.
307  cfg.addService(CompFactory.ExceptionSvc(Catch="NONE"))
308 
309  # ApplicationMgr properties:
310  cfg.setAppProperty('AuditAlgorithms', True)
311  cfg.setAppProperty('InitializationLoopCheck', False)
312  cfg.setAppProperty('EvtMax', flags.Exec.MaxEvents)
313  if flags.Exec.OutputLevel > INFO:
314  # this turns off the appMgr spalsh
315  cfg.setAppProperty('AppName', '')
316  cfg.setAppProperty('OutputLevel', flags.Exec.OutputLevel)
317 
318  if flags.Exec.DebugStage != "":
319  cfg.setDebugStage(flags.Exec.DebugStage)
320 
321  cfg.interactive=flags.Exec.Interactive
322 
323  if flags.Concurrency.NumProcs > 0:
324  cfg.merge(AthenaMpEventLoopMgrCfg(flags))
325 
326  # Additional components needed for threaded jobs only:
327  if flags.Concurrency.NumThreads > 0:
328  if flags.Exec.MTEventService:
329  cfg.merge(AthenaMtesEventLoopMgrCfg(flags,True,flags.Exec.MTEventServiceChannel))
330  else:
331  cfg.merge(AthenaHiveEventLoopMgrCfg(flags))
332  # Setup SGCommitAuditor to sweep new DataObjects at end of Alg execute
333  cfg.addAuditor( CompFactory.SGCommitAuditor() )
334  elif LoopMgr == 'AthenaEventLoopMgr':
335  cfg.merge(AthenaEventLoopMgrCfg(flags))
336 
337  # Performance monitoring and profiling:
338  if flags.PerfMon.doFastMonMT or flags.PerfMon.doFullMonMT:
339  from PerfMonComps.PerfMonCompsConfig import PerfMonMTSvcCfg
340  cfg.merge(PerfMonMTSvcCfg(flags))
341 
342  if flags.PerfMon.doGPerfProf:
343  from PerfMonGPerfTools.GPT_ProfilerServiceConfig import GPT_ProfilerServiceCfg
344  cfg.merge(GPT_ProfilerServiceCfg(flags))
345 
346  if len(flags.PerfMon.Valgrind.ProfiledAlgs)>0:
347  from Valkyrie.ValkyrieConfig import ValgrindServiceCfg
348  cfg.merge(ValgrindServiceCfg(flags))
349 
350  return cfg
351 
352 
353 def MainEvgenServicesCfg(flags, LoopMgr="AthenaEventLoopMgr", withSequences=True):
354  """ComponentAccumulator-based equivalent of:
355  import AthenaCommon.AtlasUnixGeneratorJob
356 
357  NB Must have set flags.Input.RunNumbers and
358  flags.Input.TimeStamps before calling to avoid
359  attempted auto-configuration from an input file.
360  """
361  cfg = MainServicesCfg(flags, LoopMgr)
362  from McEventSelector.McEventSelectorConfig import McEventSelectorCfg
363  cfg.merge(McEventSelectorCfg(flags))
364 
365  if withSequences:
366  addEvgenSequences(flags, cfg)
367 
368  return cfg
369 
370 
371 if __name__=="__main__":
372  from AthenaConfiguration.AllConfigFlags import initConfigFlags
373  flags = initConfigFlags()
374  try:
375  flags.Input.RunNumbers = [284500] # Set to either MC DSID or MC Run Number
376  flags.Input.TimeStamps = [1] # dummy value
377  cfg = MainEvgenServicesCfg(flags, withSequences=True)
378  except ModuleNotFoundError:
379  # The McEventSelector package required by MainEvgenServicesCfg is not part of the AthAnalysis project
380  cfg = MainServicesCfg(flags)
381  cfg.wasMerged() # to avoid errror that CA was not merged
python.OutputStreamSequencerSvcConfig.OutputStreamSequencerSvcCfg
def OutputStreamSequencerSvcCfg(flags, incidentName='', reportingOn=False)
Definition: OutputStreamSequencerSvcConfig.py:6
python.AthenaMPConfig.AthenaMPCfg
def AthenaMPCfg(flags)
Definition: AthenaMPConfig.py:49
python.Sequences.EvgenSequenceFactory
def EvgenSequenceFactory(sequence)
Definition: Sequences.py:17
python.JetAnalysisCommon.ComponentAccumulator
ComponentAccumulator
Definition: JetAnalysisCommon.py:302
AthSequencer
ClassName: AthSequencer.
Definition: AthSequencer.h:40
vtune_athena.format
format
Definition: vtune_athena.py:14
python.MainServicesConfig.OutputUsageIgnoreCfg
def OutputUsageIgnoreCfg(flags, algorithm)
Definition: MainServicesConfig.py:57
python.FPEAndCoreDumpConfig.FPEAndCoreDumpCfg
def FPEAndCoreDumpCfg(flags)
Definition: FPEAndCoreDumpConfig.py:11
python.PerfMonCompsConfig.PerfMonMTSvcCfg
def PerfMonMTSvcCfg(flags, **kwargs)
A minimal new-style configuration for PerfMonMTSvc.
Definition: PerfMonCompsConfig.py:10
dumpTruth.getName
getName
Definition: dumpTruth.py:34
python.MainServicesConfig.MessageSvcCfg
def MessageSvcCfg(flags)
Definition: MainServicesConfig.py:161
python.MainServicesConfig.MainEvgenServicesCfg
def MainEvgenServicesCfg(flags, LoopMgr="AthenaEventLoopMgr", withSequences=True)
Definition: MainServicesConfig.py:353
SGInputLoaderConfig.SGInputLoaderCfg
def SGInputLoaderCfg(flags, Load=None, **kwargs)
Definition: SGInputLoaderConfig.py:7
python.MainServicesConfig.addMainSequences
def addMainSequences(flags, cfg)
Definition: MainServicesConfig.py:192
python.MainServicesConfig.AthenaEventLoopMgrCfg
def AthenaEventLoopMgrCfg(flags)
Definition: MainServicesConfig.py:64
python.MainServicesConfig.addEvgenSequences
def addEvgenSequences(flags, cfg)
Definition: MainServicesConfig.py:249
GPT_ProfilerServiceConfig.GPT_ProfilerServiceCfg
def GPT_ProfilerServiceCfg(flags, **kwargs)
Definition: GPT_ProfilerServiceConfig.py:10
python.MainServicesConfig.MainServicesCfg
def MainServicesCfg(flags, LoopMgr='AthenaEventLoopMgr')
Definition: MainServicesConfig.py:260
Constants
some useful constants -------------------------------------------------—
python.McEventSelectorConfig.McEventSelectorCfg
def McEventSelectorCfg(flags, **kwargs)
Definition: McEventSelectorConfig.py:5
python.MainServicesConfig.AvalancheSchedulerSvcCfg
def AvalancheSchedulerSvcCfg(flags, **kwargs)
Definition: MainServicesConfig.py:25
AthIncFirerAlg
Definition: AthIncFirerAlg.h:16
python.ValkyrieConfig.ValgrindServiceCfg
def ValgrindServiceCfg(flags, **kwargs)
Definition: ValkyrieConfig.py:7
python.AllConfigFlags.initConfigFlags
def initConfigFlags()
Definition: AllConfigFlags.py:19
python.MainServicesConfig.AthenaMpEventLoopMgrCfg
def AthenaMpEventLoopMgrCfg(flags)
Definition: MainServicesConfig.py:109
python.EventIdOverrideConfig.EvtIdModifierSvcCfg
def EvtIdModifierSvcCfg(flags, name="EvtIdModifierSvc", **kwargs)
Definition: EventIdOverrideConfig.py:140
python.MainServicesConfig.AthenaHiveEventLoopMgrCfg
def AthenaHiveEventLoopMgrCfg(flags)
Definition: MainServicesConfig.py:81
python.MainServicesConfig.AthenaMtesEventLoopMgrCfg
def AthenaMtesEventLoopMgrCfg(flags, mtEs=False, channel='')
Definition: MainServicesConfig.py:124
python.MainServicesConfig.MainServicesMiniCfg
def MainServicesMiniCfg(flags, loopMgr='AthenaEventLoopMgr', masterSequence='AthAlgSeq')
Definition: MainServicesConfig.py:7