ATLAS Offline Software
python.TrfUtils.JobRunnerTransform Class Reference
Collaboration diagram for python.TrfUtils.JobRunnerTransform:

Public Member Functions

def __init__ (self, inputParamName, outputParamName, templateOutputName='outputfile', jobDirOutputName='', mandatoryArgs=[], optionalArgs=[])
 
def setProdDir (self, dir)
 
def setProdTaskDatabase (self, taskdb)
 
def getJobRunner (self, **jobRunnerArgs)
 
def addOutput (self, paramName, templateName, jobDirName='')
 
def showParams (self)
 
def configure (self)
 
def addTaskToDatabase (self, comment='')
 
def run (self)
 
def go (self, commentForTaskDb='')
 
def report (self, errAcronym='', moreText='')
 

Public Attributes

 inputParamName
 
 outputParamName
 
 mandatoryArgs
 
 optionalArgs
 
 runner
 
 templateOutputName
 
 jobDirOutputName
 
 outputList
 
 reportName
 
 prodDir
 
 prodTaskDb
 
 argdictFileName
 
 argdict
 
 inputfiles
 
 outputfile
 
 outputds
 
 dataset
 
 taskname
 
 jobname
 

Detailed Description

Job transform for running a JobRunner job at T0 or in the CAF Task Management System. Note that this class may abort execution by calling exit() in case of errors. Except for syntax errors caught by OptionParser, a jobReport will always be produced.

Definition at line 117 of file TrfUtils.py.
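
A minimal usage sketch (not taken from an actual ATLAS transform): the transform script constructs a JobRunnerTransform, obtains and customizes a JobRunner, and calls go(). The parameter names ('inputFiles', 'outputFile'), the job option path and the import path are illustrative assumptions.

    #!/usr/bin/env python
    # Hypothetical transform script built on JobRunnerTransform
    from TrfUtils import JobRunnerTransform   # adjust import to the package providing TrfUtils

    # Parsing of --argJSON happens inside the constructor
    trf = JobRunnerTransform('inputFiles', 'outputFile')
    trf.setProdDir('.')                    # production directory (must be writable)
    trf.setProdTaskDatabase('')            # empty string: skip the task database
    trf.getJobRunner(joboptionpath='MyPackage/MyJobOptions.py')   # hypothetical job options
    trf.go('example task')                 # showParams, configure, run, write jobReport.json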

Constructor & Destructor Documentation

◆ __init__()

def python.TrfUtils.JobRunnerTransform.__init__(self, inputParamName, outputParamName, templateOutputName='outputfile', jobDirOutputName='', mandatoryArgs=[], optionalArgs=[])

Definition at line 123 of file TrfUtils.py.

123      def __init__(self, inputParamName, outputParamName, templateOutputName='outputfile', jobDirOutputName='',
124                   mandatoryArgs = [], optionalArgs = []):
125          self.inputParamName = inputParamName
126          self.outputParamName = outputParamName
127          self.mandatoryArgs = mandatoryArgs
128          if inputParamName not in mandatoryArgs:
129              mandatoryArgs.append(inputParamName)
130          if outputParamName not in mandatoryArgs:
131              mandatoryArgs.append(outputParamName)
132          self.optionalArgs = optionalArgs
133          self.runner = None
134          self.templateOutputName = templateOutputName
135          self.jobDirOutputName = jobDirOutputName
136          self.outputList = [ ] # List of qualified output files (ie file names including dataset name)
137          self.reportName = 'jobReport.json'
138          self.prodDir = '.'
139          self.prodTaskDb = ''
140  
141          # Process command line args and extract argdict
142          parser = OptionParser(usage="%prog --argJSON=<JSON file>")
143          parser.add_option('-a', '--argJSON', dest='argJSON', help='Local file with JSON-serialized dictionary of key/value pairs')
144          (options,args) = parser.parse_args()
145          if len(args) != 0:
146              self.report('WRONGARGSNUMBER_ERROR','Wrong number of command line arguments')
147              parser.error('wrong number of command line arguments')
148          if not options.argJSON:
149              self.report('NOARGDICT_ERROR','Must use --argJSON to specify argdict.json file')
150              parser.error('option --argJSON is mandatory')
151          try:
152              self.argdictFileName = options.argJSON
153              self.argdict = readJSON(options.argJSON)
154          except Exception as e:
155              self.report('ARGDICTNOTREADABLE_ERROR','File %s with JSON-serialized argdict cannot be read' % options.argJSON)
156              print ('ERROR: file %s with JSON-serialized argdict cannot be read' % options.argJSON)
157              print ('DEBUG: Exception =',e)
158              sys.exit(1)
159  
160          # Print argdict
161          print ('\nInput argdict (%s):\n' % options.argJSON)
162          print (pprint.pformat(self.argdict))
163          print ('\n')
164  
165          # Check for all mandatory parameters
166          missingArgs = [ x for x in mandatoryArgs if x not in self.argdict ]
167          if missingArgs:
168              self.report('MISSINGPARAM_ERROR','Mandatory parameter(s) missing from argdict: '+str(missingArgs))
169              print ('ERROR: mandatory parameter(s) missing from argdict:', missingArgs)
170              sys.exit(1)
171  
172          # Extract input and output dataset and file names
173          # NOTE: inputs come from a list, output is a single file (but there might be others)
174          self.inputds, self.inputfiles = parseQualifiedFileNames(self.argdict[inputParamName])
175          if not self.inputfiles:
176              self.report('NOINPUTFILE_ERROR','No input file specified (only dataset name?)')
177              print ('ERROR: no input file specified')
178              sys.exit(1)
179          self.outputfile = getFileName(self.argdict[outputParamName])
180          #self.outputList.append(self.argdict[outputParamName])
181          self.outputds = getDataSetName(self.argdict[outputParamName])
182          if not self.outputds:
183              self.report('NODATASET_ERROR','No dataset given in parameter '+outputParamName)
184              print ('ERROR: No dataset given in parameter',outputParamName)
185              sys.exit(1)
186          splitDSName = self.outputds.split('.')
187          if len(splitDSName)<5:
188              self.report('DATASETNAME_ERROR',"Output dataset name %s doesn't conform to standard naming convention" % self.outputds)
189              print ("ERROR: Output dataset name %s doesn't conform to standard naming convention" % self.outputds)
190              sys.exit(1)
191          self.dataset = '.'.join(splitDSName[:-3])
192          self.taskname = '.'.join(splitDSName[-2:])
193          self.jobname = os.path.basename(self.outputfile)
194          if '_attempt' in self.argdict:
195              self.jobname = self.jobname + '.v%s' % self.argdict['_attempt']
196 
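For illustration, the argdict file read via --argJSON could look as below. All dataset, file and parameter names are made up; the '#' separator between dataset and file name follows parseQualifiedFileNames/getDataSetName, and the output dataset name needs at least five dot-separated fields (the last two become the task name).

    import json

    argdict = {
        # list of qualified input file names ('dataset#file')
        'inputFiles': ['data18_13TeV.00350000.physics_Main.DAOD#DAOD.pool.root.1'],
        # single qualified output file name; dataset becomes 'data18_13TeV.00350000',
        # task name becomes 'MYTASK.c0', job name 'myoutput.root'
        'outputFile': 'data18_13TeV.00350000.physics_Main.MYTASK.c0#myoutput.root',
        '_attempt': 1,   # optional retry counter, appended to the job name as '.v1'
    }
    with open('argdict.json', 'w') as f:
        json.dump(argdict, f)

    # the transform would then be invoked as:  mytrf.py --argJSON=argdict.json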

Member Function Documentation

◆ addOutput()

def python.TrfUtils.JobRunnerTransform.addOutput(self, paramName, templateName, jobDirName='')
Add an additional output file to the output dataset. If jobDirName is set, the output file will also be copied under that name to the job directory.

Definition at line 235 of file TrfUtils.py.

235      def addOutput(self, paramName, templateName, jobDirName=''):
236          """Add an additional output file to the output dataset. If jobDirName is set, the
237          output file will also be copied under that name to the job directory."""
238          #self.outputList.append(self.argdict[paramName])
239          f = getFileName(self.argdict[paramName])
240          self.runner.setParam(templateName,f)
241          if jobDirName:
242              self.runner.appendParam('cmdjobpostprocessing',
243                                      'cp %s %s/%s-%s' % (f,self.runner.getParam('jobdir'),self.jobname,jobDirName))
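
A hedged usage sketch, assuming trf is a JobRunnerTransform whose runner was already created with getJobRunner(). The argdict parameter name ('monitoringFile'), the JobRunner template name ('monitoringoutputfile') and the job-directory name are hypothetical.

    # copy the monitoring output into the job directory as <jobname>-mon.root
    trf.addOutput('monitoringFile', 'monitoringoutputfile', jobDirName='mon.root')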

◆ addTaskToDatabase()

def python.TrfUtils.JobRunnerTransform.addTaskToDatabase(self, comment='')

Definition at line 266 of file TrfUtils.py.

266      def addTaskToDatabase(self,comment=''):
267          if self.prodTaskDb:
268              try:
269                  with TaskManager(self.prodTaskDb) as taskman:
270                      taskman.addTask(self.dataset,
271                                      self.taskname,
272                                      self.runner.getParam('joboptionpath'),
273                                      self.runner.getParam('release'),
274                                      self.runner.getNJobs(),
275                                      self.runner.getParam('taskpostprocsteps'),
276                                      comment=comment)
277              except Exception as e:
278                  print ('ERROR: Unable to add task to task manager database '+self.prodTaskDb)
279                  print ('DEBUG: Exception =',e)
280          else:
281              print ('WARNING: No task manager database configured')
282 

◆ configure()

def python.TrfUtils.JobRunnerTransform.configure(self)

Definition at line 248 of file TrfUtils.py.

248      def configure(self):
249          self.runner.configure()
250          # If JobRunner configuration was successful, move any earlier jobs
251          # out of the way if _attempt is specified. This should guarantee
252          # that the task directory contains only valid jobs
253          if '_attempt' in self.argdict:
254              currentAttempt = int(self.argdict['_attempt'])
255              for i in range(-1,currentAttempt):
256                  if i==-1:
257                      d = '%s/%s/%s/%s' % (self.prodDir,self.dataset,self.taskname,os.path.basename(self.outputfile))
258                  else:
259                      d = '%s/%s/%s/%s.v%s' % (self.prodDir,self.dataset,self.taskname,os.path.basename(self.outputfile),i)
260                  if os.path.exists(d):
261                      retriedJobDir = '%s/%s/%s/RETRIED_JOBS' % (self.prodDir,self.dataset,self.taskname)
262                      print ('\nMoving previous job directory %s to %s' % (d,retriedJobDir))
263                      os.system('mkdir -p %s' % (retriedJobDir))
264                      os.system('mv -f %s %s' % (d,retriedJobDir))
265 
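For illustration, assuming prodDir='.', dataset 'data18_13TeV.00350000', task name 'MYTASK.c0' and output file 'myoutput.root' (all hypothetical), configuring a job with _attempt=2 would move any of the following earlier job directories into the RETRIED_JOBS directory:

    ./data18_13TeV.00350000/MYTASK.c0/myoutput.root      (job without _attempt set)
    ./data18_13TeV.00350000/MYTASK.c0/myoutput.root.v0
    ./data18_13TeV.00350000/MYTASK.c0/myoutput.root.v1
    -> ./data18_13TeV.00350000/MYTASK.c0/RETRIED_JOBS/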

◆ getJobRunner()

def python.TrfUtils.JobRunnerTransform.getJobRunner(self, **jobRunnerArgs)

Definition at line 209 of file TrfUtils.py.

209      def getJobRunner(self,**jobRunnerArgs):
210          if self.runner:
211              print ('WARNING: Overwriting already configured JobRunner')
212          self.runner = JobRunner.JobRunner(jobdir=self.prodDir+'/'+self.dataset+'/'+self.taskname+'/'+self.jobname,
213                                            jobname=self.jobname,
214                                            inputds=self.inputds,
215                                            inputfiles=self.inputfiles,
216                                            outputds=self.outputds,
217                                            filesperjob=len(self.inputfiles),
218                                            setuprelease=False,
219                                            addinputtopoolcatalog=False,
220                                            returnstatuscode=True)
221          self.runner.appendParam('cmdjobpreprocessing',
222                                  'cp %s %s/%s.argdict.json' % (self.argdictFileName, self.runner.getParam('jobdir'),self.jobname))
223          self.runner.setParam(self.templateOutputName,self.outputfile)
224          if self.jobDirOutputName:
225              self.runner.appendParam('cmdjobpostprocessing',
226                                      'cp %s %s/%s-%s' % (self.outputfile, self.runner.getParam('jobdir'),self.jobname,self.jobDirOutputName))
227          for k,v in jobRunnerArgs.items():
228              self.runner.setParam(k,v)
229          for k,v in self.argdict.items():
230              if k in self.mandatoryArgs: continue
231              if k in self.optionalArgs: continue
232              self.runner.setParam(k,v)
233          return self.runner
234 
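Keyword arguments are forwarded to the runner via setParam(), and so is every argdict entry that is not listed among the mandatory or optional transform arguments. A hedged sketch with illustrative parameter values:

    runner = trf.getJobRunner(joboptionpath='MyPackage/MyJobOptions.py',  # hypothetical job options
                              release='Athena,24.0.0')                    # hypothetical release tag
    runner.showParams()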

◆ go()

def python.TrfUtils.JobRunnerTransform.go(self, commentForTaskDb='')
Show parameters, configure job, update task database, run job and produce report. This method will ensure that a job report is always produced, independently of any errors.

Definition at line 288 of file TrfUtils.py.

288      def go(self,commentForTaskDb=''):
289          """Show parameters, configure job, update task database, run job and produce report.
290          This method will ensure that a job report is always produced, independently of any errors."""
291          try:
292              self.showParams()
293              self.configure()
294          except Exception as e:
295              self.report('JOBRUNNER_CONFIGURE_ERROR','Unable to configure JobRunner job - perhaps same job was already configured / run before?')
296              print ("ERROR: Unable to configure JobRunner job - perhaps same job was already configured / run before?")
297              print ('DEBUG: Exception =',e)
298          else:
299              try:
300                  self.addTaskToDatabase(commentForTaskDb)
301                  self.run()
302              finally:
303                  self.report()
304 
305 

◆ report()

def python.TrfUtils.JobRunnerTransform.report(self, errAcronym='', moreText='')

Definition at line 306 of file TrfUtils.py.

306      def report(self,errAcronym='',moreText=''):
307          if errAcronym:
308              jobStatus = 999
309              jobStatusAcronym = errAcronym
310          else:
311              try:
312                  jobStatus = self.runner.jobStatus[0] # Assume we always run single jobs
313                  jobStatusAcronym = 'OK' if jobStatus==0 else 'ATHENA_ERROR'
314              except Exception:
315                  jobStatus = 999
316                  jobStatusAcronym = 'NOJOBSTATUS_ERROR'
317                  moreText = "Jobrunner terminated abnormally and w/o a job status; athena job may or may not have run"
318  
319          jobStatusAcronym = jobStatusAcronym[:128] # 128 char limit in T0 DB
320          report = {'exitCode': jobStatus,
321                    'exitAcronym': jobStatusAcronym,
322                    'files': { 'output':[] }
323                    }
324          if moreText:
325              report['exitMsg'] = moreText
326  
327          # If there was no error, store outputs (request from Armin to not give any outputs for failed jobs).
328          # Must also check that output file indeed exists.
329          if jobStatus==0:
330              for f in self.outputList:
331                  if os.path.exists(getFileName(f)):
332                      report['files']['output'].append(getFileDescription(f))
333  
334          # Write jobReport file
335          writeJSON(self.reportName,report)
336  
337          # Copy jobReport file to job directory - note we do this only if there was
338          # no error, otherwise we might overwrite an older report from an OK job
339          if jobStatus==0:
340              try:
341                  os.system('cp %s %s/%s.%s' % (self.reportName,self.runner.getParam('jobdir'),self.jobname,self.reportName) )
342              except Exception as e:
343                  print ('WARNING: Copying of job report file (%s) to job directory failed' % self.reportName)
344                  print ('DEBUG: Exception =',e)
345  
346          # Nicely print job report to stdout
347          print ('\n\nJob report (jobReport.json):\n')
348          print (pprint.pformat(report))
349          print ('\n')
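
For a successful single job, the dictionary written to jobReport.json has the following shape; output entries are produced by getFileDescription() and their exact fields are not shown here:

    report = {
        'exitCode': 0,
        'exitAcronym': 'OK',
        'files': {'output': [...]}   # one getFileDescription(f) entry per existing output file
    }
    # On failure, exitCode is the athena status or 999, exitAcronym carries the error
    # acronym (truncated to 128 characters for the T0 database), and 'exitMsg' holds
    # any additional explanatory text.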

◆ run()

def python.TrfUtils.JobRunnerTransform.run(self)

Definition at line 283 of file TrfUtils.py.

283      def run(self):
284          self.runner.run()
285          #print (self.runner.jobStatus)
286 
287 

◆ setProdDir()

def python.TrfUtils.JobRunnerTransform.setProdDir(self, dir)

Definition at line 197 of file TrfUtils.py.

197      def setProdDir(self,dir):
198          if os.access(dir,os.W_OK):
199              self.prodDir = dir
200          else:
201              # Continue anyway or abort?
202              print ('ERROR: No write access to production directory',dir,'- will use current working directory instead:', os.getcwd())
203              self.prodDir = os.getcwd()
204              sys.exit(1)
205 

◆ setProdTaskDatabase()

def python.TrfUtils.JobRunnerTransform.setProdTaskDatabase(self, taskdb)

Definition at line 206 of file TrfUtils.py.

206      def setProdTaskDatabase(self,taskdb):
207          self.prodTaskDb = taskdb
208 

◆ showParams()

def python.TrfUtils.JobRunnerTransform.showParams(self)

Definition at line 244 of file TrfUtils.py.

244      def showParams(self):
245          print ('JobRunner parameters:\n')
246          self.runner.showParams()
247 

Member Data Documentation

◆ argdict

python.TrfUtils.JobRunnerTransform.argdict

Definition at line 152 of file TrfUtils.py.

◆ argdictFileName

python.TrfUtils.JobRunnerTransform.argdictFileName

Definition at line 151 of file TrfUtils.py.

◆ dataset

python.TrfUtils.JobRunnerTransform.dataset

Definition at line 190 of file TrfUtils.py.

◆ inputfiles

python.TrfUtils.JobRunnerTransform.inputfiles

Definition at line 173 of file TrfUtils.py.

◆ inputParamName

python.TrfUtils.JobRunnerTransform.inputParamName

Definition at line 124 of file TrfUtils.py.

◆ jobDirOutputName

python.TrfUtils.JobRunnerTransform.jobDirOutputName

Definition at line 134 of file TrfUtils.py.

◆ jobname

python.TrfUtils.JobRunnerTransform.jobname

Definition at line 192 of file TrfUtils.py.

◆ mandatoryArgs

python.TrfUtils.JobRunnerTransform.mandatoryArgs

Definition at line 126 of file TrfUtils.py.

◆ optionalArgs

python.TrfUtils.JobRunnerTransform.optionalArgs

Definition at line 131 of file TrfUtils.py.

◆ outputds

python.TrfUtils.JobRunnerTransform.outputds

Definition at line 180 of file TrfUtils.py.

◆ outputfile

python.TrfUtils.JobRunnerTransform.outputfile

Definition at line 178 of file TrfUtils.py.

◆ outputList

python.TrfUtils.JobRunnerTransform.outputList

Definition at line 135 of file TrfUtils.py.

◆ outputParamName

python.TrfUtils.JobRunnerTransform.outputParamName

Definition at line 125 of file TrfUtils.py.

◆ prodDir

python.TrfUtils.JobRunnerTransform.prodDir

Definition at line 137 of file TrfUtils.py.

◆ prodTaskDb

python.TrfUtils.JobRunnerTransform.prodTaskDb

Definition at line 138 of file TrfUtils.py.

◆ reportName

python.TrfUtils.JobRunnerTransform.reportName

Definition at line 136 of file TrfUtils.py.

◆ runner

python.TrfUtils.JobRunnerTransform.runner

Definition at line 132 of file TrfUtils.py.

◆ taskname

python.TrfUtils.JobRunnerTransform.taskname

Definition at line 191 of file TrfUtils.py.

◆ templateOutputName

python.TrfUtils.JobRunnerTransform.templateOutputName

Definition at line 133 of file TrfUtils.py.


The documentation for this class was generated from the following file:
TrfUtils.py