ATLAS Offline Software
Loading...
Searching...
No Matches
python.TrfUtils.JobRunnerTransform Class Reference
Collaboration diagram for python.TrfUtils.JobRunnerTransform:

Public Member Functions

 __init__ (self, inputParamName, outputParamName, templateOutputName='outputfile', jobDirOutputName='', mandatoryArgs=[], optionalArgs=[])
 setProdDir (self, dir)
 setProdTaskDatabase (self, taskdb)
 getJobRunner (self, **jobRunnerArgs)
 addOutput (self, paramName, templateName, jobDirName='')
 showParams (self)
 configure (self)
 addTaskToDatabase (self, comment='')
 run (self)
 go (self, commentForTaskDb='')
 report (self, errAcronym='', moreText='')

Public Attributes

 inputParamName = inputParamName
 outputParamName = outputParamName
 mandatoryArgs = mandatoryArgs
 optionalArgs = optionalArgs
 runner = None
 templateOutputName = templateOutputName
 jobDirOutputName = jobDirOutputName
list outputList = [ ]
str reportName = 'jobReport.json'
str prodDir = '.'
str prodTaskDb = ''
 argdictFileName = options.argJSON
 argdict = readJSON(options.argJSON)
 inputds
 inputfiles = parseQualifiedFileNames(self.argdict[inputParamName])
 outputfile = getFileName(self.argdict[outputParamName])
 outputds = getDataSetName(self.argdict[outputParamName])
str dataset = '.'.join(splitDSName[:-3])
str taskname = '.'.join(splitDSName[-2:])
str jobname = os.path.basename(self.outputfile)

Detailed Description

Job transform for running a JobRunner job at T0 or in the CAF Task Management
   System. Note that this class may abort execution by calling sys.exit() in case of errors.
   Except for syntax errors caught by OptionParser, a jobReport will always
   be produced.

Definition at line 117 of file TrfUtils.py.

Constructor & Destructor Documentation

◆ __init__()

python.TrfUtils.JobRunnerTransform.__init__ ( self,
inputParamName,
outputParamName,
templateOutputName = 'outputfile',
jobDirOutputName = '',
mandatoryArgs = [],
optionalArgs = [] )

Definition at line 123 of file TrfUtils.py.

def __init__(self, inputParamName, outputParamName, templateOutputName='outputfile', jobDirOutputName='',
             mandatoryArgs=[], optionalArgs=[]):
    """Parse the command line, load and validate the argdict, and derive job naming.

    The only accepted command line option is --argJSON=<file>, naming a local JSON
    file with the argdict (key/value pairs).

    Parameters:
        inputParamName: argdict key holding the qualified input file name(s).
        outputParamName: argdict key holding the qualified output file name. Its
            dataset part must have at least 5 dot-separated fields.
        templateOutputName: JobRunner parameter that receives the output file name.
        jobDirOutputName: if non-empty, getJobRunner() also copies the output file
            into the job directory as '<jobname>-<jobDirOutputName>'.
        mandatoryArgs: argdict keys that must be present. inputParamName and
            outputParamName are always added. The list is copied, never modified.
        optionalArgs: argdict keys that getJobRunner() does not forward to the
            JobRunner. The list is copied.

    Derived attributes: dataset (output dataset name without its last 3 fields),
    taskname (last 2 fields), jobname (basename of the output file, with
    '.v<_attempt>' appended when the argdict contains '_attempt').

    On any error a job report is written and the process exits, either via
    OptionParser.error() for command line errors or via sys.exit(1).
    """
    self.inputParamName = inputParamName
    self.outputParamName = outputParamName
    # Work on copies: the [] defaults are evaluated once and shared between calls.
    # Appending to them, or to a caller-supplied list, would leak parameter names
    # into other instances.
    self.mandatoryArgs = list(mandatoryArgs)
    for paramName in (inputParamName, outputParamName):
        if paramName not in self.mandatoryArgs:
            self.mandatoryArgs.append(paramName)
    self.optionalArgs = list(optionalArgs)
    self.runner = None
    self.templateOutputName = templateOutputName
    self.jobDirOutputName = jobDirOutputName
    self.outputList = [ ]  # List of qualified output files (ie file names including dataset name)
    self.reportName = 'jobReport.json'
    self.prodDir = '.'
    self.prodTaskDb = ''

    # Process command line args and extract argdict
    parser = OptionParser(usage="%prog --argJSON=<JSON file>")
    parser.add_option('-a', '--argJSON', dest='argJSON', help='Local file with JSON-serialized dictionary of key/value pairs')
    (options, args) = parser.parse_args()
    if len(args) != 0:
        self.report('WRONGARGSNUMBER_ERROR', 'Wrong number of command line arguments')
        parser.error('wrong number of command line arguments')
    if not options.argJSON:
        self.report('NOARGDICT_ERROR', 'Must use --argJSON to specify argdict.json file')
        parser.error('option --argJSON is mandatory')
    try:
        self.argdictFileName = options.argJSON
        self.argdict = readJSON(options.argJSON)
    except Exception as e:
        self.report('ARGDICTNOTREADABLE_ERROR', 'File %s with JSON-serialized argdict cannot be read' % options.argJSON)
        print ('ERROR: file %s with JSON-serialized argdict cannot be read' % options.argJSON)
        print ('DEBUG: Exception =', e)
        sys.exit(1)

    # Print argdict
    print ('\nInput argdict (%s):\n' % options.argJSON)
    print (pprint.pformat(self.argdict))
    print ('\n')

    # Check for all mandatory parameters
    missingArgs = [x for x in self.mandatoryArgs if x not in self.argdict]
    if missingArgs:
        self.report('MISSINGPARAM_ERROR', 'Mandatory parameter(s) missing from argdict: ' + str(missingArgs))
        print ('ERROR: mandatory parameter(s) missing from argdict:', missingArgs)
        sys.exit(1)

    # Extract input and output dataset and file names
    # NOTE: inputs come from a list, output is a single file (but there might be others)
    self.inputds, self.inputfiles = parseQualifiedFileNames(self.argdict[inputParamName])
    if not self.inputfiles:
        self.report('NOINPUTFILE_ERROR', 'No input file specified (only dataset name?)')
        print ('ERROR: no input file specified')
        sys.exit(1)
    self.outputfile = getFileName(self.argdict[outputParamName])
    self.outputds = getDataSetName(self.argdict[outputParamName])
    if not self.outputds:
        self.report('NODATASET_ERROR', 'No dataset given in parameter ' + outputParamName)
        print ('ERROR: No dataset given in parameter', outputParamName)
        sys.exit(1)
    splitDSName = self.outputds.split('.')
    if len(splitDSName) < 5:
        self.report('DATASETNAME_ERROR', "Output dataset name %s doesn't conform to standard naming convention" % self.outputds)
        print ("ERROR: Output dataset name %s doesn't conform to standard naming convention" % self.outputds)
        sys.exit(1)
    self.dataset = '.'.join(splitDSName[:-3])
    self.taskname = '.'.join(splitDSName[-2:])
    self.jobname = os.path.basename(self.outputfile)
    if '_attempt' in self.argdict:
        self.jobname = self.jobname + '.v%s' % self.argdict['_attempt']
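Note: Doxygen cross-linked the name `split` above to an unrelated C++ function, std::vector< std::string > split(const std::string &s, const std::string &t=":")
(defined in hcg.cxx:177). The Python code above uses the built-in str.split.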

Member Function Documentation

◆ addOutput()

python.TrfUtils.JobRunnerTransform.addOutput ( self,
paramName,
templateName,
jobDirName = '' )
Add an additional output file to the output dataset. If jobDirName is set, the
   output file is also copied to the job directory as <jobname>-<jobDirName>.

Definition at line 235 of file TrfUtils.py.

def addOutput(self, paramName, templateName, jobDirName=''):
    """Register an additional output file with the JobRunner.

    The file name is taken from the argdict entry `paramName` and assigned to the
    JobRunner parameter `templateName`. If `jobDirName` is non-empty, a
    post-processing command is appended that copies the file into the job
    directory as '<jobname>-<jobDirName>'.

    NOTE: the file is not added to self.outputList, so report() does not list it.
    """
    fileName = getFileName(self.argdict[paramName])
    runner = self.runner
    runner.setParam(templateName, fileName)
    if not jobDirName:
        return
    jobDir = runner.getParam('jobdir')
    copyCmd = 'cp %s %s/%s-%s' % (fileName, jobDir, self.jobname, jobDirName)
    runner.appendParam('cmdjobpostprocessing', copyCmd)

◆ addTaskToDatabase()

python.TrfUtils.JobRunnerTransform.addTaskToDatabase ( self,
comment = '' )

Definition at line 266 of file TrfUtils.py.

def addTaskToDatabase(self, comment=''):
    """Register this task in the production task database, on a best-effort basis.

    When no database has been set (see setProdTaskDatabase), only a warning is
    printed. Any exception raised while adding the task is printed and swallowed,
    so a database problem never propagates to the caller.
    """
    if not self.prodTaskDb:
        print ('WARNING: No task manager database configured')
        return
    try:
        with TaskManager(self.prodTaskDb) as taskman:
            runner = self.runner
            taskman.addTask(self.dataset,
                            self.taskname,
                            runner.getParam('joboptionpath'),
                            runner.getParam('release'),
                            runner.getNJobs(),
                            runner.getParam('taskpostprocsteps'),
                            comment=comment)
    except Exception as e:
        print ('ERROR: Unable to add task to task manager database ' + self.prodTaskDb)
        print ('DEBUG: Exception =', e)

◆ configure()

python.TrfUtils.JobRunnerTransform.configure ( self)

Definition at line 248 of file TrfUtils.py.

def configure(self):
    """Configure the JobRunner and, for retried jobs, move older job directories aside.

    Only after self.runner.configure() returns without raising: if the argdict
    contains '_attempt' (attempt number N), the directories
    '<basename>' and '<basename>.v0' ... '<basename>.v(N-1)' are moved into a
    RETRIED_JOBS subdirectory. Here <basename> is the basename of the output file,
    and all of these directories live under <prodDir>/<dataset>/<taskname>. This
    leaves only the current job in the task directory.
    Failures of the move are not raised, as with the former os.system() calls.
    """
    import subprocess

    self.runner.configure()
    if '_attempt' not in self.argdict:
        return
    currentAttempt = int(self.argdict['_attempt'])
    taskDir = '%s/%s/%s' % (self.prodDir, self.dataset, self.taskname)
    baseName = os.path.basename(self.outputfile)
    retriedJobDir = '%s/RETRIED_JOBS' % taskDir
    for i in range(-1, currentAttempt):
        # i == -1 stands for the first, unversioned job directory
        jobDirName = baseName if i == -1 else '%s.v%s' % (baseName, i)
        d = '%s/%s' % (taskDir, jobDirName)
        if os.path.exists(d):
            print ('\nMoving previous job directory %s to %s' % (d, retriedJobDir))
            # Pass argument lists instead of shell strings. The paths are derived
            # from the argdict and must not be interpreted by a shell (spaces,
            # ';', '$(...)'). check=False keeps the original non-raising behaviour.
            subprocess.run(['mkdir', '-p', retriedJobDir], check=False)
            subprocess.run(['mv', '-f', d, retriedJobDir], check=False)
Note: Doxygen cross-linked the name `configure` to the following unrelated C++ function: bool configure(asg::AnaToolHandle< ITrigGlobalEfficiencyCorrectionTool > &tool, ToolHandleArray< IAsgElectronEfficiencyCorrectionTool > &electronEffToolsHandles, ToolHandleArray< IAsgElectronEfficiencyCorrectionTool > &electronSFToolsHandles, ToolHandleArray< CP::IMuonTriggerScaleFactors > &muonToolsHandles, ToolHandleArray< IAsgPhotonEfficiencyCorrectionTool > &photonEffToolsHandles, ToolHandleArray< IAsgPhotonEfficiencyCorrectionTool > &photonSFToolsHandles, const std::string &triggers, const std::map< std::string, std::string > &legsPerTool, unsigned long nToys, bool debug)

◆ getJobRunner()

python.TrfUtils.JobRunnerTransform.getJobRunner ( self,
** jobRunnerArgs )

Definition at line 209 of file TrfUtils.py.

def getJobRunner(self, **jobRunnerArgs):
    """Create (or replace) the JobRunner for this job and return it.

    The job directory is <prodDir>/<dataset>/<taskname>/<jobname>, and all input
    files go into a single job. Parameters are applied in this order:
      1. the explicit keyword arguments;
      2. every argdict entry that is neither in mandatoryArgs nor in optionalArgs.
    Because argdict entries are applied last, they override keyword arguments with
    the same name.
    """
    if self.runner:
        print ('WARNING: Overwriting already configured JobRunner')
    jobDir = '/'.join([self.prodDir, self.dataset, self.taskname, self.jobname])
    self.runner = JobRunner.JobRunner(jobdir=jobDir,
                                      jobname=self.jobname,
                                      inputds=self.inputds,
                                      inputfiles=self.inputfiles,
                                      outputds=self.outputds,
                                      filesperjob=len(self.inputfiles),
                                      setuprelease=False,
                                      addinputtopoolcatalog=False,
                                      returnstatuscode=True)
    runner = self.runner
    # Pre-processing command that copies the argdict file into the job directory
    argdictCopyCmd = 'cp %s %s/%s.argdict.json' % (self.argdictFileName, runner.getParam('jobdir'), self.jobname)
    runner.appendParam('cmdjobpreprocessing', argdictCopyCmd)
    runner.setParam(self.templateOutputName, self.outputfile)
    if self.jobDirOutputName:
        outputCopyCmd = 'cp %s %s/%s-%s' % (self.outputfile, runner.getParam('jobdir'),
                                            self.jobname, self.jobDirOutputName)
        runner.appendParam('cmdjobpostprocessing', outputCopyCmd)
    for key, value in jobRunnerArgs.items():
        runner.setParam(key, value)
    forwarded = ((key, value) for key, value in self.argdict.items()
                 if key not in self.mandatoryArgs and key not in self.optionalArgs)
    for key, value in forwarded:
        runner.setParam(key, value)
    return runner

◆ go()

python.TrfUtils.JobRunnerTransform.go ( self,
commentForTaskDb = '' )
Show parameters, configure job, update task database, run job and produce report.
   This method will ensure that a job report is always produced, independently of any errors.

Definition at line 288 of file TrfUtils.py.

def go(self, commentForTaskDb=''):
    """Run the whole job: show parameters, configure, register the task, run, report.

    If showParams() or configure() raises an Exception, an error report
    (JOBRUNNER_CONFIGURE_ERROR) is written and nothing else is done. Otherwise the
    task is added to the task database and the job is run. The final job report
    is written even if either step raises; the exception then propagates.
    """
    try:
        self.showParams()
        self.configure()
    except Exception as e:
        reason = 'Unable to configure JobRunner job - perhaps same job was already configured / run before?'
        self.report('JOBRUNNER_CONFIGURE_ERROR', reason)
        print ('ERROR: ' + reason)
        print ('DEBUG: Exception =', e)
        return
    try:
        self.addTaskToDatabase(commentForTaskDb)
        self.run()
    finally:
        self.report()

◆ report()

python.TrfUtils.JobRunnerTransform.report ( self,
errAcronym = '',
moreText = '' )

Definition at line 306 of file TrfUtils.py.

def report(self, errAcronym='', moreText=''):
    """Write the job report file (self.reportName) and print it to stdout.

    Parameters:
        errAcronym: if non-empty, the job is reported as failed with exit code 999
            and this acronym. Otherwise the exit code is the JobRunner status of
            the (single) job: 0 gives 'OK', anything else gives 'ATHENA_ERROR'.
            If no status can be read, 'NOJOBSTATUS_ERROR' with exit code 999 is
            reported, and moreText is replaced by an explanatory message.
        moreText: optional message, stored as 'exitMsg'.

    The acronym is truncated to 128 characters. Output files are listed only for
    successful jobs (exit code 0), and only those entries of self.outputList whose
    file exists. For successful jobs the report is also copied to
    '<jobdir>/<jobname>.<reportName>'. A failure to copy is printed as a warning
    and not raised.
    """
    import shutil

    if errAcronym:
        jobStatus = 999
        jobStatusAcronym = errAcronym
    else:
        try:
            jobStatus = self.runner.jobStatus[0]  # Assume we always run single jobs
            jobStatusAcronym = 'OK' if jobStatus == 0 else 'ATHENA_ERROR'
        except Exception:
            jobStatus = 999
            jobStatusAcronym = 'NOJOBSTATUS_ERROR'
            moreText = "Jobrunner terminated abnormally and w/o a job status; athena job may or may not have run"

    jobStatusAcronym = jobStatusAcronym[:128]  # 128 char limit in T0 DB
    jobReport = {'exitCode': jobStatus,
                 'exitAcronym': jobStatusAcronym,
                 'files': {'output': []}
                 }
    if moreText:
        jobReport['exitMsg'] = moreText

    # If there was no error, store outputs (request from Armin to not give any outputs for failed jobs).
    # Must also check that output file indeed exists.
    if jobStatus == 0:
        for f in self.outputList:
            if os.path.exists(getFileName(f)):
                jobReport['files']['output'].append(getFileDescription(f))

    # Write jobReport file
    writeJSON(self.reportName, jobReport)

    # Copy jobReport file to job directory - note we do this only if there was
    # no error, otherwise we might overwrite an older report from an OK job.
    # shutil.copy replaces os.system('cp ...'). The destination name is derived
    # from the argdict and must not go through a shell. Also, os.system never
    # raised, so copy failures were silently ignored; now they reach the warning.
    if jobStatus == 0:
        try:
            shutil.copy(self.reportName,
                        '%s/%s.%s' % (self.runner.getParam('jobdir'), self.jobname, self.reportName))
        except Exception as e:
            print ('WARNING: Copying of job report file (%s) to job directory failed' % self.reportName)
            print ('DEBUG: Exception =', e)

    # Nicely print job report to stdout
    print ('\n\nJob report (%s):\n' % self.reportName)
    print (pprint.pformat(jobReport))
    print ('\n')

◆ run()

python.TrfUtils.JobRunnerTransform.run ( self)

Definition at line 283 of file TrfUtils.py.

def run(self):
    """Run the configured JobRunner job; report() later reads its jobStatus."""
    runner = self.runner
    runner.run()
Note: Doxygen cross-linked the name `run` to an unrelated definition (run.py:1).

◆ setProdDir()

python.TrfUtils.JobRunnerTransform.setProdDir ( self,
dir )

Definition at line 197 of file TrfUtils.py.

def setProdDir(self, dir):
    """Set the production directory under which the job directories are created.

    The directory must be writable. If it is not, an error job report is written
    and the process exits with status 1; no fallback directory is used. The error
    message previously claimed that the current working directory would be used
    instead, which never happened.
    """
    if os.access(dir, os.W_OK):
        self.prodDir = dir
        return
    reason = 'No write access to production directory %s' % dir
    # Like every other fatal error in this class, leave a job report before exiting
    self.report('PRODDIR_NOTWRITEABLE_ERROR', reason)
    print ('ERROR: %s - aborting' % reason)
    sys.exit(1)

◆ setProdTaskDatabase()

python.TrfUtils.JobRunnerTransform.setProdTaskDatabase ( self,
taskdb )

Definition at line 206 of file TrfUtils.py.

def setProdTaskDatabase(self, taskdb):
    """Set the task database used by addTaskToDatabase().

    A false value (e.g. '') disables the database update.
    """
    self.prodTaskDb = taskdb

◆ showParams()

python.TrfUtils.JobRunnerTransform.showParams ( self)

Definition at line 244 of file TrfUtils.py.

def showParams(self):
    """Print a header line, then delegate to the JobRunner's parameter listing."""
    header = 'JobRunner parameters:\n'
    print (header)
    self.runner.showParams()

Member Data Documentation

◆ argdict

python.TrfUtils.JobRunnerTransform.argdict = readJSON(options.argJSON)

Definition at line 153 of file TrfUtils.py.

◆ argdictFileName

python.TrfUtils.JobRunnerTransform.argdictFileName = options.argJSON

Definition at line 152 of file TrfUtils.py.

◆ dataset

python.TrfUtils.JobRunnerTransform.dataset = '.'.join(splitDSName[:-3])

Definition at line 191 of file TrfUtils.py.

◆ inputds

python.TrfUtils.JobRunnerTransform.inputds

Definition at line 174 of file TrfUtils.py.

◆ inputfiles

python.TrfUtils.JobRunnerTransform.inputfiles = parseQualifiedFileNames(self.argdict[inputParamName])

Definition at line 174 of file TrfUtils.py.

◆ inputParamName

python.TrfUtils.JobRunnerTransform.inputParamName = inputParamName

Definition at line 125 of file TrfUtils.py.

◆ jobDirOutputName

python.TrfUtils.JobRunnerTransform.jobDirOutputName = jobDirOutputName

Definition at line 135 of file TrfUtils.py.

◆ jobname

python.TrfUtils.JobRunnerTransform.jobname = os.path.basename(self.outputfile)

Definition at line 193 of file TrfUtils.py.

◆ mandatoryArgs

python.TrfUtils.JobRunnerTransform.mandatoryArgs = mandatoryArgs

Definition at line 127 of file TrfUtils.py.

◆ optionalArgs

python.TrfUtils.JobRunnerTransform.optionalArgs = optionalArgs

Definition at line 132 of file TrfUtils.py.

◆ outputds

python.TrfUtils.JobRunnerTransform.outputds = getDataSetName(self.argdict[outputParamName])

Definition at line 181 of file TrfUtils.py.

◆ outputfile

python.TrfUtils.JobRunnerTransform.outputfile = getFileName(self.argdict[outputParamName])

Definition at line 179 of file TrfUtils.py.

◆ outputList

list python.TrfUtils.JobRunnerTransform.outputList = [ ]

Definition at line 136 of file TrfUtils.py.

◆ outputParamName

python.TrfUtils.JobRunnerTransform.outputParamName = outputParamName

Definition at line 126 of file TrfUtils.py.

◆ prodDir

str python.TrfUtils.JobRunnerTransform.prodDir = '.'

Definition at line 138 of file TrfUtils.py.

◆ prodTaskDb

python.TrfUtils.JobRunnerTransform.prodTaskDb = ''

Definition at line 139 of file TrfUtils.py.

◆ reportName

python.TrfUtils.JobRunnerTransform.reportName = 'jobReport.json'

Definition at line 137 of file TrfUtils.py.

◆ runner

python.TrfUtils.JobRunnerTransform.runner = None

Definition at line 133 of file TrfUtils.py.

◆ taskname

python.TrfUtils.JobRunnerTransform.taskname = '.'.join(splitDSName[-2:])

Definition at line 192 of file TrfUtils.py.

◆ templateOutputName

python.TrfUtils.JobRunnerTransform.templateOutputName = templateOutputName

Definition at line 134 of file TrfUtils.py.


The documentation for this class was generated from the following file: