|
def | __init__ (self, **params) |
|
def | submitJob (self, jobConfig) |
|
def | setParam (self, name, value=None, description=None, isSpecial=None, insertAtFront=False) |
|
def | appendParam (self, name, value=None, description=None, isSpecial=None, insertAtFront=False, endOfLine='\n') |
|
def | getParam (self, name) |
|
def | isSpecialParam (self, name) |
|
def | registerToBeCopied (self, paramName) |
|
def | checkParams (self) |
|
def | showParams (self, maxLineLength=80) |
|
def | dumpParams (self, format='%(name)-20s = %(value)s', maxLineLength=0) |
|
def | addFiles (self, fileList) |
|
def | addFilesToPoolFileCatalog (self, fileList) |
|
def | configureJob (self, jobnr, inputList=[]) |
|
def | configure (self) |
|
def | runJob (self, jobnr) |
|
def | run (self, jobnr=None) |
|
def | getNJobs (self) |
|
def | getRunningJobs (self) |
|
def | wait (self) |
|
def | getOutputFiles (self) |
|
def | log (self, subject, body, **data) |
|
PDSFJobRunner - run jobs using the SGE batch system on PDSF
Definition at line 36 of file PDSFJobRunner.py.
◆ __init__()
def python.PDSFJobRunner.PDSFJobRunner.__init__ |
( |
|
self, |
|
|
** |
params |
|
) |
| |
Constructor (takes any number of parameters as an argument).
Reimplemented from python.JobRunner.JobRunner.
Definition at line 39 of file PDSFJobRunner.py.
40 """Constructor (takes any number of parameters as an argument)."""
41 JobRunner.__init__(self)
44 self.setParam(
'batchqueue',
'all.64bit.q',
'Batch queue')
45 script = self.getParam(
'script')
46 self.setParam(
'script',preScript+script)
47 for k
in params.keys():
48 self.setParam(k,params[k])
50 self.setParam(
'rundir',
'%(jobdir)s')
51 self.setParam(
'maketmpdir',
'')
◆ addFiles()
def python.JobRunner.JobRunner.addFiles |
( |
|
self, |
|
|
|
fileList |
|
) |
| |
|
inherited |
Add a list of input files to be processed.
Definition at line 248 of file JobRunner.py.
248 def addFiles(self,fileList):
249 """ Add a list of input files to be processed."""
250 self.getParam(
'inputfiles').
extend(fileList)
◆ addFilesToPoolFileCatalog()
def python.JobRunner.JobRunner.addFilesToPoolFileCatalog |
( |
|
self, |
|
|
|
fileList |
|
) |
| |
|
inherited |
Add a list of files to be inserted to the POOL file catalog.
Definition at line 253 of file JobRunner.py.
253 def addFilesToPoolFileCatalog(self,fileList):
254 """Add a list of files to be inserted to the POOL file catalog."""
255 self.getParam(
'filesforpoolcatalog').
extend(fileList)
◆ appendParam()
def python.JobRunner.JobRunner.appendParam |
( |
|
self, |
|
|
|
name, |
|
|
|
value = None , |
|
|
|
description = None , |
|
|
|
isSpecial = None , |
|
|
|
insertAtFront = False , |
|
|
|
endOfLine = '\n' |
|
) |
| |
|
inherited |
Append to the value of a job parameter. If the parameter doesn't exist yet,
setParam is called to create it. If it does exist, only the value is updated
and the description and flag arguments are ignored.
Definition at line 178 of file JobRunner.py.
178 def appendParam(self,name,value=None,description=None,isSpecial=None,insertAtFront=False,endOfLine='\n'):
179 """Append to the value of a job parameter. If the parameter doesn't exist yet,
180 setParam is called to create it. If it does exist, only the value is updated
181 and the description and flag arguments are ignored."""
182 if name
in self.params:
183 if value
is not None:
184 p = self.params[name]
185 if isinstance(p.value,str)
and p.value:
186 p.value = p.value + endOfLine + value
188 p.value = p.value + value
190 self.setParam(name,value,description,isSpecial,insertAtFront)
◆ checkParams()
def python.JobRunner.JobRunner.checkParams |
( |
|
self | ) |
|
|
inherited |
A generic routine for checking job parameters. Check that all parameters
can be evaluated. May be overridden by a subclass if desired.
Definition at line 210 of file JobRunner.py.
210 def checkParams(self):
211 """A generic routine for checking job parameters. Check that all parameters
212 can be evaluated. May be overridden by a subclass if desired."""
213 if len(self.paramOrder)!=len(self.params):
214 raise JobRunnerError (
'Inconsistent parameter definition')
217 for p
in self.paramOrder:
219 value = self.getParam(p)
220 if isinstance(value,str):
225 raise JobRunnerError (
'Unable to evaluate parameter: '+p+
' = '+value+
' (check parameter order)')
◆ configure()
def python.JobRunner.JobRunner.configure |
( |
|
self | ) |
|
|
inherited |
Configure all jobs.
Definition at line 375 of file JobRunner.py.
376 """Configure all jobs."""
377 if self.getParam(
'filesperjob')==0
or not self.getParam(
'inputfiles'):
378 raise JobRunnerError (
"No input files or illegal parameter 'filesperjob'")
379 lbperjob = self.getParam(
'lbperjob')
383 raise JobRunnerError (
'Negative number of luminosity blocks per job not allowed')
384 inputfiles = self.getParam(
'inputfiles')
387 lbpattern = re.compile(
r'lb(\d+)')
389 lbnrs = lbpattern.findall(f)
392 raise JobRunnerError (
'Unable to determine luminosity block number of file %s' % f)
395 raise JobRunnerError (
'Too many luminosity block numbers in filename %s' % f)
399 lbnrmax =
int(lbnrs[1])
403 while (lbnr <= lbnrmax) :
405 jobId =
int((lbnr-1)/lbperjob)
407 if jobId
not in jobInputDict:
408 jobInputDict[jobId] = [f]
409 jobLBDict[jobId] = [lbnr]
411 if f
not in jobInputDict[jobId] :
412 jobInputDict[jobId].
append(f)
413 if lbnr
not in jobLBDict[jobId] :
414 jobLBDict[jobId].
append(lbnr)
418 maxJobs = self.getParam(
'maxjobs')
420 maxJobs = len(jobInputDict)
421 for i
in sorted(jobInputDict.keys())[:maxJobs]:
426 self.setParam(
'lbList', jobLBDict[i])
427 self.configureJob(jobnr,jobInputDict[i])
431 njobs =
int(math.ceil(
float(len(self.getParam(
'inputfiles')))/self.getParam(
'filesperjob')))
432 if self.getParam(
'maxjobs'):
433 njobs =
min(self.getParam(
'maxjobs'),njobs)
434 print (
'\nConfiguring %i job(s) ...' % (njobs))
435 for i
in range(njobs):
◆ configureJob()
def python.JobRunner.JobRunner.configureJob |
( |
|
self, |
|
|
|
jobnr, |
|
|
|
inputList = [] |
|
) |
| |
|
inherited |
Configure parameters for a single job and write job configuration files. If inputList
is given, this is the list of input files used for the job.
Definition at line 258 of file JobRunner.py.
258 def configureJob(self,jobnr,inputList=[]):
259 """Configure parameters for a single job and write job configuration files. If inputList
260 is given, this is the list of input files used for the job."""
261 if jobnr
in self.jobs:
262 raise JobRunnerError (
'Job number %s already configured' % jobnr)
265 jobConfig[
'jobnr'] = jobnr
266 self.setParam(
'jobnr',jobnr)
270 jobConfig[
'inputfiles'] = inputList
272 inputfiles = self.getParam(
'inputfiles')
273 iFirst = self.getParam(
'filesperjob')*jobnr
274 if iFirst >= len(inputfiles):
275 raise JobRunnerError (
'Jobnr = %i too high for available number of files' % jobnr)
276 iLast = iFirst + self.getParam(
'filesperjob')
277 if iLast > len(inputfiles):
278 iLast=len(inputfiles)
279 jobConfig[
'inputfiles'] = inputfiles[iFirst:iLast]
282 for p
in self.paramOrder:
283 if self.isSpecialParam(p):
continue
284 value = self.getParam(p)
285 if isinstance(value,str):
286 jobConfig[p] = value % jobConfig
291 if jobConfig[
'setuprelease']:
293 jobConfig[
'cmdsetup'] = self.getParam(
'cmdsetup') % jobConfig
295 if not jobConfig[
'cmdsetup']:
296 jobConfig[
'cmdsetup'] = os.getenv(
'CMDSETUP',
'source /afs/cern.ch/atlas/software/dist/AtlasSetup/scripts/asetup.sh %(release)s --noautocdtest') % jobConfig
298 jobConfig[
'cmdsetup'] =
''
302 jobConfig[
'cmdcopyfiles'] = self.getParam(
'cmdcopyfiles') % jobConfig
303 if not jobConfig[
'cmdcopyfiles']:
305 for p
in self.paramToBeCopied:
306 if not jobConfig[p]:
continue
307 filestobecopied =
' '.
join([filestobecopied,jobConfig[p]])
308 jobConfig[p] = os.path.basename(jobConfig[p])
310 jobConfig[
'cmdcopyfiles'] =
'cp -p '+filestobecopied+
' .'
314 jobConfig[
'cmddefinepoolcatalog'] = self.getParam(
'cmddefinepoolcatalog') % jobConfig
315 if not jobConfig[
'cmddefinepoolcatalog']:
316 if jobConfig[
'addinputtopoolcatalog']:
317 argstring =
' '.
join(jobConfig[
'filesforpoolcatalog']+jobConfig[
'inputfiles']) % jobConfig
319 argstring =
' '.
join(jobConfig[
'filesforpoolcatalog']) % jobConfig
321 jobConfig[
'cmddefinepoolcatalog'] =
'pool_insertFileToCatalog '+argstring
324 if jobConfig[
'returnstatuscode']:
325 jobConfig[
'cmdexit'] = self.getParam(
'cmdexit') % jobConfig
326 if not jobConfig[
'cmdexit']:
327 jobConfig[
'cmdexit'] =
'exit $status' % jobConfig
329 jobConfig[
'cmdexit'] =
''
332 jobConfig[
'outputfilestring'] =
','.
join([jobConfig[
'outputfileprefix']+f
for f
in jobConfig[
'outputfilelist']])
335 if jobConfig[
'taskpostprocsteps']:
336 jobConfig[
'doneflag'] = jobConfig[
'postprocflag']
338 jobConfig[
'doneflag'] = jobConfig[
'completedflag']
339 jobConfig[
'script'] = self.getParam(
'script') % jobConfig
344 for f
in jobConfig[
'outputfilelist']:
345 if os.access(jobConfig[
'outputfileprefix']+f,os.F_OK):
346 raise JobRunnerError (
'Job output file %s exists already' % jobConfig[f])
347 for f
in (
'configfile',
'scriptfile',
'logfile'):
348 if os.access(jobConfig[f],os.F_OK):
349 raise JobRunnerError (
'Job configuration or log file %s exists already' % jobConfig[f])
352 os.makedirs(
'%(jobdir)s' % jobConfig)
355 config =
open(jobConfig[
'configfile'],
'w')
356 config.write(
'# Job configuration data for job %(jobname)s\n' % jobConfig)
357 config.write(
'# Generated by JobRunner.py\n\n')
358 if pprint.isreadable(jobConfig):
359 config.write(
'jobConfig = '+pprint.pformat(jobConfig))
361 config.write(
'jobConfig = '+
repr(jobConfig))
366 script =
open(jobConfig[
'scriptfile'],
'w')
367 script.write(jobConfig[
'script'])
369 os.chmod(jobConfig[
'scriptfile'],0o755)
372 self.jobs[jobnr] = jobConfig
◆ dumpParams()
def python.JobRunner.JobRunner.dumpParams |
( |
|
self, |
|
|
|
format = '%(name)-20s = %(value)s' , |
|
|
|
maxLineLength = 0 |
|
) |
| |
|
inherited |
Dump all parameters into a string in a user-specified format.
Definition at line 237 of file JobRunner.py.
237 def dumpParams(self,format='%(name)-20s = %(value)s\n',maxLineLength=0):
238 """Dump all parameters into a string in a user-specified format."""
239 for p
in self.paramOrder:
240 s = format % self.params[p].__dict__
242 s = s[0:maxLineLength-3]+
'...'
◆ getNJobs()
def python.JobRunner.JobRunner.getNJobs |
( |
|
self | ) |
|
|
inherited |
◆ getOutputFiles()
def python.JobRunner.JobRunner.getOutputFiles |
( |
|
self | ) |
|
|
inherited |
Get a list of all output files.
Definition at line 504 of file JobRunner.py.
504 def getOutputFiles(self):
505 """Get a list of all output files."""
508 for f
in sorted(glob.glob(self.jobs[j][
'outputfileprefix']+
'*')):
509 outputFiles.append(f)
◆ getParam()
def python.JobRunner.JobRunner.getParam |
( |
|
self, |
|
|
|
name |
|
) |
| |
|
inherited |
Get value of a parameter by name.
Definition at line 193 of file JobRunner.py.
193 def getParam(self,name):
194 """Get value of a parameter by name."""
195 return self.params[name].value
◆ getRunningJobs()
def python.JobRunner.JobRunner.getRunningJobs |
( |
|
self | ) |
|
|
inherited |
Get list of jobs submitted by this JobRunner that are still running
Definition at line 484 of file JobRunner.py.
484 def getRunningJobs(self):
485 """Get list of jobs submitted by this JobRunner that are still running"""
488 if os.path.exists(self.jobs[j][
'subflag'])
or os.path.exists(self.jobs[j][
'runflag']):
489 runningJobs.append(self.jobs[j][
'jobname'])
◆ isSpecialParam()
def python.JobRunner.JobRunner.isSpecialParam |
( |
|
self, |
|
|
|
name |
|
) |
| |
|
inherited |
Check if parameter requires special treatment.
Definition at line 198 of file JobRunner.py.
198 def isSpecialParam(self,name):
199 """Check if parameter requires special treatment."""
200 return self.params[name].isSpecial
◆ log()
def python.JobRunner.JobRunner.log |
( |
|
self, |
|
|
|
subject, |
|
|
|
body, |
|
|
** |
data |
|
) |
| |
|
inherited |
Send a log message to a set of users specified by parameter 'logmail'.
Definition at line 513 of file JobRunner.py.
513 def log(self,subject,body,**data):
514 """Send a log message to a set of users specified by parameter 'logmail'."""
515 if self.getParam(
'logmail'):
516 msg =
'Generated by JobRunner on host '+socket.gethostname()+
' by '+pwd.getpwuid(os.getuid())[0]+
' at '+time.asctime()
517 msg +=
'\n\nCurrent working directory = '+os.getcwd()
518 if body: msg +=
'\n\n'+body
519 for k
in data.keys():
520 if isinstance(data[k],dict):
522 items = data[k].
keys()
525 msg +=
"\n %-20s = %s" % (i,data[k][i])
527 msg +=
"\n\n%-20s = %s" % (k,data[k])
528 subprocess.call(
"echo '%s' | mail -s '%s' '%s'" %
529 (msg,subject,self.getParam(
'logmail')), shell=
True)
◆ registerToBeCopied()
def python.JobRunner.JobRunner.registerToBeCopied |
( |
|
self, |
|
|
|
paramName |
|
) |
| |
|
inherited |
Register a parameter holding the name of an input file to be
copied before a job starts.
Definition at line 203 of file JobRunner.py.
203 def registerToBeCopied(self,paramName):
204 """Register a parameter holding the name of an input file to be
205 copied before a job starts."""
206 if paramName
not in self.paramToBeCopied:
207 self.paramToBeCopied.
append(paramName)
◆ run()
def python.JobRunner.JobRunner.run |
( |
|
self, |
|
|
|
jobnr = None |
|
) |
| |
|
inherited |
Run all configured job(s)
Definition at line 466 of file JobRunner.py.
466 def run(self,jobnr=None):
467 """Run all configured job(s)"""
469 raise JobRunnerError (
'No configured jobs')
470 njobs = len(self.jobs)
471 self.log(
'JobRunner: starting '+
str(njobs)+
' jobs',self.dumpParams())
473 print (
'\nStarting job %i (using %s) ...\n' % (j,self.__class__))
◆ runJob()
def python.JobRunner.JobRunner.runJob |
( |
|
self, |
|
|
|
jobnr |
|
) |
| |
|
inherited |
Run a single configured job.
Definition at line 456 of file JobRunner.py.
456 def runJob(self,jobnr):
457 """Run a single configured job."""
458 if jobnr
not in self.jobs:
459 raise JobRunnerError (
'Job number %s is not configured' % jobnr)
460 jobConfig = self.jobs[jobnr]
461 subprocess.call(
'touch '+jobConfig[
'subflag'], shell=
True)
462 status = self.submitJob(jobConfig)
463 self.jobStatus[jobnr] = status
◆ setParam()
def python.JobRunner.JobRunner.setParam |
( |
|
self, |
|
|
|
name, |
|
|
|
value = None , |
|
|
|
description = None , |
|
|
|
isSpecial = None , |
|
|
|
insertAtFront = False |
|
) |
| |
|
inherited |
Set the value of a job parameter. Job parameters may be templates that name
previously defined job parameters and will be evaluated when a given job
is configured. Therefore the order in which setParam is called for the different
parameters is relevant. insertAtFront can be set to True to force early
evaluation of a given parameter.
Definition at line 158 of file JobRunner.py.
158 def setParam(self,name,value=None,description=None,isSpecial=None,insertAtFront=False):
159 """Set the value of a job parameter. Job parameters may be templates that name
160 previously defined job parameters and will be evaluated when a given job
161 is configured. Therefore the order in which setParam is called for the different
162 parameters is relevant. insertAtFront can be set to True to force early
163 evaluation of a given parameter."""
164 p = self.params[name] = self.params.
get(name,JobRunnerParameter(name))
165 if value
is not None:
167 if description
is not None:
168 p.description = description
169 if isSpecial
is not None:
170 p.isSpecial=isSpecial
171 if name
not in self.paramOrder:
173 self.paramOrder.insert(0,name)
175 self.paramOrder.
append(name)
◆ showParams()
def python.JobRunner.JobRunner.showParams |
( |
|
self, |
|
|
|
maxLineLength = 80 |
|
) |
| |
|
inherited |
Show current job parameters.
Definition at line 228 of file JobRunner.py.
228 def showParams(self,maxLineLength=80):
229 """Show current job parameters."""
230 for p
in self.paramOrder:
231 s =
str(self.params[p])
232 if maxLineLength > 0
and len(s) > maxLineLength:
233 s = s[0:maxLineLength-3] +
'...'
◆ submitJob()
def python.PDSFJobRunner.PDSFJobRunner.submitJob |
( |
|
self, |
|
|
|
jobConfig |
|
) |
| |
Submit a JobRunner job as a SGE batch job.
Reimplemented from python.JobRunner.JobRunner.
Definition at line 54 of file PDSFJobRunner.py.
54 def submitJob(self,jobConfig):
55 """Submit a JobRunner job as a SGE batch job."""
57 batchCmd =
'qsub -N %(jobname)s -j y -o %(logfile)s %(scriptfile)s' % jobConfig
◆ wait()
def python.JobRunner.JobRunner.wait |
( |
|
self | ) |
|
|
inherited |
Wait until all jobs have completed
Definition at line 493 of file JobRunner.py.
494 """Wait until all jobs have completed"""
497 runningJobs = self.getRunningJobs()
498 if not runningJobs:
break
500 print (time.asctime(),
' Waiting for %2s job(s) (%s)' % (len(runningJobs),runningJobs))
501 self.log(
'JobRunner: finished',self.dumpParams(),outputFiles=self.getOutputFiles())
◆ jobs
python.JobRunner.JobRunner.jobs |
|
inherited |
◆ jobStatus
python.JobRunner.JobRunner.jobStatus |
|
inherited |
◆ paramOrder
python.JobRunner.JobRunner.paramOrder |
|
inherited |
◆ params
python.JobRunner.JobRunner.params |
|
inherited |
◆ paramToBeCopied
python.JobRunner.JobRunner.paramToBeCopied |
|
inherited |
The documentation for this class was generated from the following file:
PDSFJobRunner.py