5 from __future__
import print_function
8 JobRunner is a set of classes aimed at simplifying the running of
9 (athena) jobs on a set of input files. JobRunner provides each
10 job with its set of configuration parameters (using a Python
10 dictionary) and runs it over a sub-sample (or all) of the input files.
12 Jobs can run interactively, on different batch systems, or on the Grid.
14 Written by Juerg Beringer (LBNL) in 2008.
16 __author__ =
'Juerg Beringer'
17 __version__ =
'$Id: JobRunner.py 747883 2016-05-18 06:58:10Z amorley $'
31 scriptTemplate =
"""#!/bin/sh
34 export ATLAS_LOCAL_ROOT_BASE=/cvmfs/atlas.cern.ch/repo/ATLASLocalRootBase
35 %(cmdjobpreprocessing)s
39 echo "Preprocessing and copying files (`date`) ..."
40 # Redirect TMPDIR to avoid problems with concurrent jobs
42 export TMPDIR=`pwd`/tmp
45 echo "Creating POOL file catalog (`date`) ..."
46 %(cmddefinepoolcatalog)s
48 echo "Running athena (`date`) ..."
49 export PYTHONPATH="%(jobdir)s:$PYTHONPATH"
51 ln -fs %(configfile)s %(configmy)s
52 /usr/bin/time -p %(joboptionpath)s
55 echo $status > %(exitstatus)s
56 echo "Postprocessing (`date`) ..."
57 %(cmdjobpostprocessing)s
66 """JobRunner exception class"""
71 """JobRunner job parameter data"""
73 def __init__(self,name,value='',description='',isSpecial=False):
86 return "%-20s = %-20s" % (self.
name,self.
value)
89 version = os.getenv(
'AtlasVersion')
91 branch = os.getenv(
'AtlasBuildBranch')
92 stamp = os.getenv(
'AtlasBuildStamp')
93 version =
','.
join([branch, stamp])
94 project = os.getenv(
'AtlasProject')
95 platform = os.getenv(
'%s_PLATFORM' % project, os.getenv(
'CMTCONFIG'))
96 return ','.
join([project, version] + platform.split(
'-'))
100 """JobRunner base class"""
103 """Constructor (takes any number of parameters as an argument)."""
111 self.
setParam(
'jobnr',-1,
'Number of job last configured',
True)
112 self.
setParam(
'inputfiles',[],
'List of all input files',
True)
113 self.
setParam(
'filesperjob',1,
'Number of input files per job')
114 self.
setParam(
'lbperjob',0,
'Number of luminosity blocks per job if doing LB bunching')
115 self.
setParam(
'maxjobs',0,
'Maximum number of jobs to run (0=no maximum)')
116 self.
setParam(
'jobname',
'job%(jobnr)03i',
'Job name template')
117 self.
setParam(
'jobdir',os.getcwd(),
'Directory template from where the jobs are started')
118 self.
setParam(
'rundir',
'.',
'Directory template where the jobs execute')
119 self.
setParam(
'configfile',
'%(jobdir)s/%(jobname)s.config.py',
'Job configuration file name template (use %(jobnr)i for current job number)')
120 self.
setParam(
'configmy',
'%(jobdir)s/configMy.py',
'Job configuration file for CA configuration')
121 self.
setParam(
'logfile',
'%(jobdir)s/%(jobname)s.log',
'Log file name template')
122 self.
setParam(
'scriptfile',
'%(jobdir)s/%(jobname)s.sh',
'Script file name template')
123 self.
setParam(
'outputfileprefix',
'%(jobdir)s/%(jobname)s-',
'Output file name prefix template')
124 self.
setParam(
'outputfilelist',[],
'List of desired output files (w/o outputfileprefix)')
125 self.
setParam(
'outputfilestring',
'',
'Generated comma-separated list of desired output files with full names',
True)
126 self.
setParam(
'filesforpoolcatalog',[],
'Files to insert into POOL file catalog')
127 self.
setParam(
'addinputtopoolcatalog',
True,
"Add job's input files to POOL file catalog?")
128 self.
setParam(
'subflag',
'%(jobdir)s/%(jobname)s.status.SUBMITTED',
'Status flag template for submitted jobs')
129 self.
setParam(
'runflag',
'%(jobdir)s/%(jobname)s.status.RUNNING',
'Status flag template for running jobs')
130 self.
setParam(
'exitflag',
'"%(jobdir)s/%(jobname)s.exit.$status"',
'Status flag template to record athena exit status')
131 self.
setParam(
'exitstatus',
'"%(jobdir)s/%(jobname)s.exitstatus.dat"',
'File with exit status value')
132 self.
setParam(
'postprocflag',
'%(jobdir)s/%(jobname)s.status.POSTPROCESSING',
'Status flag template for jobs requiring postprocessing')
133 self.
setParam(
'completedflag',
'%(jobdir)s/%(jobname)s.status.COMPLETED',
'Status flag template for completed jobs')
134 self.
setParam(
'doneflag',
'',
'Actual status flag used at end of job',
True)
135 self.
setParam(
'logmail',
'',
'E-mail address for sending status e-mails')
136 self.
setParam(
'release',
GetRelease(),
'Software release (may include flags such as opt,runtime)')
137 self.
setParam(
'setuprelease',
True,
'Setup ATLAS software release?')
138 self.
setParam(
'returnstatuscode',
False,
'Return athena status code from script?')
139 self.
setParam(
'joboptionpath',os.getcwd()+
'/joboptions.py',
'Master joboption file (w/full path if not in release)')
140 self.
setParam(
'taskpostprocsteps',
'',
'Task-level postprocessing steps (defined at task submission)')
141 self.
setParam(
'jobpostprocsteps',
'',
'Job-level postprocessing steps (updated by template)')
142 self.
setParam(
'cmdsetup',
'',
'Statement for setting up release',
True)
143 self.
setParam(
'cmddefinepoolcatalog',
'',
'Statement for defining POOL file catalog',
True)
144 self.
setParam(
'cmdcopyfiles',
'',
'Statement to copy files to local directory',
True)
145 self.
setParam(
'cmdjobpreprocessing',
'',
'Statement(s) to execute for preprocessing',
False)
146 self.
setParam(
'cmdjobpostprocessing',
'',
'Statement(s) to execute for postprocessing',
False)
147 self.
setParam(
'cmdexit',
'',
'Statement used to exit shell script (allows passing up athena exit code)',
True)
148 self.
setParam(
'maketmpdir',
'mkdir -p tmp',
'Statement to create tmp directory')
149 self.
setParam(
'script',scriptTemplate,
'Template of job submission script',
True)
152 for k
in params.keys():
158 def setParam(self,name,value=None,description=None,isSpecial=None,insertAtFront=False):
159 """Set the value of a job parameter. Job parameters may be templates that name
160 previously defined job parameters and will be evaluated when a given job
161 is configured. Therefore the order in which setParam is called for the different
162 parameters is relevant. insertAtFront can be set to True to force early
163 evaluation of a given parameter."""
165 if value
is not None:
167 if description
is not None:
168 p.description = description
169 if isSpecial
is not None:
170 p.isSpecial=isSpecial
178 def appendParam(self,name,value=None,description=None,isSpecial=None,insertAtFront=False,endOfLine='\n'):
179 """Append to the value of a job parameter. If the parameter doesn't exist yet,
180 setParam is called to create it. If it does exist, only the value is updated
181 and the description and flag arguments are ignored."""
183 if value
is not None:
185 if isinstance(p.value,str)
and p.value:
186 p.value = p.value + endOfLine + value
188 p.value = p.value + value
190 self.
setParam(name,value,description,isSpecial,insertAtFront)
194 """Get value of a parameter by name."""
195 return self.
params[name].value
199 """Check if parameter requires special treatment."""
200 return self.
params[name].isSpecial
204 """Register a parameter holding the name of an input file to be
205 copied before a job starts."""
211 """A generic routine for checking job parameters. Check that all parameters
212 can be evaluated. May be overrridden by a subclass if desired."""
214 raise JobRunnerError (
'Inconsistent parameter definition')
220 if isinstance(value,str):
225 raise JobRunnerError (
'Unable to evaluate parameter: '+p+
' = '+value+
' (check parameter order)')
229 """Show current job parameters."""
232 if maxLineLength > 0
and len(s) > maxLineLength:
233 s = s[0:maxLineLength-3] +
'...'
237 def dumpParams(self,format='%(name)-20s = %(value)s
',maxLineLength=0):
238 """Dump all parameters into a string in a user-specified format."""
241 s = format % self.
params[p].__dict__
243 s = s[0:maxLineLength-3]+
'...'
249 """ Add a list of input files to be processed."""
254 """Add a list of files to be inserted to the POOL file catalog."""
259 """Configure parameters for a single job and write job configuration files. If inputLIst
260 is given, this is the list of input files used for the job."""
261 if jobnr
in self.
jobs:
262 raise JobRunnerError (
'Job number %s already configured' % jobnr)
265 jobConfig[
'jobnr'] = jobnr
270 jobConfig[
'inputfiles'] = inputList
272 inputfiles = self.
getParam(
'inputfiles')
273 iFirst = self.
getParam(
'filesperjob')*jobnr
274 if iFirst >= len(inputfiles):
275 raise JobRunnerError (
'Jobnr = %i too high for available number of files' % jobnr)
276 iLast = iFirst + self.
getParam(
'filesperjob')
277 if iLast > len(inputfiles):
278 iLast=len(inputfiles)
279 jobConfig[
'inputfiles'] = inputfiles[iFirst:iLast]
285 if isinstance(value,str):
286 jobConfig[p] = value % jobConfig
291 if jobConfig[
'setuprelease']:
293 jobConfig[
'cmdsetup'] = self.
getParam(
'cmdsetup') % jobConfig
295 if not jobConfig[
'cmdsetup']:
296 jobConfig[
'cmdsetup'] = os.getenv(
'CMDSETUP',
'source /afs/cern.ch/atlas/software/dist/AtlasSetup/scripts/asetup.sh %(release)s --noautocdtest') % jobConfig
298 jobConfig[
'cmdsetup'] =
''
302 jobConfig[
'cmdcopyfiles'] = self.
getParam(
'cmdcopyfiles') % jobConfig
303 if not jobConfig[
'cmdcopyfiles']:
306 if not jobConfig[p]:
continue
307 filestobecopied =
' '.
join([filestobecopied,jobConfig[p]])
308 jobConfig[p] = os.path.basename(jobConfig[p])
310 jobConfig[
'cmdcopyfiles'] =
'cp -p '+filestobecopied+
' .'
314 jobConfig[
'cmddefinepoolcatalog'] = self.
getParam(
'cmddefinepoolcatalog') % jobConfig
315 if not jobConfig[
'cmddefinepoolcatalog']:
316 if jobConfig[
'addinputtopoolcatalog']:
317 argstring =
' '.
join(jobConfig[
'filesforpoolcatalog']+jobConfig[
'inputfiles']) % jobConfig
319 argstring =
' '.
join(jobConfig[
'filesforpoolcatalog']) % jobConfig
321 jobConfig[
'cmddefinepoolcatalog'] =
'pool_insertFileToCatalog '+argstring
324 if jobConfig[
'returnstatuscode']:
325 jobConfig[
'cmdexit'] = self.
getParam(
'cmdexit') % jobConfig
326 if not jobConfig[
'cmdexit']:
327 jobConfig[
'cmdexit'] =
'exit $status' % jobConfig
329 jobConfig[
'cmdexit'] =
''
332 jobConfig[
'outputfilestring'] =
','.
join([jobConfig[
'outputfileprefix']+f
for f
in jobConfig[
'outputfilelist']])
335 if jobConfig[
'taskpostprocsteps']:
336 jobConfig[
'doneflag'] = jobConfig[
'postprocflag']
338 jobConfig[
'doneflag'] = jobConfig[
'completedflag']
339 jobConfig[
'script'] = self.
getParam(
'script') % jobConfig
344 for f
in jobConfig[
'outputfilelist']:
345 if os.access(jobConfig[
'outputfileprefix']+f,os.F_OK):
346 raise JobRunnerError (
'Job output file %s exists already' % jobConfig[f])
347 for f
in (
'configfile',
'scriptfile',
'logfile'):
348 if os.access(jobConfig[f],os.F_OK):
349 raise JobRunnerError (
'Job configuration or log file %s exists already' % jobConfig[f])
352 os.makedirs(
'%(jobdir)s' % jobConfig)
355 config =
open(jobConfig[
'configfile'],
'w')
356 config.write(
'# Job configuration data for job %(jobname)s\n' % jobConfig)
357 config.write(
'# Generated by JobRunner.py\n\n')
358 if pprint.isreadable(jobConfig):
359 config.write(
'jobConfig = '+pprint.pformat(jobConfig))
361 config.write(
'jobConfig = '+
repr(jobConfig))
366 script =
open(jobConfig[
'scriptfile'],
'w')
367 script.write(jobConfig[
'script'])
369 os.chmod(jobConfig[
'scriptfile'],0o755)
372 self.
jobs[jobnr] = jobConfig
376 """Configure all jobs."""
378 raise JobRunnerError (
"No input files or illegal parameter 'filesperjob'")
379 lbperjob = self.
getParam(
'lbperjob')
383 raise JobRunnerError (
'Negative number of luminosity blocks per job not allowed')
384 inputfiles = self.
getParam(
'inputfiles')
387 lbpattern = re.compile(
r'lb(\d+)')
389 lbnrs = lbpattern.findall(f)
392 raise JobRunnerError (
'Unable to determine luminosity block number of file %s' % f)
395 raise JobRunnerError (
'Too many luminosity block numbers in filename %s' % f)
399 lbnrmax =
int(lbnrs[1])
403 while (lbnr <= lbnrmax) :
405 jobId =
int((lbnr-1)/lbperjob)
407 if jobId
not in jobInputDict:
408 jobInputDict[jobId] = [f]
409 jobLBDict[jobId] = [lbnr]
411 if f
not in jobInputDict[jobId] :
412 jobInputDict[jobId].
append(f)
413 if lbnr
not in jobLBDict[jobId] :
414 jobLBDict[jobId].
append(lbnr)
420 maxJobs = len(jobInputDict)
421 for i
in sorted(jobInputDict.keys())[:maxJobs]:
426 self.
setParam(
'lbList', jobLBDict[i])
434 print (
'\nConfiguring %i job(s) ...' % (njobs))
435 for i
in range(njobs):
440 """This method is to be overridden by dervied JobRunner classes. This default
441 implementation in the JobRunner base class runs jobs directly, printing the
442 output onto the termminal. If a specific implementation of submitJob directly
443 runs a job, it should return the exit status of the job. If it only submits the
444 job, the status should be returned as None."""
445 scriptfile = jobConfig[
'scriptfile']
446 logfile = jobConfig[
'logfile']
447 exitstatus = jobConfig[
'exitstatus']
451 status = subprocess.call(
'%s 2>&1 | tee %s ; exit `cat %s`'
452 % (scriptfile,logfile,exitstatus), shell=
True) >> 8
457 """Run a single configured job."""
458 if jobnr
not in self.
jobs:
459 raise JobRunnerError (
'Job number %s is not configured' % jobnr)
460 jobConfig = self.
jobs[jobnr]
461 subprocess.call(
'touch '+jobConfig[
'subflag'], shell=
True)
467 """Run all configured job(s)"""
469 raise JobRunnerError (
'No configured jobs')
470 njobs = len(self.
jobs)
473 print (
'\nStarting job %i (using %s) ...\n' % (j,self.__class__))
481 return len(self.
jobs)
485 """Get list of jobs submitted by this JobRunner that are still running"""
488 if os.path.exists(self.
jobs[j][
'subflag'])
or os.path.exists(self.
jobs[j][
'runflag']):
489 runningJobs.append(self.
jobs[j][
'jobname'])
494 """Wait until all jobs have completed"""
498 if not runningJobs:
break
500 print (time.asctime(),
' Waiting for %2s job(s) (%s)' % (len(runningJobs),runningJobs))
505 """Get a list of all output files."""
508 for f
in sorted(glob.glob(self.
jobs[j][
'outputfileprefix']+
'*')):
509 outputFiles.append(f)
513 def log(self,subject,body,**data):
514 """Send a log message to a set of users specified by parameter 'logmail'."""
516 msg =
'Generated by JobRunner on host '+socket.gethostname()+
' by '+pwd.getpwuid(os.getuid())[0]+
' at '+time.asctime()
517 msg +=
'\n\nCurrent working directory = '+os.getcwd()
518 if body: msg +=
'\n\n'+body
519 for k
in data.keys():
520 if isinstance(data[k],dict):
522 items = data[k].
keys()
525 msg +=
"\n %-20s = %s" % (i,data[k][i])
527 msg +=
"\n\n%-20s = %s" % (k,data[k])
528 subprocess.call(
"echo '%s' | mail -s '%s' '%s'" %
529 (msg,subject,self.
getParam(
'logmail')), shell=
True)
535 if __name__ ==
'__main__':
537 print (
'Testing JobRunner:')
539 t.addFiles([
'file1.dat',
'file2.dat',
'file3.dat'])