ATLAS Offline Software
Loading...
Searching...
No Matches
AlgScheduler.py
Go to the documentation of this file.
1# Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration
2
3# Configuration for the Hive Algorithm Scheduler.
4#
5# Allows easy low level replacement of the specific Scheduler, without
6# requiring clients to know which one is in use.
7#
8# the AlgScheduler will be setup with the same number of threads
9# as are specified on the command line with the "--threads=N" parameter
10#
11# usage:
12# from AthenaCommon.AlgScheduler import AlgScheduler
13# clients can then configure runtime printouts, threadpool, etc:
14# AlgScheduler.ShowDataDependencies( True )
15# AlgScheduler.ShowControlFlow( True )
16# AlgScheduler.ShowDataFlow( True )
17# AlgScheduler.setThreadPoolSize( 7 )
18#
19# if a specific scheduler lacks that option, a warning message is printed
20# clients can also replace the default scheduler with another one
21# from GaudiHive.GaudiHiveConf import ForwardSchedulerSvc
22# myScheduler = ForwardSchedulerSvc()
23# AlgScheduler.SetScheduler( myScheduler )
24# AlgScheduler.setThreadPoolSize( 7 )
25#
26# if this is done, the HiveEventLoopMgr also needs to know about it
27# from AthenaServices.AthenaServicesConf import AthenaHiveEventLoopMgr
28# svcMgr.AthenaHiveEventLoopMgr.SchedulerSvc = AlgScheduler.getScheduler().getName()
29#
30
31
33 def __init__(self,theSched=None,thePrec=None):
34 """Setup Algorithm Scheduler"""
35
36 from AthenaCommon.AppMgr import ServiceMgr as svcMgr
37 from AthenaCommon.Constants import INFO
38
39 from AthenaCommon.ConcurrencyFlags import jobproperties as jps
40 from AthenaCommon.Logging import logging
41
42 self.log = logging.getLogger( 'AlgScheduler' )
43
44 if (theSched is None) :
45 from GaudiHive.GaudiHiveConf import AvalancheSchedulerSvc
46 svcMgr += AvalancheSchedulerSvc()
47 self.SchedulerSvc = svcMgr.AvalancheSchedulerSvc
48 else :
49 svcMgr += theSched
50 self.SchedulerSvc = theSched
51
52 if (thePrec is None) :
53 from GaudiHive.GaudiHiveConf import PrecedenceSvc
54 svcMgr += PrecedenceSvc()
55 self.PrecedenceSvc = svcMgr.PrecedenceSvc
56 else :
57 svcMgr += thePrec
58 self.PrecedenceSvc = thePrec
59
60 self.SchedulerSvc.OutputLevel = INFO
61 self.PrecedenceSvc.OutputLevel = INFO
62 self.SchedulerSvc.CheckDependencies = True
63 self.SchedulerSvc.ThreadPoolSize = jps.ConcurrencyFlags.NumThreads()
64 self.SchedulerSvc.NumOffloadThreads = 0 # we don't support configuring this in legacy job options
65
66 self.log.info("setting up " + self.SchedulerSvc.getFullName() + " with " + str(jps.ConcurrencyFlags.NumThreads()) + " threads")
67
68
69#
70## exchange the current scheduler for another one
71 def SetScheduler(self,theSched):
72 """setup a different Scheduler"""
73
74 if (self.SchedulerSvc.getFullName() != theSched.getFullName()) :
75 self.log.info("replacing " + self.SchedulerSvc.getFullName()
76 + " with " + theSched.getFullName())
77 from AthenaCommon.AppMgr import ServiceMgr as svcMgr
78 svcMgr.remove(self.SchedulerSvc)
79 svcMgr += theSched
80 self.SchedulerSvc = theSched
81
82#
83## change the output level
84 def OutputLevel(self,level) :
85 self.SchedulerSvc.OutputLevel = level
86 self.PrecedenceSvc.OutputLevel = level
87
88#
89## control checking of data deps at beginning of job for unmet input deps
90 def CheckDependencies(self,check=True):
91 if ( 'CheckDependencies' in self.SchedulerSvc.properties() ):
92 self.SchedulerSvc.CheckDependencies = check
93 else :
94 self.log.warning( self.SchedulerSvc.getFullName() + " has no property \"CheckDependencies\"")
95
96#
97## control printout of control flow at beginning of job
98 def ShowControlFlow(self,show=True):
99 if ( 'ShowControlFlow' in self.SchedulerSvc.properties() ):
100 self.SchedulerSvc.ShowControlFlow = show
101 else :
102 self.log.warning(self.SchedulerSvc.getFullName() + " has no property \"ShowControlFlow\"")
103
104#
105## control printout of data flow at beginning of job
106 def ShowDataFlow(self,show=True):
107 if ( 'ShowDataFlow' in self.SchedulerSvc.properties() ):
108 self.SchedulerSvc.ShowDataFlow = show
109 else :
110 self.log.warning(self.SchedulerSvc.getFullName() + " has no property \"ShowDataFlow\"")
111
112#
113## control printout of data dependencies at beginning of job
114 def ShowDataDependencies(self,show=True):
115 if ( 'ShowDataDependencies' in self.SchedulerSvc.properties() ):
116 self.SchedulerSvc.ShowDataDependencies = show
117 else :
118 self.log.warning(self.SchedulerSvc.getFullName() + " has no property \"ShowDataDependencies\"")
119
120#
121## set the DataLoader Algorithm to handle unmet input data deps
122 def setDataLoaderAlg(self,dataLoadAlg):
123 if ( 'DataLoaderAlg' in self.SchedulerSvc.properties() ):
124 self.SchedulerSvc.DataLoaderAlg = dataLoadAlg
125 else :
126 self.log.warning(self.SchedulerSvc.getFullName() + " has no property \"DataLoaderAlg\"")
127
128#
129## enable condition handling
130 def EnableConditions(self,enable=True):
131 if ( 'EnableConditions' in self.SchedulerSvc.properties() ):
132 self.SchedulerSvc.EnableConditions = enable
133 else :
134 self.log.warning(self.SchedulerSvc.getFullName() + " has no property \"EnableConditions\"")
135
136#
137## enable verbose view state logging
138 def EnableVerboseViews(self,enable=True):
139 if ( 'VerboseSubSlots' in self.SchedulerSvc.properties() ):
140 self.SchedulerSvc.VerboseSubSlots = enable
141 else :
142 self.log.warning(self.SchedulerSvc.getFullName() + " has no property \"VerboseSubSlots\"")
143
144#
145## set algorithm ranking rule
146 def setAlgRanking(self,rule="PCE"):
147 if ( 'Optimizer' in self.SchedulerSvc.properties() ):
148 self.SchedulerSvc.Optimizer = rule
149 else :
150 self.log.warning(self.SchedulerSvc.getFullName() + " has no property \"Optimizer\"")
151 if ( 'TaskPriorityRule' in self.PrecedenceSvc.properties() ):
152 self.PrecedenceSvc.TaskPriorityRule = rule
153 else :
154 self.log.warning(self.PrecedenceSvc.getFullName() + " has no property \"TaskPriorityRule\"")
155
156#
157## explicitly set the thread pool size
158 def setThreadPoolSize(self,tps) :
159 self.SchedulerSvc.ThreadPoolSize = tps
160
161#
162## get the currently configured scheduler
163 def getScheduler(self):
164 """Get the Scheduler"""
165 return self.SchedulerSvc
166
167AlgScheduler = AlgScheduler()
__init__(self, theSched=None, thePrec=None)