7 JobRunner is a set of classes aimed at simplifying the running of
8 (athena) jobs on a set of input files. JobRunner provides each
9 job with its set of configuration parameters (using a Python
10 dictionary) and runs it over a sub-sample (or all) of the input files.
11 Jobs can run interactively, on different batch systems, or on the Grid.
13 Written by Juerg Beringer (LBNL) in 2008.
# Module metadata (SVN keyword id preserved for provenance).
15 __author__ =
'Juerg Beringer'
16 __version__ =
'$Id: JobRunner.py 747883 2016-05-18 06:58:10Z amorley $'
# Template of the per-job shell script. The %(name)s placeholders are
# substituted from the job's configuration dictionary when the script is
# generated (see the "% jobConfig" substitution in configureJob).
# NOTE(review): do not add text inside the triple-quoted template below --
# anything there becomes part of the generated shell script.
30 scriptTemplate =
"""#!/bin/sh
33 export ATLAS_LOCAL_ROOT_BASE=/cvmfs/atlas.cern.ch/repo/ATLASLocalRootBase
34 %(cmdjobpreprocessing)s
38 echo "Preprocessing and copying files (`date`) ..."
39 # Redirect TMPDIR to avoid problems with concurrent jobs
41 export TMPDIR=`pwd`/tmp
44 echo "Creating POOL file catalog (`date`) ..."
45 %(cmddefinepoolcatalog)s
47 echo "Running athena (`date`) ..."
48 export PYTHONPATH="%(jobdir)s:$PYTHONPATH"
50 ln -fs %(configfile)s %(configmy)s
51 /usr/bin/time -p %(joboptionpath)s
54 echo $status > %(exitstatus)s
55 echo "Postprocessing (`date`) ..."
56 %(cmdjobpostprocessing)s
# Exception type raised throughout this module on configuration and
# job-management errors.
65 """JobRunner exception class"""
# Container for one named job parameter: value, human-readable description,
# and an isSpecial flag (checked via JobRunner.isSpecialParam).
70 """JobRunner job parameter data"""
72 def __init__(self,name,value='',description='',isSpecial=False):
# Render as a fixed-width "name = value" pair for parameter listings.
85 return "%-20s = %-20s" % (self.
name,self.
value)
# Build a release identifier string from the ATLAS environment variables.
# NOTE(review): the function's def line and the condition guarding the
# branch/stamp fallback are not visible in this view -- presumably the
# AtlasBuildBranch/AtlasBuildStamp combination is used only when
# AtlasVersion is unset; confirm against the full source.
88 version = os.getenv(
'AtlasVersion')
90 branch = os.getenv(
'AtlasBuildBranch')
91 stamp = os.getenv(
'AtlasBuildStamp')
92 version =
','.
join([branch, stamp])
93 project = os.getenv(
'AtlasProject')
# Platform is taken from <PROJECT>_PLATFORM, falling back to CMTCONFIG.
94 platform = os.getenv(
'%s_PLATFORM' % project, os.getenv(
'CMTCONFIG'))
# Result: "project,version,<platform parts>" joined with commas.
95 return ','.
join([project, version] + platform.split(
'-'))
99 """JobRunner base class"""
102 """Constructor (takes any number of parameters as an argument)."""
# Default parameter table. Order matters: later parameters may be
# templates referencing earlier ones (see setParam's docstring). The
# trailing True marks isSpecial parameters.
110 self.
setParam(
'jobnr',-1,
'Number of job last configured',
True)
111 self.
setParam(
'inputfiles',[],
'List of all input files',
True)
112 self.
setParam(
'filesperjob',1,
'Number of input files per job')
113 self.
setParam(
'lbperjob',0,
'Number of luminosity blocks per job if doing LB bunching')
114 self.
setParam(
'maxjobs',0,
'Maximum number of jobs to run (0=no maximum)')
# File/directory name templates, expanded per job via %-substitution.
115 self.
setParam(
'jobname',
'job%(jobnr)03i',
'Job name template')
116 self.
setParam(
'jobdir',os.getcwd(),
'Directory template from where the jobs are started')
117 self.
setParam(
'rundir',
'.',
'Directory template where the jobs execute')
118 self.
setParam(
'configfile',
'%(jobdir)s/%(jobname)s.config.py',
'Job configuration file name template (use %(jobnr)i for current job number)')
119 self.
setParam(
'configmy',
'%(jobdir)s/configMy.py',
'Job configuration file for CA configuration')
120 self.
setParam(
'logfile',
'%(jobdir)s/%(jobname)s.log',
'Log file name template')
121 self.
setParam(
'scriptfile',
'%(jobdir)s/%(jobname)s.sh',
'Script file name template')
122 self.
setParam(
'outputfileprefix',
'%(jobdir)s/%(jobname)s-',
'Output file name prefix template')
123 self.
setParam(
'outputfilelist',[],
'List of desired output files (w/o outputfileprefix)')
124 self.
setParam(
'outputfilestring',
'',
'Generated comma-separated list of desired output files with full names',
True)
125 self.
setParam(
'filesforpoolcatalog',[],
'Files to insert into POOL file catalog')
126 self.
setParam(
'addinputtopoolcatalog',
True,
"Add job's input files to POOL file catalog?")
# Status-flag file templates used to track the job life cycle
# (SUBMITTED -> RUNNING -> POSTPROCESSING/COMPLETED).
127 self.
setParam(
'subflag',
'%(jobdir)s/%(jobname)s.status.SUBMITTED',
'Status flag template for submitted jobs')
128 self.
setParam(
'runflag',
'%(jobdir)s/%(jobname)s.status.RUNNING',
'Status flag template for running jobs')
129 self.
setParam(
'exitflag',
'"%(jobdir)s/%(jobname)s.exit.$status"',
'Status flag template to record athena exit status')
130 self.
setParam(
'exitstatus',
'"%(jobdir)s/%(jobname)s.exitstatus.dat"',
'File with exit status value')
131 self.
setParam(
'postprocflag',
'%(jobdir)s/%(jobname)s.status.POSTPROCESSING',
'Status flag template for jobs requiring postprocessing')
132 self.
setParam(
'completedflag',
'%(jobdir)s/%(jobname)s.status.COMPLETED',
'Status flag template for completed jobs')
133 self.
setParam(
'doneflag',
'',
'Actual status flag used at end of job',
True)
134 self.
setParam(
'logmail',
'',
'E-mail address for sending status e-mails')
135 self.
setParam(
'release',
GetRelease(),
'Software release (may include flags such as opt,runtime)')
136 self.
setParam(
'setuprelease',
True,
'Setup ATLAS software release?')
137 self.
setParam(
'returnstatuscode',
False,
'Return athena status code from script?')
138 self.
setParam(
'joboptionpath',os.getcwd()+
'/joboptions.py',
'Master joboption file (w/full path if not in release)')
139 self.
setParam(
'taskpostprocsteps',
'',
'Task-level postprocessing steps (defined at task submission)')
140 self.
setParam(
'jobpostprocsteps',
'',
'Job-level postprocessing steps (updated by template)')
# Generated shell-command fragments; the isSpecial ones are (re)computed
# per job in configureJob before the script template is filled in.
141 self.
setParam(
'cmdsetup',
'',
'Statement for setting up release',
True)
142 self.
setParam(
'cmddefinepoolcatalog',
'',
'Statement for defining POOL file catalog',
True)
143 self.
setParam(
'cmdcopyfiles',
'',
'Statement to copy files to local directory',
True)
144 self.
setParam(
'cmdjobpreprocessing',
'',
'Statement(s) to execute for preprocessing',
False)
145 self.
setParam(
'cmdjobpostprocessing',
'',
'Statement(s) to execute for postprocessing',
False)
146 self.
setParam(
'cmdexit',
'',
'Statement used to exit shell script (allows passing up athena exit code)',
True)
147 self.
setParam(
'maketmpdir',
'mkdir -p tmp',
'Statement to create tmp directory')
148 self.
setParam(
'script',scriptTemplate,
'Template of job submission script',
True)
# Apply any constructor keyword arguments as parameter overrides.
# NOTE(review): the loop body is outside this view.
151 for k
in params.keys():
157 def setParam(self,name,value=None,description=None,isSpecial=None,insertAtFront=False):
158 """Set the value of a job parameter. Job parameters may be templates that name
159 previously defined job parameters and will be evaluated when a given job
160 is configured. Therefore the order in which setParam is called for the different
161 parameters is relevant. insertAtFront can be set to True to force early
162 evaluation of a given parameter."""
# Only overwrite the fields that were explicitly supplied, so repeated
# calls can update value/description/isSpecial independently.
# NOTE(review): 'p' is bound on a line not visible in this view --
# presumably the (new or existing) JobRunnerParameter for 'name'; the
# insertAtFront handling is also outside this view. Confirm in full source.
164 if value
is not None:
166 if description
is not None:
167 p.description = description
168 if isSpecial
is not None:
169 p.isSpecial=isSpecial
177 def appendParam(self,name,value=None,description=None,isSpecial=None,insertAtFront=False,endOfLine='\n'):
178 """Append to the value of a job parameter. If the parameter doesn't exist yet,
179 setParam is called to create it. If it does exist, only the value is updated
180 and the description and flag arguments are ignored."""
# NOTE(review): the existence check that binds 'p' is on a line not
# visible in this view; confirm against the full source.
182 if value
is not None:
# Non-empty string values get the endOfLine separator between old and
# new text; other types are appended with plain '+'.
184 if isinstance(p.value,str)
and p.value:
185 p.value = p.value + endOfLine + value
187 p.value = p.value + value
# Parameter does not exist yet: delegate creation to setParam.
189 self.
setParam(name,value,description,isSpecial,insertAtFront)
193 """Get value of a parameter by name."""
# Raises KeyError if the parameter was never defined.
194 return self.
params[name].value
198 """Check if parameter requires special treatment."""
199 return self.
params[name].isSpecial
203 """Register a parameter holding the name of an input file to be
204 copied before a job starts."""
210 """A generic routine for checking job parameters. Check that all parameters
211 can be evaluated. May be overridden by a subclass if desired."""
213 raise JobRunnerError (
'Inconsistent parameter definition')
# Try the %-template expansion of every string parameter; a failure here
# usually means a template refers to a parameter defined later.
219 if isinstance(value,str):
224 raise JobRunnerError (
'Unable to evaluate parameter: '+p+
' = '+value+
' (check parameter order)')
228 """Show current job parameters."""
# Truncate long values for display (maxLineLength=0 disables truncation).
231 if maxLineLength > 0
and len(s) > maxLineLength:
232 s = s[0:maxLineLength-3] +
'...'
236 def dumpParams(self,format='%(name)-20s = %(value)s
',maxLineLength=0):
237 """Dump all parameters into a string in a user-specified format."""
# Each parameter's __dict__ (name/value/description/isSpecial) feeds the
# user-supplied %-format string.
240 s = format % self.
params[p].__dict__
242 s = s[0:maxLineLength-3]+
'...'
248 """ Add a list of input files to be processed."""
253 """Add a list of files to be inserted to the POOL file catalog."""
258 """Configure parameters for a single job and write job configuration files. If inputList
259 is given, this is the list of input files used for the job."""
# Refuse to configure the same job number twice.
260 if jobnr
in self.
jobs:
261 raise JobRunnerError (
'Job number %s already configured' % jobnr)
264 jobConfig[
'jobnr'] = jobnr
# Input files: either the caller-supplied list, or the jobnr-th slice of
# 'filesperjob' files from the global 'inputfiles' parameter.
269 jobConfig[
'inputfiles'] = inputList
271 inputfiles = self.
getParam(
'inputfiles')
272 iFirst = self.
getParam(
'filesperjob')*jobnr
273 if iFirst >= len(inputfiles):
274 raise JobRunnerError (
'Jobnr = %i too high for available number of files' % jobnr)
275 iLast = iFirst + self.
getParam(
'filesperjob')
276 if iLast > len(inputfiles):
277 iLast=len(inputfiles)
278 jobConfig[
'inputfiles'] = inputfiles[iFirst:iLast]
# Expand all string-valued parameter templates against the per-job dict.
284 if isinstance(value,str):
285 jobConfig[p] = value % jobConfig
# Release setup command: explicit template wins; otherwise fall back to
# the CMDSETUP environment variable or the default asetup invocation.
290 if jobConfig[
'setuprelease']:
292 jobConfig[
'cmdsetup'] = self.
getParam(
'cmdsetup') % jobConfig
294 if not jobConfig[
'cmdsetup']:
295 jobConfig[
'cmdsetup'] = os.getenv(
'CMDSETUP',
'source /afs/cern.ch/atlas/software/dist/AtlasSetup/scripts/asetup.sh %(release)s --noautocdtest') % jobConfig
297 jobConfig[
'cmdsetup'] =
''
# Command copying registered input files into the run directory; once
# copied, the parameters are rewritten to their basenames.
301 jobConfig[
'cmdcopyfiles'] = self.
getParam(
'cmdcopyfiles') % jobConfig
302 if not jobConfig[
'cmdcopyfiles']:
305 if not jobConfig[p]:
continue
306 filestobecopied =
' '.
join([filestobecopied,jobConfig[p]])
307 jobConfig[p] = os.path.basename(jobConfig[p])
309 jobConfig[
'cmdcopyfiles'] =
'cp -p '+filestobecopied+
' .'
# POOL file catalog command: insert filesforpoolcatalog, plus the job's
# input files when addinputtopoolcatalog is set.
313 jobConfig[
'cmddefinepoolcatalog'] = self.
getParam(
'cmddefinepoolcatalog') % jobConfig
314 if not jobConfig[
'cmddefinepoolcatalog']:
315 if jobConfig[
'addinputtopoolcatalog']:
316 argstring =
' '.
join(jobConfig[
'filesforpoolcatalog']+jobConfig[
'inputfiles']) % jobConfig
318 argstring =
' '.
join(jobConfig[
'filesforpoolcatalog']) % jobConfig
320 jobConfig[
'cmddefinepoolcatalog'] =
'pool_insertFileToCatalog '+argstring
# Exit command: propagate athena's $status to the caller if requested.
323 if jobConfig[
'returnstatuscode']:
324 jobConfig[
'cmdexit'] = self.
getParam(
'cmdexit') % jobConfig
325 if not jobConfig[
'cmdexit']:
326 jobConfig[
'cmdexit'] =
'exit $status' % jobConfig
328 jobConfig[
'cmdexit'] =
''
# Comma-separated list of fully-prefixed output file names.
331 jobConfig[
'outputfilestring'] =
','.
join([jobConfig[
'outputfileprefix']+f
for f
in jobConfig[
'outputfilelist']])
# End-of-job flag: POSTPROCESSING when task-level postprocessing steps
# are defined, COMPLETED otherwise.
334 if jobConfig[
'taskpostprocsteps']:
335 jobConfig[
'doneflag'] = jobConfig[
'postprocflag']
337 jobConfig[
'doneflag'] = jobConfig[
'completedflag']
# Fill the shell-script template with this job's configuration.
338 jobConfig[
'script'] = self.
getParam(
'script') % jobConfig
# Refuse to overwrite any pre-existing output, config, script or log file.
343 for f
in jobConfig[
'outputfilelist']:
344 if os.access(jobConfig[
'outputfileprefix']+f,os.F_OK):
345 raise JobRunnerError (
'Job output file %s exists already' % jobConfig[f])
346 for f
in (
'configfile',
'scriptfile',
'logfile'):
347 if os.access(jobConfig[f],os.F_OK):
348 raise JobRunnerError (
'Job configuration or log file %s exists already' % jobConfig[f])
351 os.makedirs(
'%(jobdir)s' % jobConfig)
# Write the job configuration as an importable Python file; fall back to
# repr() when pprint cannot produce readable output.
354 config =
open(jobConfig[
'configfile'],
'w')
355 config.write(
'# Job configuration data for job %(jobname)s\n' % jobConfig)
356 config.write(
'# Generated by JobRunner.py\n\n')
357 if pprint.isreadable(jobConfig):
358 config.write(
'jobConfig = '+pprint.pformat(jobConfig))
360 config.write(
'jobConfig = '+
repr(jobConfig))
# Write the job script and make it executable.
365 script =
open(jobConfig[
'scriptfile'],
'w')
366 script.write(jobConfig[
'script'])
368 os.chmod(jobConfig[
'scriptfile'],0o755)
# Record the fully-expanded configuration for this job number.
371 self.
jobs[jobnr] = jobConfig
375 """Configure all jobs."""
377 raise JobRunnerError (
"No input files or illegal parameter 'filesperjob'")
378 lbperjob = self.
getParam(
'lbperjob')
# LB-bunching mode: group input files into jobs by luminosity block
# number parsed from the file names (pattern 'lb<digits>').
382 raise JobRunnerError (
'Negative number of luminosity blocks per job not allowed')
383 inputfiles = self.
getParam(
'inputfiles')
386 lbpattern = re.compile(
r'lb(\d+)')
388 lbnrs = lbpattern.findall(f)
391 raise JobRunnerError (
'Unable to determine luminosity block number of file %s' % f)
394 raise JobRunnerError (
'Too many luminosity block numbers in filename %s' % f)
# A file may span a range lbNNN...lbMMM; assign it to every job bucket
# its LB range touches.
398 lbnrmax =
int(lbnrs[1])
402 while (lbnr <= lbnrmax) :
404 jobId =
int((lbnr-1)/lbperjob)
406 if jobId
not in jobInputDict:
407 jobInputDict[jobId] = [f]
408 jobLBDict[jobId] = [lbnr]
410 if f
not in jobInputDict[jobId] :
411 jobInputDict[jobId].
append(f)
412 if lbnr
not in jobLBDict[jobId] :
413 jobLBDict[jobId].
append(lbnr)
# Configure one job per LB bucket, exposing its LB list as a parameter.
419 maxJobs = len(jobInputDict)
420 for i
in sorted(jobInputDict.keys())[:maxJobs]:
425 self.
setParam(
'lbList', jobLBDict[i])
# Plain mode: one job per slice of 'filesperjob' input files.
433 print (
'\nConfiguring %i job(s) ...' % (njobs))
434 for i
in range(njobs):
439 """This method is to be overridden by derived JobRunner classes. This default
440 implementation in the JobRunner base class runs jobs directly, printing the
441 output onto the terminal. If a specific implementation of submitJob directly
442 runs a job, it should return the exit status of the job. If it only submits the
443 job, the status should be returned as None."""
444 scriptfile = jobConfig[
'scriptfile']
445 logfile = jobConfig[
'logfile']
446 exitstatus = jobConfig[
'exitstatus']
# Run the script, tee output to the log file, then exit with the status
# the script recorded; >> 8 extracts the exit code from the wait status.
450 status = subprocess.call(
'%s 2>&1 | tee %s ; exit `cat %s`'
451 % (scriptfile,logfile,exitstatus), shell=
True) >> 8
456 """Run a single configured job."""
457 if jobnr
not in self.
jobs:
458 raise JobRunnerError (
'Job number %s is not configured' % jobnr)
459 jobConfig = self.
jobs[jobnr]
# Mark the job as submitted by touching its SUBMITTED status flag file.
460 subprocess.call(
'touch '+jobConfig[
'subflag'], shell=
True)
466 """Run all configured job(s)"""
468 raise JobRunnerError (
'No configured jobs')
469 njobs = len(self.
jobs)
# Announce each job with the concrete runner class, then run it.
472 print (
'\nStarting job %i (using %s) ...\n' % (j,self.__class__))
480 return len(self.
jobs)
484 """Get list of jobs submitted by this JobRunner that are still running"""
# A job counts as running while its SUBMITTED or RUNNING flag file exists.
487 if os.path.exists(self.
jobs[j][
'subflag'])
or os.path.exists(self.
jobs[j][
'runflag']):
488 runningJobs.append(self.
jobs[j][
'jobname'])
493 """Wait until all jobs have completed"""
# Poll the status-flag files until none of the jobs is running.
497 if not runningJobs:
break
499 print (time.asctime(),
' Waiting for %2s job(s) (%s)' % (len(runningJobs),runningJobs))
504 """Get a list of all output files."""
# Collect, per job, everything matching the job's output file prefix.
507 for f
in sorted(glob.glob(self.
jobs[j][
'outputfileprefix']+
'*')):
508 outputFiles.append(f)
512 def log(self,subject,body,**data):
513 """Send a log message to a set of users specified by parameter 'logmail'."""
# Compose the message: origin host/user/time, cwd, optional body, then a
# formatted dump of any extra keyword data (dicts get one line per item).
515 msg =
'Generated by JobRunner on host '+socket.gethostname()+
' by '+pwd.getpwuid(os.getuid())[0]+
' at '+time.asctime()
516 msg +=
'\n\nCurrent working directory = '+os.getcwd()
517 if body: msg +=
'\n\n'+body
518 for k
in data.keys():
519 if isinstance(data[k],dict):
521 items = data[k].
keys()
524 msg +=
"\n %-20s = %s" % (i,data[k][i])
526 msg +=
"\n\n%-20s = %s" % (k,data[k])
# Deliver via the system 'mail' command to the 'logmail' recipients.
527 subprocess.call(
"echo '%s' | mail -s '%s' '%s'" %
528 (msg,subject,self.
getParam(
'logmail')), shell=
True)
# Minimal self-test when executed as a script.
# NOTE(review): the construction of 't' is on a line not visible in this
# view -- presumably a JobRunner instance.
534 if __name__ ==
'__main__':
536 print (
'Testing JobRunner:')
538 t.addFiles([
'file1.dat',
'file2.dat',
'file3.dat'])