|  | 
| def | __init__ (self, **params) | 
|  | 
| def | submitJob (self, jobConfig) | 
|  | 
| def | setParam (self, name, value=None, description=None, isSpecial=None, insertAtFront=False) | 
|  | 
| def | appendParam (self, name, value=None, description=None, isSpecial=None, insertAtFront=False, endOfLine='\n') | 
|  | 
| def | getParam (self, name) | 
|  | 
| def | isSpecialParam (self, name) | 
|  | 
| def | registerToBeCopied (self, paramName) | 
|  | 
| def | checkParams (self) | 
|  | 
| def | showParams (self, maxLineLength=80) | 
|  | 
| def | dumpParams (self, format='%(name) -20s=%(value) s', maxLineLength=0) | 
|  | 
| def | addFiles (self, fileList) | 
|  | 
| def | addFilesToPoolFileCatalog (self, fileList) | 
|  | 
| def | configureJob (self, jobnr, inputList=[]) | 
|  | 
| def | configure (self) | 
|  | 
| def | runJob (self, jobnr) | 
|  | 
| def | run (self, jobnr=None) | 
|  | 
| def | getNJobs (self) | 
|  | 
| def | getRunningJobs (self) | 
|  | 
| def | wait (self) | 
|  | 
| def | getOutputFiles (self) | 
|  | 
| def | log (self, subject, body, **data) | 
|  | 
PDSFJobRunner - run jobs using the SGE batch system on PDSF
 
Definition at line 35 of file PDSFJobRunner.py.
◆ __init__()
      
        
          | def python.PDSFJobRunner.PDSFJobRunner.__init__ | ( |  | self, | 
        
          |  |  | ** | params | 
        
          |  | ) |  |  | 
      
 
Constructor (takes any number of parameters as an argument).
 
Reimplemented from python.JobRunner.JobRunner.
Definition at line 38 of file PDSFJobRunner.py.
   39         """Constructor (takes any number of parameters as an argument).""" 
   40         JobRunner.__init__(self)
 
   43         self.setParam(
'batchqueue',
'all.64bit.q',
'Batch queue')
 
   44         script = self.getParam(
'script')
 
   45         self.setParam(
'script',preScript+script)
 
   46         for k 
in params.keys():
 
   47             self.setParam(k,params[k])
 
   49         self.setParam(
'rundir',
'%(jobdir)s')
 
   50         self.setParam(
'maketmpdir',
'')
 
 
 
 
◆ addFiles()
  
  | 
        
          | def python.JobRunner.JobRunner.addFiles | ( |  | self, |  
          |  |  |  | fileList |  
          |  | ) |  |  |  | inherited | 
 
Add a list of input files to be processed.
 
Definition at line 247 of file JobRunner.py.
  247     def addFiles(self,fileList):
 
  248         """ Add a list of input files to be processed.""" 
  249         self.getParam(
'inputfiles').
extend(fileList)
 
 
 
◆ addFilesToPoolFileCatalog()
  
  | 
        
          | def python.JobRunner.JobRunner.addFilesToPoolFileCatalog | ( |  | self, |  
          |  |  |  | fileList |  
          |  | ) |  |  |  | inherited | 
 
Add a list of files to be inserted to the POOL file catalog.
 
Definition at line 252 of file JobRunner.py.
  252     def addFilesToPoolFileCatalog(self,fileList):
 
  253         """Add a list of files to be inserted to the POOL file catalog.""" 
  254         self.getParam(
'filesforpoolcatalog').
extend(fileList)
 
 
 
◆ appendParam()
  
  | 
        
          | def python.JobRunner.JobRunner.appendParam | ( |  | self, |  
          |  |  |  | name, |  
          |  |  |  | value = None, |  
          |  |  |  | description = None, |  
          |  |  |  | isSpecial = None, |  
          |  |  |  | insertAtFront = False, |  
          |  |  |  | endOfLine = '\n' |  
          |  | ) |  |  |  | inherited | 
 
Append to the value of a job parameter. If the parameter doesn't exist yet,
   setParam is called to create it. If it does exist, only the value is updated
   and the description and flag arguments are ignored.
 
Definition at line 177 of file JobRunner.py.
  177     def appendParam(self,name,value=None,description=None,isSpecial=None,insertAtFront=False,endOfLine='\n'):
 
  178         """Append to the value of a job parameter. If the parameter doesn't exist yet, 
  179            setParam is called to create it. If it does exist, only the value is updated 
  180            and the description and flag arguments are ignored.""" 
  181         if name 
in self.params:
 
  182             if value 
is not None:
 
  183                 p = self.params[name]
 
  184                 if isinstance(p.value,str) 
and p.value:
 
  185                     p.value = p.value + endOfLine + value
 
  187                     p.value = p.value + value
 
  189             self.setParam(name,value,description,isSpecial,insertAtFront)
 
 
 
◆ checkParams()
  
  | 
        
          | def python.JobRunner.JobRunner.checkParams | ( |  | self | ) |  |  | inherited | 
 
A generic routine for checking job parameters. Check that all parameters
   can be evaluated. May be overrridden by a subclass if desired.
 
Definition at line 209 of file JobRunner.py.
  209     def checkParams(self):
 
  210         """A generic routine for checking job parameters. Check that all parameters 
  211            can be evaluated. May be overrridden by a subclass if desired.""" 
  212         if len(self.paramOrder)!=len(self.params):
 
  213             raise JobRunnerError (
'Inconsistent parameter definition')
 
  216             for p 
in self.paramOrder:
 
  218                 value = self.getParam(p)
 
  219                 if isinstance(value,str):
 
  224             raise JobRunnerError (
'Unable to evaluate parameter: '+p+
' = '+value+
' (check parameter order)')
 
 
 
◆ configure()
  
  | 
        
          | def python.JobRunner.JobRunner.configure | ( |  | self | ) |  |  | inherited | 
 
Configure all jobs.
 
Definition at line 374 of file JobRunner.py.
  375         """Configure all jobs.""" 
  376         if self.getParam(
'filesperjob')==0 
or not self.getParam(
'inputfiles'):
 
  377             raise JobRunnerError (
"No input files or illegal parameter 'filesperjob'")
 
  378         lbperjob = self.getParam(
'lbperjob')
 
  382                 raise JobRunnerError (
'Negative number of luminosity blocks per job not allowed')
 
  383             inputfiles = self.getParam(
'inputfiles')
 
  386             lbpattern = re.compile(
r'lb(\d+)')
 
  388                 lbnrs = lbpattern.findall(f)
 
  391                     raise JobRunnerError (
'Unable to determine luminosity block number of file %s' % f)
 
  394                     raise JobRunnerError (
'Too many luminosity block numbers in filename %s' % f)
 
  398                     lbnrmax = 
int(lbnrs[1])
 
  402                 while (lbnr <=  lbnrmax) :                    
 
  404                     jobId = 
int((lbnr-1)/lbperjob)
 
  406                     if jobId 
not in jobInputDict:
 
  407                         jobInputDict[jobId] = [f]
 
  408                         jobLBDict[jobId] = [lbnr]
 
  410                         if f 
not in jobInputDict[jobId] :
 
  411                             jobInputDict[jobId].
append(f)
 
  412                         if lbnr 
not in jobLBDict[jobId] :
 
  413                             jobLBDict[jobId].
append(lbnr)
 
  417             maxJobs = self.getParam(
'maxjobs')
 
  419                 maxJobs = len(jobInputDict)
 
  420             for i 
in  sorted(jobInputDict.keys())[:maxJobs]:
 
  425                 self.setParam(
'lbList', jobLBDict[i])
 
  426                 self.configureJob(jobnr,jobInputDict[i])
 
  430             njobs = 
int(math.ceil(
float(len(self.getParam(
'inputfiles')))/self.getParam(
'filesperjob')))
 
  431             if self.getParam(
'maxjobs'):
 
  432                 njobs = 
min(self.getParam(
'maxjobs'),njobs)
 
  433             print (
'\nConfiguring %i job(s) ...' % (njobs))
 
  434             for i 
in range(njobs):
 
 
 
 
◆ configureJob()
  
  | 
        
          | def python.JobRunner.JobRunner.configureJob | ( |  | self, |  
          |  |  |  | jobnr, |  
          |  |  |  | inputList = [] |  
          |  | ) |  |  |  | inherited | 
 
Configure parameters for a single job and write job configuration files. If inputLIst
   is given, this is the list of input files used for the job.
 
Definition at line 257 of file JobRunner.py.
  257     def configureJob(self,jobnr,inputList=[]):
 
  258         """Configure parameters for a single job and write job configuration files. If inputLIst 
  259            is given, this is the list of input files used for the job.""" 
  260         if jobnr 
in self.jobs:
 
  261             raise JobRunnerError (
'Job number %s already configured' % jobnr)
 
  264         jobConfig[
'jobnr'] = jobnr
 
  265         self.setParam(
'jobnr',jobnr)
 
  269             jobConfig[
'inputfiles'] = inputList
 
  271             inputfiles = self.getParam(
'inputfiles')
 
  272             iFirst = self.getParam(
'filesperjob')*jobnr
 
  273             if iFirst >= len(inputfiles):
 
  274                 raise JobRunnerError (
'Jobnr = %i too high for available number of files' % jobnr)
 
  275             iLast = iFirst + self.getParam(
'filesperjob')
 
  276             if iLast > len(inputfiles):
 
  277                 iLast=len(inputfiles)
 
  278             jobConfig[
'inputfiles'] = inputfiles[iFirst:iLast]
 
  281         for p 
in self.paramOrder:
 
  282             if self.isSpecialParam(p): 
continue 
  283             value = self.getParam(p)
 
  284             if isinstance(value,str):
 
  285                 jobConfig[p] = value % jobConfig
 
  290         if jobConfig[
'setuprelease']:
 
  292             jobConfig[
'cmdsetup'] = self.getParam(
'cmdsetup') % jobConfig
 
  294             if not jobConfig[
'cmdsetup']:
 
  295                 jobConfig[
'cmdsetup'] = os.getenv(
'CMDSETUP',
'source /afs/cern.ch/atlas/software/dist/AtlasSetup/scripts/asetup.sh %(release)s --noautocdtest') % jobConfig
 
  297             jobConfig[
'cmdsetup'] = 
'' 
  301         jobConfig[
'cmdcopyfiles'] = self.getParam(
'cmdcopyfiles') % jobConfig
 
  302         if not jobConfig[
'cmdcopyfiles']:
 
  304             for p 
in self.paramToBeCopied:
 
  305                 if not jobConfig[p]: 
continue 
  306                 filestobecopied = 
' '.
join([filestobecopied,jobConfig[p]])
 
  307                 jobConfig[p] = os.path.basename(jobConfig[p])
 
  309                 jobConfig[
'cmdcopyfiles'] = 
'cp -p '+filestobecopied+
' .' 
  313         jobConfig[
'cmddefinepoolcatalog'] = self.getParam(
'cmddefinepoolcatalog') % jobConfig
 
  314         if not jobConfig[
'cmddefinepoolcatalog']:
 
  315             if jobConfig[
'addinputtopoolcatalog']:
 
  316                 argstring = 
' '.
join(jobConfig[
'filesforpoolcatalog']+jobConfig[
'inputfiles']) % jobConfig
 
  318                 argstring = 
' '.
join(jobConfig[
'filesforpoolcatalog']) % jobConfig
 
  320                 jobConfig[
'cmddefinepoolcatalog'] = 
'pool_insertFileToCatalog '+argstring
 
  323         if jobConfig[
'returnstatuscode']:
 
  324             jobConfig[
'cmdexit'] = self.getParam(
'cmdexit') % jobConfig
 
  325             if not jobConfig[
'cmdexit']:
 
  326                 jobConfig[
'cmdexit'] = 
'exit $status' % jobConfig
 
  328             jobConfig[
'cmdexit'] = 
'' 
  331         jobConfig[
'outputfilestring'] = 
','.
join([jobConfig[
'outputfileprefix']+f 
for f 
in jobConfig[
'outputfilelist']])
 
  334         if jobConfig[
'taskpostprocsteps']:
 
  335             jobConfig[
'doneflag'] = jobConfig[
'postprocflag']
 
  337             jobConfig[
'doneflag'] = jobConfig[
'completedflag']
 
  338         jobConfig[
'script'] = self.getParam(
'script') % jobConfig
 
  343         for f 
in jobConfig[
'outputfilelist']:
 
  344             if os.access(jobConfig[
'outputfileprefix']+f,os.F_OK):
 
  345                 raise JobRunnerError (
'Job output file %s exists already' % jobConfig[f])
 
  346         for f 
in (
'configfile', 
'scriptfile', 
'logfile'):
 
  347             if os.access(jobConfig[f],os.F_OK):
 
  348                 raise JobRunnerError (
'Job configuration or log file %s exists already' % jobConfig[f])
 
  351         os.makedirs(
'%(jobdir)s' % jobConfig) 
 
  354         config = 
open(jobConfig[
'configfile'],
'w')
 
  355         config.write(
'# Job configuration data for job %(jobname)s\n' % jobConfig)
 
  356         config.write(
'# Generated by JobRunner.py\n\n')
 
  357         if pprint.isreadable(jobConfig):
 
  358             config.write(
'jobConfig = '+pprint.pformat(jobConfig))
 
  360             config.write(
'jobConfig = '+
repr(jobConfig))
 
  365         script = 
open(jobConfig[
'scriptfile'],
'w')
 
  366         script.write(jobConfig[
'script'])
 
  368         os.chmod(jobConfig[
'scriptfile'],0o755)
 
  371         self.jobs[jobnr] = jobConfig
 
 
 
◆ dumpParams()
  
  | 
        
          | def python.JobRunner.JobRunner.dumpParams | ( |  | self, |  
          |  |  |  | format = '%(name)-20s = %(value)s', |  
          |  |  |  | maxLineLength = 0 |  
          |  | ) |  |  |  | inherited | 
 
Dump all parameters into a string in a user-specified format.
 
Definition at line 236 of file JobRunner.py.
  236     def dumpParams(self,format='%(name)-20s = %(value)s
',maxLineLength=0):        """Dump all parameters into a string in a user-specified format.""" 
  238         for p 
in self.paramOrder:
 
  239             s = format % self.params[p].__dict__
 
  241                 s = s[0:maxLineLength-3]+
'...' 
 
 
◆ getNJobs()
  
  | 
        
          | def python.JobRunner.JobRunner.getNJobs | ( |  | self | ) |  |  | inherited | 
 
 
◆ getOutputFiles()
  
  | 
        
          | def python.JobRunner.JobRunner.getOutputFiles | ( |  | self | ) |  |  | inherited | 
 
Get a list of all output files.
 
Definition at line 503 of file JobRunner.py.
  503     def getOutputFiles(self):
 
  504         """Get a list of all output files.""" 
  507             for f 
in sorted(glob.glob(self.jobs[j][
'outputfileprefix']+
'*')):
 
  508                 outputFiles.append(f)
 
 
 
◆ getParam()
  
  | 
        
          | def python.JobRunner.JobRunner.getParam | ( |  | self, |  
          |  |  |  | name |  
          |  | ) |  |  |  | inherited | 
 
Get value of a parameter by name.
 
Definition at line 192 of file JobRunner.py.
  192     def getParam(self,name):
 
  193         """Get value of a parameter by name.""" 
  194         return self.params[name].value
 
 
 
◆ getRunningJobs()
  
  | 
        
          | def python.JobRunner.JobRunner.getRunningJobs | ( |  | self | ) |  |  | inherited | 
 
Get list of jobs submitted by this JobRunner that are still running
 
Definition at line 483 of file JobRunner.py.
  483     def getRunningJobs(self):
 
  484         """Get list of jobs submitted by this JobRunner that are still running""" 
  487             if os.path.exists(self.jobs[j][
'subflag']) 
or os.path.exists(self.jobs[j][
'runflag']):
 
  488                 runningJobs.append(self.jobs[j][
'jobname'])
 
 
 
◆ isSpecialParam()
  
  | 
        
          | def python.JobRunner.JobRunner.isSpecialParam | ( |  | self, |  
          |  |  |  | name |  
          |  | ) |  |  |  | inherited | 
 
Check if parameter requires special treatment.
 
Definition at line 197 of file JobRunner.py.
  197     def isSpecialParam(self,name):
 
  198         """Check if parameter requires special treatment.""" 
  199         return self.params[name].isSpecial
 
 
 
◆ log()
  
  | 
        
          | def python.JobRunner.JobRunner.log | ( |  | self, |  
          |  |  |  | subject, |  
          |  |  |  | body, |  
          |  |  | ** | data |  
          |  | ) |  |  |  | inherited | 
 
Send a log message to a set of users specified by parameter 'logmail'.
 
Definition at line 512 of file JobRunner.py.
  512     def log(self,subject,body,**data):
 
  513         """Send a log message to a set of users specified by parameter 'logmail'.""" 
  514         if self.getParam(
'logmail'):
 
  515             msg = 
'Generated by JobRunner on host '+socket.gethostname()+
' by '+pwd.getpwuid(os.getuid())[0]+
' at '+time.asctime()
 
  516             msg += 
'\n\nCurrent working directory = '+os.getcwd()
 
  517             if body: msg += 
'\n\n'+body
 
  518             for k 
in data.keys():
 
  519                 if isinstance(data[k],dict):
 
  521                     items = data[k].
keys()
 
  524                         msg += 
"\n    %-20s = %s" % (i,data[k][i])
 
  526                     msg += 
"\n\n%-20s = %s" % (k,data[k])
 
  527             subprocess.call(
"echo '%s' | mail -s '%s' '%s'" %
 
  528                 (msg,subject,self.getParam(
'logmail')), shell=
True)
 
 
 
◆ registerToBeCopied()
  
  | 
        
          | def python.JobRunner.JobRunner.registerToBeCopied | ( |  | self, |  
          |  |  |  | paramName |  
          |  | ) |  |  |  | inherited | 
 
Register a parameter holding the name of an input file to be
   copied before a job starts.
 
Definition at line 202 of file JobRunner.py.
  202     def registerToBeCopied(self,paramName):
 
  203         """Register a parameter holding the name of an input file to be 
  204            copied before a job starts.""" 
  205         if paramName 
not in self.paramToBeCopied:
 
  206             self.paramToBeCopied.
append(paramName)
 
 
 
◆ run()
  
  | 
        
          | def python.JobRunner.JobRunner.run | ( |  | self, |  
          |  |  |  | jobnr = None |  
          |  | ) |  |  |  | inherited | 
 
Run all configured job(s)
 
Definition at line 465 of file JobRunner.py.
  465     def run(self,jobnr=None):
 
  466         """Run all configured job(s)""" 
  468             raise JobRunnerError (
'No configured jobs')
 
  469         njobs = len(self.jobs)
 
  470         self.log(
'JobRunner: starting '+
str(njobs)+
' jobs',self.dumpParams())
 
  472             print (
'\nStarting job %i (using %s) ...\n' % (j,self.__class__))
 
 
 
◆ runJob()
  
  | 
        
          | def python.JobRunner.JobRunner.runJob | ( |  | self, |  
          |  |  |  | jobnr |  
          |  | ) |  |  |  | inherited | 
 
Run a single configured job.
 
Definition at line 455 of file JobRunner.py.
  455     def runJob(self,jobnr):
 
  456         """Run a single configured job.""" 
  457         if jobnr 
not in self.jobs:
 
  458             raise JobRunnerError (
'Job number %s is not configured' % jobnr)
 
  459         jobConfig = self.jobs[jobnr]
 
  460         subprocess.call(
'touch '+jobConfig[
'subflag'], shell=
True)
 
  461         status = self.submitJob(jobConfig)
 
  462         self.jobStatus[jobnr] = status
 
 
 
◆ setParam()
  
  | 
        
          | def python.JobRunner.JobRunner.setParam | ( |  | self, |  
          |  |  |  | name, |  
          |  |  |  | value = None, |  
          |  |  |  | description = None, |  
          |  |  |  | isSpecial = None, |  
          |  |  |  | insertAtFront = False |  
          |  | ) |  |  |  | inherited | 
 
Set the value of a job parameter. Job parameters may be templates that name
   previously defined job parameters and will be evaluated when a given job
   is configured. Therefore the order in which setParam is called for the different
   parameters is relevant. insertAtFront can be set to True to force early
   evaluation of a given parameter.
 
Definition at line 157 of file JobRunner.py.
  157     def setParam(self,name,value=None,description=None,isSpecial=None,insertAtFront=False):
 
  158         """Set the value of a job parameter. Job parameters may be templates that name 
  159            previously defined job parameters and will be evaluated when a given job 
  160            is configured. Therefore the order in which setParam is called for the different 
  161            parameters is relevant. insertAtFront can be set to True to force early 
  162            evaluation of a given parameter.""" 
  163         p = self.params[name] = self.params.
get(name,JobRunnerParameter(name))
 
  164         if value 
is not None:
 
  166         if description 
is not None:
 
  167             p.description = description
 
  168         if isSpecial 
is not None:
 
  169             p.isSpecial=isSpecial
 
  170         if name 
not in self.paramOrder:
 
  172                 self.paramOrder.insert(0,name)
 
  174                 self.paramOrder.
append(name)
 
 
 
◆ showParams()
  
  | 
        
          | def python.JobRunner.JobRunner.showParams | ( |  | self, |  
          |  |  |  | maxLineLength = 80 |  
          |  | ) |  |  |  | inherited | 
 
Show current job parameters.
 
Definition at line 227 of file JobRunner.py.
  227     def showParams(self,maxLineLength=80):
 
  228         """Show current job parameters.""" 
  229         for p 
in self.paramOrder:
 
  230             s = 
str(self.params[p])
 
  231             if maxLineLength > 0 
and len(s) > maxLineLength:
 
  232                 s = s[0:maxLineLength-3] + 
'...' 
 
 
◆ submitJob()
      
        
          | def python.PDSFJobRunner.PDSFJobRunner.submitJob | ( |  | self, | 
        
          |  |  |  | jobConfig | 
        
          |  | ) |  |  | 
      
 
Submit a JobRunner job as a SGE batch job.
 
Reimplemented from python.JobRunner.JobRunner.
Definition at line 53 of file PDSFJobRunner.py.
   53     def submitJob(self,jobConfig):
 
   54         """Submit a JobRunner job as a SGE batch job.""" 
   56         batchCmd = 
'qsub -N %(jobname)s -j y -o %(logfile)s %(scriptfile)s' % jobConfig
 
 
 
◆ wait()
  
  | 
        
          | def python.JobRunner.JobRunner.wait | ( |  | self | ) |  |  | inherited | 
 
Wait until all jobs have completed
 
Definition at line 492 of file JobRunner.py.
  493         """Wait until all jobs have completed""" 
  496             runningJobs = self.getRunningJobs()
 
  497             if not runningJobs: 
break 
  499             print (time.asctime(),
' Waiting for %2s job(s) (%s)' % (len(runningJobs),runningJobs))
 
  500         self.log(
'JobRunner: finished',self.dumpParams(),outputFiles=self.getOutputFiles())
 
 
 
 
◆ jobs
  
  | 
        
          | python.JobRunner.JobRunner.jobs |  | inherited | 
 
 
◆ jobStatus
  
  | 
        
          | python.JobRunner.JobRunner.jobStatus |  | inherited | 
 
 
◆ paramOrder
  
  | 
        
          | python.JobRunner.JobRunner.paramOrder |  | inherited | 
 
 
◆ params
  
  | 
        
          | python.JobRunner.JobRunner.params |  | inherited | 
 
 
◆ paramToBeCopied
  
  | 
        
          | python.JobRunner.JobRunner.paramToBeCopied |  | inherited | 
 
 
The documentation for this class was generated from the following file:
 
bool configure(asg::AnaToolHandle< ITrigGlobalEfficiencyCorrectionTool > &tool, ToolHandleArray< IAsgElectronEfficiencyCorrectionTool > &electronEffToolsHandles, ToolHandleArray< IAsgElectronEfficiencyCorrectionTool > &electronSFToolsHandles, ToolHandleArray< CP::IMuonTriggerScaleFactors > &muonToolsHandles, ToolHandleArray< IAsgPhotonEfficiencyCorrectionTool > &photonEffToolsHandles, ToolHandleArray< IAsgPhotonEfficiencyCorrectionTool > &photonSFToolsHandles, const std::string &triggers, const std::map< std::string, std::string > &legsPerTool, unsigned long nToys, bool debug)