ATLAS Offline Software
Loading...
Searching...
No Matches
Control
AthenaCommon
python
AlgScheduler.py
Go to the documentation of this file.
1
# Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration
2
3
# Configuration for the Hive Algorithm Scheduler.
4
#
5
# Allows easy low level replacement of the specific Scheduler, without
6
# requiring clients to know which one is in use.
7
#
8
# the AlgScheduler will be set up with the same number of threads
9
# as are specified on the command line with the "--threads=N" parameter
10
#
11
# usage:
12
# from AthenaCommon.AlgScheduler import AlgScheduler
13
# clients can then configure runtime printouts, threadpool, etc:
14
# AlgScheduler.ShowDataDependencies( True )
15
# AlgScheduler.ShowControlFlow( True )
16
# AlgScheduler.ShowDataFlow( True )
17
# AlgScheduler.setThreadPoolSize( 7 )
18
#
19
# if a specific scheduler lacks that option, a warning message is printed
20
# clients can also replace the default scheduler with another one
21
# from GaudiHive.GaudiHiveConf import ForwardSchedulerSvc
22
# myScheduler = ForwardSchedulerSvc()
23
# AlgScheduler.SetScheduler( myScheduler )
24
# AlgScheduler.setThreadPoolSize( 7 )
25
#
26
# if this is done, the HiveEventLoopMgr also needs to know about it
27
# from AthenaServices.AthenaServicesConf import AthenaHiveEventLoopMgr
28
# svcMgr.AthenaHiveEventLoopMgr.SchedulerSvc = AlgScheduler.getScheduler().getName()
29
#
30
31
32
class AlgScheduler:
    """Thin configuration wrapper around the Hive scheduler service.

    Holds a scheduler service (``self.SchedulerSvc``) and a precedence
    service (``self.PrecedenceSvc``) and forwards configuration requests to
    them, so that clients do not need to know which concrete scheduler is in
    use.  Optional properties are only set when the target service reports
    them via ``properties()``; otherwise a warning is logged and nothing is
    changed.
    """

    def __init__(self,theSched=None,thePrec=None):
        """Setup Algorithm Scheduler

        theSched -- scheduler service to use; when None an
                    AvalancheSchedulerSvc is created and added to the
                    ServiceMgr.
        thePrec  -- precedence service to use; when None a PrecedenceSvc is
                    created and added to the ServiceMgr.
        """

        # Imports are kept local so that importing this module's class does
        # not by itself pull in the Athena configuration machinery.
        from AthenaCommon.AppMgr import ServiceMgr as svcMgr
        from AthenaCommon.Constants import INFO

        from AthenaCommon.ConcurrencyFlags import jobproperties as jps
        from AthenaCommon.Logging import logging

        self.log = logging.getLogger( 'AlgScheduler' )

        if theSched is None:
            from GaudiHive.GaudiHiveConf import AvalancheSchedulerSvc
            svcMgr += AvalancheSchedulerSvc()
            self.SchedulerSvc = svcMgr.AvalancheSchedulerSvc
        else:
            svcMgr += theSched
            self.SchedulerSvc = theSched

        if thePrec is None:
            from GaudiHive.GaudiHiveConf import PrecedenceSvc
            svcMgr += PrecedenceSvc()
            self.PrecedenceSvc = svcMgr.PrecedenceSvc
        else:
            svcMgr += thePrec
            self.PrecedenceSvc = thePrec

        numThreads = jps.ConcurrencyFlags.NumThreads()

        self.SchedulerSvc.OutputLevel = INFO
        self.PrecedenceSvc.OutputLevel = INFO
        self.SchedulerSvc.CheckDependencies = True
        # thread pool follows the concurrency flags (see module header:
        # the "--threads=N" command line parameter)
        self.SchedulerSvc.ThreadPoolSize = numThreads
        self.SchedulerSvc.NumOffloadThreads = 0 # we don't support configuring this in legacy job options

        self.log.info("setting up " + self.SchedulerSvc.getFullName() + " with " + str(numThreads) + " threads")

    #
    ## shared implementation of "set the property if the service has it"
    def _setProperty(self,svc,name,value):
        """Set property `name` of `svc` to `value` if `svc` has it.

        When `name` is not among ``svc.properties()`` the service is left
        untouched and a warning naming the service and the property is
        logged instead.
        """
        if name in svc.properties():
            setattr(svc, name, value)
        else:
            self.log.warning(svc.getFullName() + " has no property \"" + name + "\"")

    #
    ## exchange the current scheduler for another one
    def SetScheduler(self,theSched):
        """Replace the current scheduler with `theSched`.

        Nothing happens when `theSched` has the same full name as the
        current scheduler.  Otherwise the current scheduler is removed from
        the ServiceMgr, `theSched` is added, and it becomes the scheduler
        configured by this object.  Settings made on the old scheduler are
        not copied over to the new one.
        """

        if self.SchedulerSvc.getFullName() == theSched.getFullName():
            return

        self.log.info("replacing " + self.SchedulerSvc.getFullName()
                      + " with " + theSched.getFullName())
        from AthenaCommon.AppMgr import ServiceMgr as svcMgr
        svcMgr.remove(self.SchedulerSvc)
        svcMgr += theSched
        self.SchedulerSvc = theSched

    #
    ## change the output level
    def OutputLevel(self,level) :
        """Set the output level of both the scheduler and precedence service."""
        self.SchedulerSvc.OutputLevel = level
        self.PrecedenceSvc.OutputLevel = level

    #
    ## control checking of data deps at beginning of job for unmet input deps
    def CheckDependencies(self,check=True):
        """Set the scheduler's "CheckDependencies" property (warn if absent)."""
        self._setProperty(self.SchedulerSvc, 'CheckDependencies', check)

    #
    ## control printout of control flow at beginning of job
    def ShowControlFlow(self,show=True):
        """Set the scheduler's "ShowControlFlow" property (warn if absent)."""
        self._setProperty(self.SchedulerSvc, 'ShowControlFlow', show)

    #
    ## control printout of data flow at beginning of job
    def ShowDataFlow(self,show=True):
        """Set the scheduler's "ShowDataFlow" property (warn if absent)."""
        self._setProperty(self.SchedulerSvc, 'ShowDataFlow', show)

    #
    ## control printout of data dependencies at beginning of job
    def ShowDataDependencies(self,show=True):
        """Set the scheduler's "ShowDataDependencies" property (warn if absent)."""
        self._setProperty(self.SchedulerSvc, 'ShowDataDependencies', show)

    #
    ## set the DataLoader Algorithm to handle unmet input data deps
    def setDataLoaderAlg(self,dataLoadAlg):
        """Set the scheduler's "DataLoaderAlg" property (warn if absent)."""
        self._setProperty(self.SchedulerSvc, 'DataLoaderAlg', dataLoadAlg)

    #
    ## enable condition handling
    def EnableConditions(self,enable=True):
        """Set the scheduler's "EnableConditions" property (warn if absent)."""
        self._setProperty(self.SchedulerSvc, 'EnableConditions', enable)

    #
    ## enable verbose view state logging
    def EnableVerboseViews(self,enable=True):
        """Set the scheduler's "VerboseSubSlots" property (warn if absent)."""
        self._setProperty(self.SchedulerSvc, 'VerboseSubSlots', enable)

    #
    ## set algorithm ranking rule
    def setAlgRanking(self,rule="PCE"):
        """Apply the ranking `rule` to scheduler and precedence service.

        Sets "Optimizer" on the scheduler and "TaskPriorityRule" on the
        precedence service; each is handled independently, so a missing
        property on one service only produces a warning for that service.
        """
        self._setProperty(self.SchedulerSvc, 'Optimizer', rule)
        self._setProperty(self.PrecedenceSvc, 'TaskPriorityRule', rule)

    #
    ## explicitly set the thread pool size
    def setThreadPoolSize(self,tps) :
        """Set the scheduler's "ThreadPoolSize" property (warn if absent).

        Handled like the other optional settings: a scheduler without this
        property yields a warning instead of an attribute error.
        """
        self._setProperty(self.SchedulerSvc, 'ThreadPoolSize', tps)

    #
    ## get the currently configured scheduler
    def getScheduler(self):
        """Get the Scheduler"""
        return self.SchedulerSvc
166
167
# Rebind the module-level name AlgScheduler from the class to an instance of
# it, so that "from AthenaCommon.AlgScheduler import AlgScheduler" yields the
# instance. Instantiating runs __init__ at import time.
AlgScheduler = AlgScheduler()
python.AlgScheduler.AlgScheduler
Definition
AlgScheduler.py:32
python.AlgScheduler.AlgScheduler.__init__
__init__(self, theSched=None, thePrec=None)
Definition
AlgScheduler.py:33
Generated on
for ATLAS Offline Software by
1.14.0