|
def | __init__ (self, **params) |
|
def | submitJob (self, jobConfig) |
|
def | setParam (self, name, value=None, description=None, isSpecial=None, insertAtFront=False) |
|
def | appendParam (self, name, value=None, description=None, isSpecial=None, insertAtFront=False, endOfLine='\n') |
|
def | getParam (self, name) |
|
def | isSpecialParam (self, name) |
|
def | registerToBeCopied (self, paramName) |
|
def | checkParams (self) |
|
def | showParams (self, maxLineLength=80) |
|
def | dumpParams (self, format='%(name)-20s = %(value)s', maxLineLength=0) |
|
def | addFiles (self, fileList) |
|
def | addFilesToPoolFileCatalog (self, fileList) |
|
def | configureJob (self, jobnr, inputList=[]) |
|
def | configure (self) |
|
def | runJob (self, jobnr) |
|
def | run (self, jobnr=None) |
|
def | getNJobs (self) |
|
def | getRunningJobs (self) |
|
def | wait (self) |
|
def | getOutputFiles (self) |
|
def | log (self, subject, body, **data) |
|
ShellJobRunner - runs jobs one at a time in current shell
Definition at line 17 of file ShellJobRunner.py.
◆ __init__()
def python.ShellJobRunner.ShellJobRunner.__init__ |
( |
|
self, |
|
|
** |
params |
|
) |
| |
Constructor (takes any number of parameters as an argument).
Reimplemented from python.JobRunner.JobRunner.
Definition at line 20 of file ShellJobRunner.py.
21 """Constructor (takes any number of parameters as an argument)."""
22 JobRunner.__init__(self,**params)
◆ addFiles()
def python.JobRunner.JobRunner.addFiles |
( |
|
self, |
|
|
|
fileList |
|
) |
| |
|
inherited |
Add a list of input files to be processed.
Definition at line 247 of file JobRunner.py.
247 def addFiles(self,fileList):
248 """ Add a list of input files to be processed."""
249 self.getParam(
'inputfiles').
extend(fileList)
◆ addFilesToPoolFileCatalog()
def python.JobRunner.JobRunner.addFilesToPoolFileCatalog |
( |
|
self, |
|
|
|
fileList |
|
) |
| |
|
inherited |
Add a list of files to be inserted to the POOL file catalog.
Definition at line 252 of file JobRunner.py.
252 def addFilesToPoolFileCatalog(self,fileList):
253 """Add a list of files to be inserted to the POOL file catalog."""
254 self.getParam(
'filesforpoolcatalog').
extend(fileList)
◆ appendParam()
def python.JobRunner.JobRunner.appendParam |
( |
|
self, |
|
|
|
name, |
|
|
|
value = None , |
|
|
|
description = None , |
|
|
|
isSpecial = None , |
|
|
|
insertAtFront = False , |
|
|
|
endOfLine = '\n' |
|
) |
| |
|
inherited |
Append to the value of a job parameter. If the parameter doesn't exist yet,
setParam is called to create it. If it does exist, only the value is updated
and the description and flag arguments are ignored.
Definition at line 177 of file JobRunner.py.
177 def appendParam(self,name,value=None,description=None,isSpecial=None,insertAtFront=False,endOfLine='\n'):
178 """Append to the value of a job parameter. If the parameter doesn't exist yet,
179 setParam is called to create it. If it does exist, only the value is updated
180 and the description and flag arguments are ignored."""
181 if name
in self.params:
182 if value
is not None:
183 p = self.params[name]
184 if isinstance(p.value,str)
and p.value:
185 p.value = p.value + endOfLine + value
187 p.value = p.value + value
189 self.setParam(name,value,description,isSpecial,insertAtFront)
◆ checkParams()
def python.JobRunner.JobRunner.checkParams |
( |
|
self | ) |
|
|
inherited |
A generic routine for checking job parameters. Check that all parameters
can be evaluated. May be overridden by a subclass if desired.
Definition at line 209 of file JobRunner.py.
209 def checkParams(self):
210 """A generic routine for checking job parameters. Check that all parameters
211 can be evaluated. May be overrridden by a subclass if desired."""
212 if len(self.paramOrder)!=len(self.params):
213 raise JobRunnerError (
'Inconsistent parameter definition')
216 for p
in self.paramOrder:
218 value = self.getParam(p)
219 if isinstance(value,str):
224 raise JobRunnerError (
'Unable to evaluate parameter: '+p+
' = '+value+
' (check parameter order)')
◆ configure()
def python.JobRunner.JobRunner.configure |
( |
|
self | ) |
|
|
inherited |
Configure all jobs.
Definition at line 374 of file JobRunner.py.
375 """Configure all jobs."""
376 if self.getParam(
'filesperjob')==0
or not self.getParam(
'inputfiles'):
377 raise JobRunnerError (
"No input files or illegal parameter 'filesperjob'")
378 lbperjob = self.getParam(
'lbperjob')
382 raise JobRunnerError (
'Negative number of luminosity blocks per job not allowed')
383 inputfiles = self.getParam(
'inputfiles')
386 lbpattern = re.compile(
r'lb(\d+)')
388 lbnrs = lbpattern.findall(f)
391 raise JobRunnerError (
'Unable to determine luminosity block number of file %s' % f)
394 raise JobRunnerError (
'Too many luminosity block numbers in filename %s' % f)
398 lbnrmax =
int(lbnrs[1])
402 while (lbnr <= lbnrmax) :
404 jobId =
int((lbnr-1)/lbperjob)
406 if jobId
not in jobInputDict:
407 jobInputDict[jobId] = [f]
408 jobLBDict[jobId] = [lbnr]
410 if f
not in jobInputDict[jobId] :
411 jobInputDict[jobId].
append(f)
412 if lbnr
not in jobLBDict[jobId] :
413 jobLBDict[jobId].
append(lbnr)
417 maxJobs = self.getParam(
'maxjobs')
419 maxJobs = len(jobInputDict)
420 for i
in sorted(jobInputDict.keys())[:maxJobs]:
425 self.setParam(
'lbList', jobLBDict[i])
426 self.configureJob(jobnr,jobInputDict[i])
430 njobs =
int(math.ceil(
float(len(self.getParam(
'inputfiles')))/self.getParam(
'filesperjob')))
431 if self.getParam(
'maxjobs'):
432 njobs =
min(self.getParam(
'maxjobs'),njobs)
433 print (
'\nConfiguring %i job(s) ...' % (njobs))
434 for i
in range(njobs):
◆ configureJob()
def python.JobRunner.JobRunner.configureJob |
( |
|
self, |
|
|
|
jobnr, |
|
|
|
inputList = [] |
|
) |
| |
|
inherited |
Configure parameters for a single job and write job configuration files. If inputList
is given, this is the list of input files used for the job.
Definition at line 257 of file JobRunner.py.
257 def configureJob(self,jobnr,inputList=[]):
258 """Configure parameters for a single job and write job configuration files. If inputLIst
259 is given, this is the list of input files used for the job."""
260 if jobnr
in self.jobs:
261 raise JobRunnerError (
'Job number %s already configured' % jobnr)
264 jobConfig[
'jobnr'] = jobnr
265 self.setParam(
'jobnr',jobnr)
269 jobConfig[
'inputfiles'] = inputList
271 inputfiles = self.getParam(
'inputfiles')
272 iFirst = self.getParam(
'filesperjob')*jobnr
273 if iFirst >= len(inputfiles):
274 raise JobRunnerError (
'Jobnr = %i too high for available number of files' % jobnr)
275 iLast = iFirst + self.getParam(
'filesperjob')
276 if iLast > len(inputfiles):
277 iLast=len(inputfiles)
278 jobConfig[
'inputfiles'] = inputfiles[iFirst:iLast]
281 for p
in self.paramOrder:
282 if self.isSpecialParam(p):
continue
283 value = self.getParam(p)
284 if isinstance(value,str):
285 jobConfig[p] = value % jobConfig
290 if jobConfig[
'setuprelease']:
292 jobConfig[
'cmdsetup'] = self.getParam(
'cmdsetup') % jobConfig
294 if not jobConfig[
'cmdsetup']:
295 jobConfig[
'cmdsetup'] = os.getenv(
'CMDSETUP',
'source /afs/cern.ch/atlas/software/dist/AtlasSetup/scripts/asetup.sh %(release)s --noautocdtest') % jobConfig
297 jobConfig[
'cmdsetup'] =
''
301 jobConfig[
'cmdcopyfiles'] = self.getParam(
'cmdcopyfiles') % jobConfig
302 if not jobConfig[
'cmdcopyfiles']:
304 for p
in self.paramToBeCopied:
305 if not jobConfig[p]:
continue
306 filestobecopied =
' '.
join([filestobecopied,jobConfig[p]])
307 jobConfig[p] = os.path.basename(jobConfig[p])
309 jobConfig[
'cmdcopyfiles'] =
'cp -p '+filestobecopied+
' .'
313 jobConfig[
'cmddefinepoolcatalog'] = self.getParam(
'cmddefinepoolcatalog') % jobConfig
314 if not jobConfig[
'cmddefinepoolcatalog']:
315 if jobConfig[
'addinputtopoolcatalog']:
316 argstring =
' '.
join(jobConfig[
'filesforpoolcatalog']+jobConfig[
'inputfiles']) % jobConfig
318 argstring =
' '.
join(jobConfig[
'filesforpoolcatalog']) % jobConfig
320 jobConfig[
'cmddefinepoolcatalog'] =
'pool_insertFileToCatalog '+argstring
323 if jobConfig[
'returnstatuscode']:
324 jobConfig[
'cmdexit'] = self.getParam(
'cmdexit') % jobConfig
325 if not jobConfig[
'cmdexit']:
326 jobConfig[
'cmdexit'] =
'exit $status' % jobConfig
328 jobConfig[
'cmdexit'] =
''
331 jobConfig[
'outputfilestring'] =
','.
join([jobConfig[
'outputfileprefix']+f
for f
in jobConfig[
'outputfilelist']])
334 if jobConfig[
'taskpostprocsteps']:
335 jobConfig[
'doneflag'] = jobConfig[
'postprocflag']
337 jobConfig[
'doneflag'] = jobConfig[
'completedflag']
338 jobConfig[
'script'] = self.getParam(
'script') % jobConfig
343 for f
in jobConfig[
'outputfilelist']:
344 if os.access(jobConfig[
'outputfileprefix']+f,os.F_OK):
345 raise JobRunnerError (
'Job output file %s exists already' % jobConfig[f])
346 for f
in (
'configfile',
'scriptfile',
'logfile'):
347 if os.access(jobConfig[f],os.F_OK):
348 raise JobRunnerError (
'Job configuration or log file %s exists already' % jobConfig[f])
351 os.makedirs(
'%(jobdir)s' % jobConfig)
354 config =
open(jobConfig[
'configfile'],
'w')
355 config.write(
'# Job configuration data for job %(jobname)s\n' % jobConfig)
356 config.write(
'# Generated by JobRunner.py\n\n')
357 if pprint.isreadable(jobConfig):
358 config.write(
'jobConfig = '+pprint.pformat(jobConfig))
360 config.write(
'jobConfig = '+
repr(jobConfig))
365 script =
open(jobConfig[
'scriptfile'],
'w')
366 script.write(jobConfig[
'script'])
368 os.chmod(jobConfig[
'scriptfile'],0o755)
371 self.jobs[jobnr] = jobConfig
◆ dumpParams()
def python.JobRunner.JobRunner.dumpParams |
( |
|
self, |
|
|
|
format = '%(name)-20s = %(value)s' , |
|
|
|
maxLineLength = 0 |
|
) |
| |
|
inherited |
Dump all parameters into a string in a user-specified format.
Definition at line 236 of file JobRunner.py.
236 def dumpParams(self,format='%(name)-20s = %(value)s
',maxLineLength=0):
237 """Dump all parameters into a string in a user-specified format."""
239 for p
in self.paramOrder:
240 s = format % self.params[p].__dict__
242 s = s[0:maxLineLength-3]+
'...'
◆ getNJobs()
def python.JobRunner.JobRunner.getNJobs |
( |
|
self | ) |
|
|
inherited |
◆ getOutputFiles()
def python.JobRunner.JobRunner.getOutputFiles |
( |
|
self | ) |
|
|
inherited |
Get a list of all output files.
Definition at line 503 of file JobRunner.py.
503 def getOutputFiles(self):
504 """Get a list of all output files."""
507 for f
in sorted(glob.glob(self.jobs[j][
'outputfileprefix']+
'*')):
508 outputFiles.append(f)
◆ getParam()
def python.JobRunner.JobRunner.getParam |
( |
|
self, |
|
|
|
name |
|
) |
| |
|
inherited |
Get value of a parameter by name.
Definition at line 192 of file JobRunner.py.
192 def getParam(self,name):
193 """Get value of a parameter by name."""
194 return self.params[name].value
◆ getRunningJobs()
def python.JobRunner.JobRunner.getRunningJobs |
( |
|
self | ) |
|
|
inherited |
Get the list of jobs submitted by this JobRunner that are still running.
Definition at line 483 of file JobRunner.py.
483 def getRunningJobs(self):
484 """Get list of jobs submitted by this JobRunner that are still running"""
487 if os.path.exists(self.jobs[j][
'subflag'])
or os.path.exists(self.jobs[j][
'runflag']):
488 runningJobs.append(self.jobs[j][
'jobname'])
◆ isSpecialParam()
def python.JobRunner.JobRunner.isSpecialParam |
( |
|
self, |
|
|
|
name |
|
) |
| |
|
inherited |
Check if parameter requires special treatment.
Definition at line 197 of file JobRunner.py.
197 def isSpecialParam(self,name):
198 """Check if parameter requires special treatment."""
199 return self.params[name].isSpecial
◆ log()
def python.JobRunner.JobRunner.log |
( |
|
self, |
|
|
|
subject, |
|
|
|
body, |
|
|
** |
data |
|
) |
| |
|
inherited |
Send a log message to a set of users specified by parameter 'logmail'.
Definition at line 512 of file JobRunner.py.
512 def log(self,subject,body,**data):
513 """Send a log message to a set of users specified by parameter 'logmail'."""
514 if self.getParam(
'logmail'):
515 msg =
'Generated by JobRunner on host '+socket.gethostname()+
' by '+pwd.getpwuid(os.getuid())[0]+
' at '+time.asctime()
516 msg +=
'\n\nCurrent working directory = '+os.getcwd()
517 if body: msg +=
'\n\n'+body
518 for k
in data.keys():
519 if isinstance(data[k],dict):
521 items = data[k].
keys()
524 msg +=
"\n %-20s = %s" % (i,data[k][i])
526 msg +=
"\n\n%-20s = %s" % (k,data[k])
527 subprocess.call(
"echo '%s' | mail -s '%s' '%s'" %
528 (msg,subject,self.getParam(
'logmail')), shell=
True)
◆ registerToBeCopied()
def python.JobRunner.JobRunner.registerToBeCopied |
( |
|
self, |
|
|
|
paramName |
|
) |
| |
|
inherited |
Register a parameter holding the name of an input file to be
copied before a job starts.
Definition at line 202 of file JobRunner.py.
202 def registerToBeCopied(self,paramName):
203 """Register a parameter holding the name of an input file to be
204 copied before a job starts."""
205 if paramName
not in self.paramToBeCopied:
206 self.paramToBeCopied.
append(paramName)
◆ run()
def python.JobRunner.JobRunner.run |
( |
|
self, |
|
|
|
jobnr = None |
|
) |
| |
|
inherited |
Run all configured job(s)
Definition at line 465 of file JobRunner.py.
465 def run(self,jobnr=None):
466 """Run all configured job(s)"""
468 raise JobRunnerError (
'No configured jobs')
469 njobs = len(self.jobs)
470 self.log(
'JobRunner: starting '+
str(njobs)+
' jobs',self.dumpParams())
472 print (
'\nStarting job %i (using %s) ...\n' % (j,self.__class__))
◆ runJob()
def python.JobRunner.JobRunner.runJob |
( |
|
self, |
|
|
|
jobnr |
|
) |
| |
|
inherited |
Run a single configured job.
Definition at line 455 of file JobRunner.py.
455 def runJob(self,jobnr):
456 """Run a single configured job."""
457 if jobnr
not in self.jobs:
458 raise JobRunnerError (
'Job number %s is not configured' % jobnr)
459 jobConfig = self.jobs[jobnr]
460 subprocess.call(
'touch '+jobConfig[
'subflag'], shell=
True)
461 status = self.submitJob(jobConfig)
462 self.jobStatus[jobnr] = status
◆ setParam()
def python.JobRunner.JobRunner.setParam |
( |
|
self, |
|
|
|
name, |
|
|
|
value = None , |
|
|
|
description = None , |
|
|
|
isSpecial = None , |
|
|
|
insertAtFront = False |
|
) |
| |
|
inherited |
Set the value of a job parameter. Job parameters may be templates that name
previously defined job parameters and will be evaluated when a given job
is configured. Therefore the order in which setParam is called for the different
parameters is relevant. insertAtFront can be set to True to force early
evaluation of a given parameter.
Definition at line 157 of file JobRunner.py.
157 def setParam(self,name,value=None,description=None,isSpecial=None,insertAtFront=False):
158 """Set the value of a job parameter. Job parameters may be templates that name
159 previously defined job parameters and will be evaluated when a given job
160 is configured. Therefore the order in which setParam is called for the different
161 parameters is relevant. insertAtFront can be set to True to force early
162 evaluation of a given parameter."""
163 p = self.params[name] = self.params.
get(name,JobRunnerParameter(name))
164 if value
is not None:
166 if description
is not None:
167 p.description = description
168 if isSpecial
is not None:
169 p.isSpecial=isSpecial
170 if name
not in self.paramOrder:
172 self.paramOrder.insert(0,name)
174 self.paramOrder.
append(name)
◆ showParams()
def python.JobRunner.JobRunner.showParams |
( |
|
self, |
|
|
|
maxLineLength = 80 |
|
) |
| |
|
inherited |
Show current job parameters.
Definition at line 227 of file JobRunner.py.
227 def showParams(self,maxLineLength=80):
228 """Show current job parameters."""
229 for p
in self.paramOrder:
230 s =
str(self.params[p])
231 if maxLineLength > 0
and len(s) > maxLineLength:
232 s = s[0:maxLineLength-3] +
'...'
◆ submitJob()
def python.ShellJobRunner.ShellJobRunner.submitJob |
( |
|
self, |
|
|
|
jobConfig |
|
) |
| |
Run a JobRunner job in the current shell.
Reimplemented from python.JobRunner.JobRunner.
Definition at line 24 of file ShellJobRunner.py.
24 def submitJob(self,jobConfig):
25 """Run a JobRunner job in the current shell."""
26 scriptfile = jobConfig[
'scriptfile']
27 logfile = jobConfig[
'logfile']
28 status = os.system(scriptfile+
' > '+logfile+
' 2>&1') >> 8
◆ wait()
def python.JobRunner.JobRunner.wait |
( |
|
self | ) |
|
|
inherited |
Wait until all jobs have completed
Definition at line 492 of file JobRunner.py.
493 """Wait until all jobs have completed"""
496 runningJobs = self.getRunningJobs()
497 if not runningJobs:
break
499 print (time.asctime(),
' Waiting for %2s job(s) (%s)' % (len(runningJobs),runningJobs))
500 self.log(
'JobRunner: finished',self.dumpParams(),outputFiles=self.getOutputFiles())
◆ jobs
python.JobRunner.JobRunner.jobs |
|
inherited |
◆ jobStatus
python.JobRunner.JobRunner.jobStatus |
|
inherited |
◆ paramOrder
python.JobRunner.JobRunner.paramOrder |
|
inherited |
◆ params
python.JobRunner.JobRunner.params |
|
inherited |
◆ paramToBeCopied
python.JobRunner.JobRunner.paramToBeCopied |
|
inherited |
The documentation for this class was generated from the following file: ShellJobRunner.py
bool configure(asg::AnaToolHandle< ITrigGlobalEfficiencyCorrectionTool > &tool, ToolHandleArray< IAsgElectronEfficiencyCorrectionTool > &electronEffToolsHandles, ToolHandleArray< IAsgElectronEfficiencyCorrectionTool > &electronSFToolsHandles, ToolHandleArray< CP::IMuonTriggerScaleFactors > &muonToolsHandles, ToolHandleArray< IAsgPhotonEfficiencyCorrectionTool > &photonEffToolsHandles, ToolHandleArray< IAsgPhotonEfficiencyCorrectionTool > &photonSFToolsHandles, const std::string &triggers, const std::map< std::string, std::string > &legsPerTool, unsigned long nToys, bool debug)