ATLAS Offline Software
Loading...
Searching...
No Matches
JobRunner.py
Go to the documentation of this file.
1#!/usr/bin/env python
2
3# Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration
4
5
6"""
7JobRunner is a set of classes aimed at simplifying the running of
8(athena) jobs on a set of input files. JobRunner provides each
9job with its set of configuration parameters (using a Python
10dictionary) and runs it over a sub-sample (or all) of the input files.
11Jobs can run interactively, on different batch systems, or on the Grid.
12
13Written by Juerg Beringer (LBNL) in 2008.
14"""
15__author__ = 'Juerg Beringer'
16__version__ = '$Id: JobRunner.py 747883 2016-05-18 06:58:10Z amorley $'
17
18import math
19import os
20import pwd
21import socket
22import time
23import pprint
24import glob
25import re
26import subprocess
27
28
29# Default template of job submission script
30scriptTemplate = """#!/bin/sh
31touch %(runflag)s
32rm %(subflag)s
33export ATLAS_LOCAL_ROOT_BASE=/cvmfs/atlas.cern.ch/repo/ATLASLocalRootBase
34%(cmdjobpreprocessing)s
35%(cmdsetup)s
36mkdir -p %(rundir)s
37cd %(rundir)s
38echo "Preprocessing and copying files (`date`) ..."
39# Redirect TMPDIR to avoid problems with concurrent jobs
40%(maketmpdir)s
41export TMPDIR=`pwd`/tmp
42%(cmdcopyfiles)s
43echo ''
44echo "Creating POOL file catalog (`date`) ..."
45%(cmddefinepoolcatalog)s
46echo ''
47echo "Running athena (`date`) ..."
48export PYTHONPATH="%(jobdir)s:$PYTHONPATH"
49rm -f %(configmy)s
50ln -fs %(configfile)s %(configmy)s
51/usr/bin/time -p %(joboptionpath)s
52status=$?
53touch %(exitflag)s
54echo $status > %(exitstatus)s
55echo "Postprocessing (`date`) ..."
56%(cmdjobpostprocessing)s
57touch %(doneflag)s
58rm %(runflag)s
59%(cmdexit)s
60"""
61
62
63# Exception class
64class JobRunnerError(AttributeError):
65 """JobRunner exception class"""
66
67
68# Utility class to hold job parameter data
70 """JobRunner job parameter data"""
71
72 def __init__(self,name,value='',description='',isSpecial=False):
73 self.name = name
74 self.value = value
75 self.description = description
76 self.isSpecial = isSpecial # Special parameter, won't be propagated in normal way to jobs
77
78 def __str__(self):
79 if self.isSpecial:
80 return "%-20s = %-20s # SPECIAL - %s" % (self.name,self.value,self.description)
81 else:
82 if self.description:
83 return "%-20s = %-20s # %s" % (self.name,self.value,self.description)
84 else:
85 return "%-20s = %-20s" % (self.name,self.value)
86
88 version = os.getenv('AtlasVersion')
89 if not version: # nightly
90 branch = os.getenv('AtlasBuildBranch')
91 stamp = os.getenv('AtlasBuildStamp')
92 version = ','.join([branch, stamp])
93 project = os.getenv('AtlasProject')
94 platform = os.getenv('%s_PLATFORM' % project, os.getenv('CMTCONFIG'))
95 return ','.join([project, version] + platform.split('-'))
96
97# Main JobRunner class
99 """JobRunner base class"""
100
101 def __init__(self,**params):
102 """Constructor (takes any number of parameters as an argument)."""
103 self.params = { } # Dictionary of JobRunnerParameter, indexed by parameter name
104 self.paramOrder = [ ] # Ordered list of parameter names
105 self.paramToBeCopied = [ ] # List of parameters specifying files to be copied before job starts
106 self.jobs = { } # Dictionary of job configuratinos (params), indexed by jobnr
107 self.jobStatus = { } # Dictionary of job exit status, indexed by jobnr
108
109 # Job parameters (parameters flagged as special require specific code in configureJob)
110 self.setParam('jobnr',-1,'Number of job last configured',True)
111 self.setParam('inputfiles',[],'List of all input files',True)
112 self.setParam('filesperjob',1,'Number of input files per job')
113 self.setParam('lbperjob',0,'Number of luminosity blocks per job if doing LB bunching')
114 self.setParam('maxjobs',0,'Maximum number of jobs to run (0=no maximum)')
115 self.setParam('jobname','job%(jobnr)03i','Job name template')
116 self.setParam('jobdir',os.getcwd(),'Directory template from where the jobs are started')
117 self.setParam('rundir','.','Directory template where the jobs execute')
118 self.setParam('configfile','%(jobdir)s/%(jobname)s.config.py','Job configuration file name template (use %(jobnr)i for current job number)')
119 self.setParam('configmy','%(jobdir)s/configMy.py','Job configuration file for CA configuration')
120 self.setParam('logfile','%(jobdir)s/%(jobname)s.log','Log file name template')
121 self.setParam('scriptfile','%(jobdir)s/%(jobname)s.sh','Script file name template')
122 self.setParam('outputfileprefix','%(jobdir)s/%(jobname)s-','Output file name prefix template')
123 self.setParam('outputfilelist',[],'List of desired output files (w/o outputfileprefix)')
124 self.setParam('outputfilestring','','Generated comma-separated list of desired output files with full names',True)
125 self.setParam('filesforpoolcatalog',[],'Files to insert into POOL file catalog')
126 self.setParam('addinputtopoolcatalog',True,"Add job's input files to POOL file catalog?")
127 self.setParam('subflag','%(jobdir)s/%(jobname)s.status.SUBMITTED','Status flag template for submitted jobs')
128 self.setParam('runflag','%(jobdir)s/%(jobname)s.status.RUNNING','Status flag template for running jobs')
129 self.setParam('exitflag','"%(jobdir)s/%(jobname)s.exit.$status"','Status flag template to record athena exit status')
130 self.setParam('exitstatus','"%(jobdir)s/%(jobname)s.exitstatus.dat"','File with exit status value')
131 self.setParam('postprocflag','%(jobdir)s/%(jobname)s.status.POSTPROCESSING','Status flag template for jobs requiring postprocessing')
132 self.setParam('completedflag','%(jobdir)s/%(jobname)s.status.COMPLETED','Status flag template for completed jobs')
133 self.setParam('doneflag','','Actual status flag used at end of job',True)
134 self.setParam('logmail','','E-mail address for sending status e-mails')
135 self.setParam('release',GetRelease(),'Software release (may include flags such as opt,runtime)')
136 self.setParam('setuprelease',True,'Setup ATLAS software release?')
137 self.setParam('returnstatuscode',False,'Return athena status code from script?')
138 self.setParam('joboptionpath',os.getcwd()+'/joboptions.py','Master joboption file (w/full path if not in release)')
139 self.setParam('taskpostprocsteps','','Task-level postprocessing steps (defined at task submission)')
140 self.setParam('jobpostprocsteps','','Job-level postprocessing steps (updated by template)')
141 self.setParam('cmdsetup','','Statement for setting up release',True)
142 self.setParam('cmddefinepoolcatalog','','Statement for defining POOL file catalog',True)
143 self.setParam('cmdcopyfiles','','Statement to copy files to local directory',True)
144 self.setParam('cmdjobpreprocessing','','Statement(s) to execute for preprocessing',False)
145 self.setParam('cmdjobpostprocessing','','Statement(s) to execute for postprocessing',False)
146 self.setParam('cmdexit','','Statement used to exit shell script (allows passing up athena exit code)',True)
147 self.setParam('maketmpdir','mkdir -p tmp','Statement to create tmp directory')
148 self.setParam('script',scriptTemplate,'Template of job submission script',True)
149
150 # Set additional parameters given to constructor
151 for k in params.keys():
152 self.setParam(k,params[k])
153
154 self.checkParams()
155
156
157 def setParam(self,name,value=None,description=None,isSpecial=None,insertAtFront=False):
158 """Set the value of a job parameter. Job parameters may be templates that name
159 previously defined job parameters and will be evaluated when a given job
160 is configured. Therefore the order in which setParam is called for the different
161 parameters is relevant. insertAtFront can be set to True to force early
162 evaluation of a given parameter."""
163 p = self.params[name] = self.params.get(name,JobRunnerParameter(name))
164 if value is not None:
165 p.value = value
166 if description is not None:
167 p.description = description
168 if isSpecial is not None:
169 p.isSpecial=isSpecial
170 if name not in self.paramOrder:
171 if insertAtFront:
172 self.paramOrder.insert(0,name)
173 else:
174 self.paramOrder.append(name)
175
176
177 def appendParam(self,name,value=None,description=None,isSpecial=None,insertAtFront=False,endOfLine='\n'):
178 """Append to the value of a job parameter. If the parameter doesn't exist yet,
179 setParam is called to create it. If it does exist, only the value is updated
180 and the description and flag arguments are ignored."""
181 if name in self.params:
182 if value is not None:
183 p = self.params[name]
184 if isinstance(p.value,str) and p.value:
185 p.value = p.value + endOfLine + value
186 else:
187 p.value = p.value + value
188 else:
189 self.setParam(name,value,description,isSpecial,insertAtFront)
190
191
192 def getParam(self,name):
193 """Get value of a parameter by name."""
194 return self.params[name].value
195
196
197 def isSpecialParam(self,name):
198 """Check if parameter requires special treatment."""
199 return self.params[name].isSpecial
200
201
202 def registerToBeCopied(self,paramName):
203 """Register a parameter holding the name of an input file to be
204 copied before a job starts."""
205 if paramName not in self.paramToBeCopied:
206 self.paramToBeCopied.append(paramName)
207
208
209 def checkParams(self):
210 """A generic routine for checking job parameters. Check that all parameters
211 can be evaluated. May be overrridden by a subclass if desired."""
212 if len(self.paramOrder)!=len(self.params):
213 raise JobRunnerError ('Inconsistent parameter definition')
214 try:
215 tmp = { }
216 for p in self.paramOrder:
217 value = '' # this is necessary to prevent an exception from picking up the previous value
218 value = self.getParam(p)
219 if isinstance(value,str):
220 tmp[p] = value % tmp
221 else:
222 tmp[p] = value
223 except Exception:
224 raise JobRunnerError ('Unable to evaluate parameter: '+p+' = '+value+' (check parameter order)')
225
226
227 def showParams(self,maxLineLength=80):
228 """Show current job parameters."""
229 for p in self.paramOrder:
230 s = str(self.params[p])
231 if maxLineLength > 0 and len(s) > maxLineLength:
232 s = s[0:maxLineLength-3] + '...'
233 print (s)
234
235
236 def dumpParams(self,format='%(name)-20s = %(value)s',maxLineLength=0):
237 """Dump all parameters into a string in a user-specified format."""
238 dump = ''
239 for p in self.paramOrder:
240 s = format % self.params[p].__dict__
241 if maxLineLength:
242 s = s[0:maxLineLength-3]+'...'
243 dump += s+'\n'
244 return dump
245
246
247 def addFiles(self,fileList):
248 """ Add a list of input files to be processed."""
249 self.getParam('inputfiles').extend(fileList)
250
251
252 def addFilesToPoolFileCatalog(self,fileList):
253 """Add a list of files to be inserted to the POOL file catalog."""
254 self.getParam('filesforpoolcatalog').extend(fileList)
255
256
257 def configureJob(self,jobnr,inputList=[]):
258 """Configure parameters for a single job and write job configuration files. If inputLIst
259 is given, this is the list of input files used for the job."""
260 if jobnr in self.jobs:
261 raise JobRunnerError ('Job number %s already configured' % jobnr)
262 jobConfig = { }
263
264 jobConfig['jobnr'] = jobnr
265 self.setParam('jobnr',jobnr)
266
267 # Check if any input files are available for this job
268 if inputList:
269 jobConfig['inputfiles'] = inputList
270 else:
271 inputfiles = self.getParam('inputfiles')
272 iFirst = self.getParam('filesperjob')*jobnr
273 if iFirst >= len(inputfiles):
274 raise JobRunnerError ('Jobnr = %i too high for available number of files' % jobnr)
275 iLast = iFirst + self.getParam('filesperjob')
276 if iLast > len(inputfiles):
277 iLast=len(inputfiles)
278 jobConfig['inputfiles'] = inputfiles[iFirst:iLast]
279
280 # Update regular parameters
281 for p in self.paramOrder:
282 if self.isSpecialParam(p): continue
283 value = self.getParam(p)
284 if isinstance(value,str):
285 jobConfig[p] = value % jobConfig
286 else:
287 jobConfig[p] = value
288
289 # Define statement to set up the release
290 if jobConfig['setuprelease']:
291 # Use explicit command if given
292 jobConfig['cmdsetup'] = self.getParam('cmdsetup') % jobConfig
293 # otherwise take from CMDSETUP env var if set or assume normal asetup
294 if not jobConfig['cmdsetup']:
295 jobConfig['cmdsetup'] = os.getenv('CMDSETUP','source /afs/cern.ch/atlas/software/dist/AtlasSetup/scripts/asetup.sh %(release)s --noautocdtest') % jobConfig
296 else:
297 jobConfig['cmdsetup'] = ''
298
299 # Collect files to be copied locally, and replace corresponding jobConfig
300 # parameters by base file name (to point to the local copy)
301 jobConfig['cmdcopyfiles'] = self.getParam('cmdcopyfiles') % jobConfig
302 if not jobConfig['cmdcopyfiles']:
303 filestobecopied = ''
304 for p in self.paramToBeCopied:
305 if not jobConfig[p]: continue
306 filestobecopied = ' '.join([filestobecopied,jobConfig[p]])
307 jobConfig[p] = os.path.basename(jobConfig[p])
308 if filestobecopied:
309 jobConfig['cmdcopyfiles'] = 'cp -p '+filestobecopied+' .'
310
311 # Unless the user sets a specific value for parameter cmddefinepoolcatalog, determine
312 # what should be added and create appropriate catalog
313 jobConfig['cmddefinepoolcatalog'] = self.getParam('cmddefinepoolcatalog') % jobConfig
314 if not jobConfig['cmddefinepoolcatalog']:
315 if jobConfig['addinputtopoolcatalog']:
316 argstring = ' '.join(jobConfig['filesforpoolcatalog']+jobConfig['inputfiles']) % jobConfig
317 else:
318 argstring = ' '.join(jobConfig['filesforpoolcatalog']) % jobConfig
319 if argstring:
320 jobConfig['cmddefinepoolcatalog'] = 'pool_insertFileToCatalog '+argstring
321
322 # Define statement used to exit the shell script
323 if jobConfig['returnstatuscode']:
324 jobConfig['cmdexit'] = self.getParam('cmdexit') % jobConfig
325 if not jobConfig['cmdexit']:
326 jobConfig['cmdexit'] = 'exit $status' % jobConfig
327 else:
328 jobConfig['cmdexit'] = ''
329
330 # Generate string with comma-separated list of output files with full name
331 jobConfig['outputfilestring'] = ','.join([jobConfig['outputfileprefix']+f for f in jobConfig['outputfilelist']])
332
333 # Update remaining special parameters
334 if jobConfig['taskpostprocsteps']:
335 jobConfig['doneflag'] = jobConfig['postprocflag']
336 else:
337 jobConfig['doneflag'] = jobConfig['completedflag']
338 jobConfig['script'] = self.getParam('script') % jobConfig
339
340 # Check if job's output files exist already in order to prevent overwriting of data
341 # NOTE: cannot just check for existence of jobdir, since this might be set to the
342 # current directory
343 for f in jobConfig['outputfilelist']:
344 if os.access(jobConfig['outputfileprefix']+f,os.F_OK):
345 raise JobRunnerError ('Job output file %s exists already' % jobConfig[f])
346 for f in ('configfile', 'scriptfile', 'logfile'):
347 if os.access(jobConfig[f],os.F_OK):
348 raise JobRunnerError ('Job configuration or log file %s exists already' % jobConfig[f])
349
350 # Make sure start directory where script and config files will be written to exists
351 os.makedirs('%(jobdir)s' % jobConfig)
352
353 # Write job configuration file
354 config = open(jobConfig['configfile'],'w')
355 config.write('# Job configuration data for job %(jobname)s\n' % jobConfig)
356 config.write('# Generated by JobRunner.py\n\n')
357 if pprint.isreadable(jobConfig):
358 config.write('jobConfig = '+pprint.pformat(jobConfig))
359 else:
360 config.write('jobConfig = '+repr(jobConfig))
361 config.write('\n')
362 config.close()
363
364 # Write script file
365 script = open(jobConfig['scriptfile'],'w')
366 script.write(jobConfig['script'])
367 script.close()
368 os.chmod(jobConfig['scriptfile'],0o755)
369
370 # Record job configuration
371 self.jobs[jobnr] = jobConfig
372
373
374 def configure(self):
375 """Configure all jobs."""
376 if self.getParam('filesperjob')==0 or not self.getParam('inputfiles'):
377 raise JobRunnerError ("No input files or illegal parameter 'filesperjob'")
378 lbperjob = self.getParam('lbperjob')
379 if lbperjob:
380 # Bunched job submission with all files from LB range in single job
381 if lbperjob<=0:
382 raise JobRunnerError ('Negative number of luminosity blocks per job not allowed')
383 inputfiles = self.getParam('inputfiles')
384 jobInputDict = {}
385 jobLBDict = {}
386 lbpattern = re.compile(r'lb(\d+)')
387 for f in inputfiles:
388 lbnrs = lbpattern.findall(f)
389
390 if(len(lbnrs) < 1) :
391 raise JobRunnerError ('Unable to determine luminosity block number of file %s' % f)
392
393 if(len(lbnrs) > 2) :
394 raise JobRunnerError ('Too many luminosity block numbers in filename %s' % f)
395
396 lbnr = int(lbnrs[0])
397 if(len(lbnrs) > 1) :
398 lbnrmax = int(lbnrs[1])
399 else :
400 lbnrmax = lbnr
401
402 while (lbnr <= lbnrmax) :
403
404 jobId = int((lbnr-1)/lbperjob)
405 #print ('LB = %4i jobid = %i' % (lbnr,jobId))
406 if jobId not in jobInputDict:
407 jobInputDict[jobId] = [f]
408 jobLBDict[jobId] = [lbnr]
409 else:
410 if f not in jobInputDict[jobId] :
411 jobInputDict[jobId].append(f)
412 if lbnr not in jobLBDict[jobId] :
413 jobLBDict[jobId].append(lbnr)
414
415 lbnr = lbnr+1
416
417 maxJobs = self.getParam('maxjobs')
418 if not maxJobs:
419 maxJobs = len(jobInputDict)
420 for i in sorted(jobInputDict.keys())[:maxJobs]:
421 if i<0:
422 jobnr = 0
423 else:
424 jobnr = i*lbperjob+1 # use first LB number as job number
425 self.setParam('lbList', jobLBDict[i])
426 self.configureJob(jobnr,jobInputDict[i])
427
428 else:
429 # Normal (unbunched job submission)
430 njobs = int(math.ceil(float(len(self.getParam('inputfiles')))/self.getParam('filesperjob')))
431 if self.getParam('maxjobs'):
432 njobs = min(self.getParam('maxjobs'),njobs)
433 print ('\nConfiguring %i job(s) ...' % (njobs))
434 for i in range(njobs):
435 self.configureJob(i)
436
437
438 def submitJob(self,jobConfig):
439 """This method is to be overridden by dervied JobRunner classes. This default
440 implementation in the JobRunner base class runs jobs directly, printing the
441 output onto the termminal. If a specific implementation of submitJob directly
442 runs a job, it should return the exit status of the job. If it only submits the
443 job, the status should be returned as None."""
444 scriptfile = jobConfig['scriptfile']
445 logfile = jobConfig['logfile']
446 exitstatus = jobConfig['exitstatus']
447 # Write only to standard output, don't produce log file
448 #status = subprocess.call(scriptfile, shell=True) >> 8 # Convert to standard Unix exit code
449 # Write to both stdout and log file, preserve exit status code
450 status = subprocess.call('%s 2>&1 | tee %s ; exit `cat %s`'
451 % (scriptfile,logfile,exitstatus), shell=True) >> 8 # Convert to standard Unix exit code
452 return status
453
454
455 def runJob(self,jobnr):
456 """Run a single configured job."""
457 if jobnr not in self.jobs:
458 raise JobRunnerError ('Job number %s is not configured' % jobnr)
459 jobConfig = self.jobs[jobnr]
460 subprocess.call('touch '+jobConfig['subflag'], shell=True)
461 status = self.submitJob(jobConfig)
462 self.jobStatus[jobnr] = status
463
464
465 def run(self,jobnr=None):
466 """Run all configured job(s)"""
467 if not self.jobs:
468 raise JobRunnerError ('No configured jobs')
469 njobs = len(self.jobs)
470 self.log('JobRunner: starting '+str(njobs)+' jobs',self.dumpParams())
471 for j in sorted(self.jobs.keys()):
472 print ('\nStarting job %i (using %s) ...\n' % (j,self.__class__))
473 self.runJob(j)
474
475
476 def getNJobs(self):
477 if not self.jobs:
478 return 0
479 else:
480 return len(self.jobs)
481
482
483 def getRunningJobs(self):
484 """Get list of jobs submitted by this JobRunner that are still running"""
485 runningJobs = [ ]
486 for j in self.jobs:
487 if os.path.exists(self.jobs[j]['subflag']) or os.path.exists(self.jobs[j]['runflag']):
488 runningJobs.append(self.jobs[j]['jobname'])
489 return runningJobs
490
491
492 def wait(self):
493 """Wait until all jobs have completed"""
494 while 1:
495 time.sleep(30)
496 runningJobs = self.getRunningJobs()
497 if not runningJobs: break
498 print()
499 print (time.asctime(),' Waiting for %2s job(s) (%s)' % (len(runningJobs),runningJobs))
500 self.log('JobRunner: finished',self.dumpParams(),outputFiles=self.getOutputFiles())
501
502
503 def getOutputFiles(self):
504 """Get a list of all output files."""
505 outputFiles = [ ]
506 for j in self.jobs:
507 for f in sorted(glob.glob(self.jobs[j]['outputfileprefix']+'*')):
508 outputFiles.append(f)
509 return outputFiles
510
511
512 def log(self,subject,body,**data):
513 """Send a log message to a set of users specified by parameter 'logmail'."""
514 if self.getParam('logmail'):
515 msg = 'Generated by JobRunner on host '+socket.gethostname()+' by '+pwd.getpwuid(os.getuid())[0]+' at '+time.asctime()
516 msg += '\n\nCurrent working directory = '+os.getcwd()
517 if body: msg += '\n\n'+body
518 for k in data.keys():
519 if isinstance(data[k],dict):
520 msg += "\n\n%s:" % k
521 items = data[k].keys()
522 items.sort()
523 for i in items:
524 msg += "\n %-20s = %s" % (i,data[k][i])
525 else:
526 msg += "\n\n%-20s = %s" % (k,data[k])
527 subprocess.call("echo '%s' | mail -s '%s' '%s'" %
528 (msg,subject,self.getParam('logmail')), shell=True)
529
530
531#
532# Test code
533#
534if __name__ == '__main__':
535 print (__doc__)
536 print ('Testing JobRunner:')
538 t.addFiles(['file1.dat','file2.dat','file3.dat'])
539 t.showParams()
540 #t.configure()
541 #t.run()
if(febId1==febId2)
void print(char *figname, TCanvas *c1)
#define min(a, b)
Definition cfImp.cxx:40
__init__(self, name, value='', description='', isSpecial=False)
Definition JobRunner.py:72
addFiles(self, fileList)
Definition JobRunner.py:247
setParam(self, name, value=None, description=None, isSpecial=None, insertAtFront=False)
Definition JobRunner.py:157
__init__(self, **params)
Definition JobRunner.py:101
showParams(self, maxLineLength=80)
Definition JobRunner.py:227
addFilesToPoolFileCatalog(self, fileList)
Definition JobRunner.py:252
log(self, subject, body, **data)
Definition JobRunner.py:512
appendParam(self, name, value=None, description=None, isSpecial=None, insertAtFront=False, endOfLine='\n')
Definition JobRunner.py:177
submitJob(self, jobConfig)
Definition JobRunner.py:438
configureJob(self, jobnr, inputList=[])
Definition JobRunner.py:257
dumpParams(self, format='%(name) -20s=%(value) s', maxLineLength=0)
Definition JobRunner.py:236
registerToBeCopied(self, paramName)
Definition JobRunner.py:202
T * get(TKey *tobj)
get a TObject* from a TKey* (why can't a TObject be a TKey?)
Definition hcg.cxx:130
Definition run.py:1