6 Utilities for writing job transforms for use at T0 and at the CAF Task Management System.
8 __author__ =
'Juerg Beringer'
9 __version__ =
'TrfUtils.py atlas/athena'
12 import os, sys, pprint
14 from InDetBeamSpotExample
import JobRunner
16 from optparse
import OptionParser
20 """Read a JSON file and return its data."""
22 """JSON converts strings to unicode, so we use YAML instead."""
23 data = yaml.load(f,Loader=yaml.FullLoader)
28 """Serialize data and write to fname."""
30 print (json.dumps(data), file=f)
34 """Extract dataset name from a qualified file name."""
35 if isinstance(name, str) :
36 """Old format of ds/file """
38 return name.split(sep)[0]
41 elif isinstance(name, dict) :
42 """New format of ds/file """
46 """Extract file name from a qualified file name."""
47 if isinstance(name, str) :
48 """Old format of ds/file """
50 return name.split(sep)[1]
53 elif isinstance(name, dict) :
54 """New format of ds/file """
59 """Parse a list of qualified file names (file names that are prepended with
60 their dataset name). Returns a tuple consisting of the dataset name and the
61 list of unqualified file names."""
68 return (dataset,files)
72 """Get GUID of file name from local Pool file catalog (from Armin Nairz, as used at Tier-0)."""
73 pfc=
'PoolFileCatalog.xml'
74 if not os.path.isfile(pfc) :
76 (s,o) = subprocess.getstatusoutput(
'uuidgen')
82 cmd =
'grep -B2 %s %s' % (name,pfc)
83 cmd +=
' | grep "File ID" | sed -e "s/ <File ID=\\\"//g" | sed -e "s/\\\">//g"'
84 (s,o) = subprocess.getstatusoutput(cmd)
88 (s,o) = subprocess.getstatusoutput(
'uuidgen')
91 guid =
'PFCPARSINGERROR'
96 """Return a dictionary with the standard file description required by T0 conventions.
97 If no explicit dataset name is provided in dsname, the dataset name is
98 assumed to be prepended to the fileName (first part, before '#')."""
106 'file_guid':
getGuid(fileName),
118 """Job transform for running a JobRunner job at T0 or at the CAF Task Management
119 System. Note that this class may abort execution by calling exit() in case of errors.
120 Except in case of syntactical errors caught by OptionParser, a jobReport will always be produced."""
123 def __init__(self, inputParamName, outputParamName, templateOutputName='outputfile', jobDirOutputName='',
124 mandatoryArgs = [], optionalArgs = []):
128 if inputParamName
not in mandatoryArgs:
129 mandatoryArgs.append(inputParamName)
130 if outputParamName
not in mandatoryArgs:
131 mandatoryArgs.append(outputParamName)
142 parser = OptionParser(usage=
"%prog --argJSON=<JSON file>")
143 parser.add_option(
'-a',
'--argJSON', dest=
'argJSON', help=
'Local file with JSON-serialized dictionary of key/value pairs')
144 (options,args) = parser.parse_args()
146 self.
report(
'WRONGARGSNUMBER_ERROR',
'Wrong number of command line arguments')
147 parser.error(
'wrong number of command line arguments')
148 if not options.argJSON:
149 self.
report(
'NOARGDICT_ERROR',
'Must use --argJSON to specify argdict.json file')
150 parser.error(
'option --argJSON is mandatory')
154 except Exception
as e:
155 self.
report(
'ARGDICTNOTREADABLE_ERROR',
'File %s with JSON-serialized argdict cannot be read' % options.argJSON)
156 print (
'ERROR: file %s with JSON-serialized argdict cannot be read' % options.argJSON)
157 print (
'DEBUG: Exception =',e)
161 print (
'\nInput argdict (%s):\n' % options.argJSON)
162 print (pprint.pformat(self.
argdict))
166 missingArgs = [ x
for x
in mandatoryArgs
if x
not in self.
argdict ]
168 self.
report(
'MISSINGPARAM_ERROR',
'Mandatory parameter(s) missing from argdict: '+
str(missingArgs))
169 print (
'ERROR: mandatory parameter(s) missing from argdict:', missingArgs)
176 self.
report(
'NOINPUTFILE_ERROR',
'No input file specified (only dataset name?)')
177 print (
'ERROR: no input file specified')
183 self.
report(
'NODATASET_ERROR',
'No dataset given in parameter '+outputParamName)
184 print (
'ERROR: No dataset given in parameter',outputParamName)
187 if len(splitDSName)<5:
188 self.
report(
'DATASETNAME_ERROR',
"Output dataset name %s doesn't conform to standard naming convention" % self.
outputds)
189 print (
"ERROR: Output dataset name %s doesn't conform to standard naming convention" % self.
outputds)
198 if os.access(dir,os.W_OK):
202 print (
'ERROR: No write access to production directory',dir,
'- will use current working directory instead:', os.getcwd())
211 print (
'WARNING: Overwriting already configured JobRunner')
214 inputds=self.inputds,
219 addinputtopoolcatalog=
False,
220 returnstatuscode=
True)
221 self.
runner.appendParam(
'cmdjobpreprocessing',
225 self.
runner.appendParam(
'cmdjobpostprocessing',
227 for k,v
in jobRunnerArgs.items():
235 def addOutput(self, paramName, templateName, jobDirName=''):
236 """Add an additional output file to the output dataset. If jobDirName is set, the
237 output file will also be copied under that name to the job directory."""
240 self.
runner.setParam(templateName,f)
242 self.
runner.appendParam(
'cmdjobpostprocessing',
243 'cp %s %s/%s-%s' % (f,self.
runner.getParam(
'jobdir'),self.
jobname,jobDirName))
245 print (
'JobRunner parameters:\n')
254 currentAttempt =
int(self.
argdict[
'_attempt'])
255 for i
in range(-1,currentAttempt):
260 if os.path.exists(d):
262 print (
'\nMoving previous job directory %s to %s' % (d,retriedJobDir))
263 os.system(
'mkdir -p %s' % (retriedJobDir))
264 os.system(
'mv -f %s %s' % (d,retriedJobDir))
272 self.
runner.getParam(
'joboptionpath'),
273 self.
runner.getParam(
'release'),
275 self.
runner.getParam(
'taskpostprocsteps'),
277 except Exception
as e:
278 print (
'ERROR: Unable to add task to task manager database '+self.
prodTaskDb)
279 print (
'DEBUG: Exception =',e)
281 print (
'WARNING: No task manager database configured')
288 def go(self,commentForTaskDb=''):
289 """Show parameters, configure job, update task database, run job and produce report.
290 This method will ensure that a job report is always produced, independently of any errors."""
294 except Exception
as e:
295 self.
report(
'JOBRUNNER_CONFIGURE_ERROR',
'Unable to configure JobRunner job - perhaps same job was already configured / run before?')
296 print (
"ERROR: Unable to configure JobRunner job - perhaps same job was already configured / run before?")
297 print (
'DEBUG: Exception =',e)
306 def report(self,errAcronym='',moreText=''):
309 jobStatusAcronym = errAcronym
312 jobStatus = self.
runner.jobStatus[0]
313 jobStatusAcronym =
'OK' if jobStatus==0
else 'ATHENA_ERROR'
316 jobStatusAcronym =
'NOJOBSTATUS_ERROR'
317 moreText =
"Jobrunner terminated abnormally and w/o a job status; athena job may or may not have run"
319 jobStatusAcronym = jobStatusAcronym[:128]
320 report = {
'exitCode': jobStatus,
321 'exitAcronym': jobStatusAcronym,
322 'files': {
'output':[] }
325 report[
'exitMsg'] = moreText
342 except Exception
as e:
343 print (
'WARNING: Copying of job report file (%s) to job directory failed' % self.
reportName)
344 print (
'DEBUG: Exception =',e)
347 print (
'\n\nJob report (jobReport.json):\n')
348 print (pprint.pformat(report))