ATLAS Offline Software
TrfUtils.py
#!/usr/bin/env python

# Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration

"""
Utilities for writing job transforms for use at T0 and at the CAF Task Management System.
"""
__author__ = 'Juerg Beringer'
__version__ = 'TrfUtils.py atlas/athena'


import os, sys, pprint
import json, yaml
from InDetBeamSpotExample import JobRunner
from InDetBeamSpotExample.TaskManager import TaskManager
from optparse import OptionParser
import subprocess
def readJSON(fname):
    """Read a JSON file and return its data."""
    with open(fname, 'r') as f:
        # Parse with YAML instead of the json module: json converted all strings
        # to unicode under Python 2, so YAML (a superset of JSON) is used instead.
        data = yaml.load(f, Loader=yaml.FullLoader)
    return data

def writeJSON(fname, data):
    """Serialize data and write to fname."""
    with open(fname, 'w') as f:
        print(json.dumps(data), file=f)

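# Illustrative usage sketch (not part of the original module): round-trip a
# small argdict through writeJSON/readJSON; the file name and keys are made up.
#
#   writeJSON('argdict.json', {'inputFiles': ['myDS#f1.root'], '_attempt': 1})
#   argdict = readJSON('argdict.json')
#   assert argdict['_attempt'] == 1
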
def getDataSetName(name, sep='#'):
    """Extract the dataset name from a qualified file name."""
    if isinstance(name, str):
        # Old format: 'dataset#file'
        if sep in name:
            return name.split(sep)[0]
        else:
            return None
    elif isinstance(name, dict):
        # New format: {'dsn': ..., 'lfn': ...}
        return name['dsn']

def getFileName(name, sep='#'):
    """Extract the file name from a qualified file name."""
    if isinstance(name, str):
        # Old format: 'dataset#file'
        if sep in name:
            return name.split(sep)[1]
        else:
            return name
    elif isinstance(name, dict):
        # New format: {'dsn': ..., 'lfn': ...}
        return name['lfn']

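# Illustrative examples (not part of the original module) of the two supported
# qualified-name formats; dataset and file names are made up:
#
#   getDataSetName('myproject.00123456.physics.RAW#f1.root')  -> 'myproject.00123456.physics.RAW'
#   getFileName('myproject.00123456.physics.RAW#f1.root')     -> 'f1.root'
#   getFileName({'dsn': 'myproject.00123456.physics.RAW', 'lfn': 'f1.root'})  -> 'f1.root'
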

def parseQualifiedFileNames(inputList, sep='#'):
    """Parse a list of qualified file names (file names that are prepended with
    their dataset name). Returns a tuple consisting of the dataset name and the
    list of unqualified file names."""
    if not inputList:
        return (None, [])
    dataset = getDataSetName(inputList[0], sep)
    files = [getFileName(f, sep) for f in inputList]
    return (dataset, files)

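# Illustrative example (not part of the original module); all names are made up:
#
#   parseQualifiedFileNames(['myDS#f1.root', 'myDS#f2.root'])
#       -> ('myDS', ['f1.root', 'f2.root'])
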
def getGuid(name):
    """Get GUID of file name from local Pool file catalog (from Armin Nairz, as used at Tier-0)."""
    pfc = 'PoolFileCatalog.xml'
    if not os.path.isfile(pfc):
        # No catalog available - generate a fresh GUID instead
        try:
            (s, o) = subprocess.getstatusoutput('uuidgen')
            guid = o.strip()
        except Exception:
            guid = 'UUIDGENERROR'
    else:
        try:
            cmd = 'grep -B2 %s %s' % (name, pfc)
            cmd += ' | grep "File ID" | sed -e "s/ <File ID=\\\"//g" | sed -e "s/\\\">//g"'
            (s, o) = subprocess.getstatusoutput(cmd)
            guid = o.strip()
            # there might be a PFC from earlier processing steps
            if guid == '':
                (s, o) = subprocess.getstatusoutput('uuidgen')
                guid = o.strip()
        except Exception:
            guid = 'PFCPARSINGERROR'
    return guid

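# Illustrative note (not part of the original file): the grep/sed pipeline in
# getGuid() extracts the GUID from PoolFileCatalog.xml entries that look
# schematically like this (real catalogs carry more attributes):
#
#   <File ID="6B54A875-1E4C-4D2A-BD3C-0123456789AB">
#     <physical>
#       <pfn filetype="ROOT_All" name="myfile.root"/>
#     </physical>
#   </File>
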
def getFileDescription(fileName, dsname=''):
    """Return a dictionary with the standard file description required by T0 conventions.
    If no explicit dataset name is provided in dsname, the dataset name is
    assumed to be prepended to the fileName (first part, before '#')."""
    if not dsname:
        dsname = getDataSetName(fileName)
    d = {
        'dataset': dsname,
        'subFiles': [
            {
                'name': getFileName(fileName),
                'file_guid': getGuid(fileName),
                'file_size': 0,
                'nentries': 0,
                'checkSum': 0,
            }
        ]
    }
    return d

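# Illustrative sketch (not part of the original module) of the returned T0 file
# description for a made-up qualified file name:
#
#   getFileDescription('myDS#f1.root')
#       -> {'dataset': 'myDS',
#           'subFiles': [{'name': 'f1.root', 'file_guid': '<GUID or uuidgen output>',
#                         'file_size': 0, 'nentries': 0, 'checkSum': 0}]}
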

class JobRunnerTransform:
    """Job transform for running a JobRunner job at T0 or at the CAF Task Management
    System. Note that this class may abort execution by calling exit() in case of errors.
    Except in case of syntactical errors caught by OptionParser, a jobReport will always
    be produced."""

    def __init__(self, inputParamName, outputParamName, templateOutputName='outputfile', jobDirOutputName='',
                 mandatoryArgs=None, optionalArgs=None):
        self.inputParamName = inputParamName
        self.outputParamName = outputParamName
        # Copy the argument lists: mutable default arguments are shared between
        # calls, and the appends below must not leak into the caller's list
        self.mandatoryArgs = list(mandatoryArgs) if mandatoryArgs else []
        if inputParamName not in self.mandatoryArgs:
            self.mandatoryArgs.append(inputParamName)
        if outputParamName not in self.mandatoryArgs:
            self.mandatoryArgs.append(outputParamName)
        self.optionalArgs = list(optionalArgs) if optionalArgs else []
        self.runner = None
        self.templateOutputName = templateOutputName
        self.jobDirOutputName = jobDirOutputName
        self.outputList = []   # List of qualified output files (i.e. file names including dataset name)
        self.reportName = 'jobReport.json'
        self.prodDir = '.'
        self.prodTaskDb = ''

        # Process command line args and extract argdict
        parser = OptionParser(usage="%prog --argJSON=<JSON file>")
        parser.add_option('-a', '--argJSON', dest='argJSON', help='Local file with JSON-serialized dictionary of key/value pairs')
        (options, args) = parser.parse_args()
        if len(args) != 0:
            self.report('WRONGARGSNUMBER_ERROR', 'Wrong number of command line arguments')
            parser.error('wrong number of command line arguments')
        if not options.argJSON:
            self.report('NOARGDICT_ERROR', 'Must use --argJSON to specify argdict.json file')
            parser.error('option --argJSON is mandatory')
        try:
            self.argdictFileName = options.argJSON
            self.argdict = readJSON(options.argJSON)
        except Exception as e:
            self.report('ARGDICTNOTREADABLE_ERROR', 'File %s with JSON-serialized argdict cannot be read' % options.argJSON)
            print('ERROR: file %s with JSON-serialized argdict cannot be read' % options.argJSON)
            print('DEBUG: Exception =', e)
            sys.exit(1)

        # Print argdict
        print('\nInput argdict (%s):\n' % options.argJSON)
        print(pprint.pformat(self.argdict))
        print('\n')

        # Check that all mandatory parameters are present
        missingArgs = [x for x in self.mandatoryArgs if x not in self.argdict]
        if missingArgs:
            self.report('MISSINGPARAM_ERROR', 'Mandatory parameter(s) missing from argdict: ' + str(missingArgs))
            print('ERROR: mandatory parameter(s) missing from argdict:', missingArgs)
            sys.exit(1)

        # Extract input and output dataset and file names
        # NOTE: inputs come from a list, output is a single file (but there might be others)
        self.inputds, self.inputfiles = parseQualifiedFileNames(self.argdict[inputParamName])
        if not self.inputfiles:
            self.report('NOINPUTFILE_ERROR', 'No input file specified (only dataset name?)')
            print('ERROR: no input file specified')
            sys.exit(1)
        self.outputfile = getFileName(self.argdict[outputParamName])
        #self.outputList.append(self.argdict[outputParamName])
        self.outputds = getDataSetName(self.argdict[outputParamName])
        if not self.outputds:
            self.report('NODATASET_ERROR', 'No dataset given in parameter ' + outputParamName)
            print('ERROR: No dataset given in parameter', outputParamName)
            sys.exit(1)
        splitDSName = self.outputds.split('.')
        if len(splitDSName) < 5:
            self.report('DATASETNAME_ERROR', "Output dataset name %s doesn't conform to standard naming convention" % self.outputds)
            print("ERROR: Output dataset name %s doesn't conform to standard naming convention" % self.outputds)
            sys.exit(1)
        self.dataset = '.'.join(splitDSName[:-3])
        self.taskname = '.'.join(splitDSName[-2:])
        self.jobname = os.path.basename(self.outputfile)
        if '_attempt' in self.argdict:
            self.jobname = self.jobname + '.v%s' % self.argdict['_attempt']

    def setProdDir(self, dir):
        if os.access(dir, os.W_OK):
            self.prodDir = dir
        else:
            # No write access: fall back to the current working directory (rather
            # than aborting) so that the job can still run and produce a jobReport,
            # as the error message promises
            print('ERROR: No write access to production directory', dir, '- will use current working directory instead:', os.getcwd())
            self.prodDir = os.getcwd()

    def setProdTaskDatabase(self, taskdb):
        self.prodTaskDb = taskdb

    def getJobRunner(self, **jobRunnerArgs):
        if self.runner:
            print('WARNING: Overwriting already configured JobRunner')
        self.runner = JobRunner.JobRunner(jobdir=self.prodDir + '/' + self.dataset + '/' + self.taskname + '/' + self.jobname,
                                          jobname=self.jobname,
                                          inputds=self.inputds,
                                          inputfiles=self.inputfiles,
                                          outputds=self.outputds,
                                          filesperjob=len(self.inputfiles),
                                          setuprelease=False,
                                          addinputtopoolcatalog=False,
                                          returnstatuscode=True)
        self.runner.appendParam('cmdjobpreprocessing',
                                'cp %s %s/%s.argdict.json' % (self.argdictFileName, self.runner.getParam('jobdir'), self.jobname))
        self.runner.setParam(self.templateOutputName, self.outputfile)
        if self.jobDirOutputName:
            self.runner.appendParam('cmdjobpostprocessing',
                                    'cp %s %s/%s-%s' % (self.outputfile, self.runner.getParam('jobdir'), self.jobname, self.jobDirOutputName))
        for k, v in jobRunnerArgs.items():
            self.runner.setParam(k, v)
        # Pass any remaining argdict parameters through to the JobRunner,
        # skipping those already handled as mandatory or optional arguments
        for k, v in self.argdict.items():
            if k in self.mandatoryArgs: continue
            if k in self.optionalArgs: continue
            self.runner.setParam(k, v)
        return self.runner

    def addOutput(self, paramName, templateName, jobDirName=''):
        """Add an additional output file to the output dataset. If jobDirName is set, the
        output file will also be copied under that name to the job directory."""
        #self.outputList.append(self.argdict[paramName])
        f = getFileName(self.argdict[paramName])
        self.runner.setParam(templateName, f)
        if jobDirName:
            self.runner.appendParam('cmdjobpostprocessing',
                                    'cp %s %s/%s-%s' % (f, self.runner.getParam('jobdir'), self.jobname, jobDirName))

    def showParams(self):
        print('JobRunner parameters:\n')
        self.runner.showParams()

    def configure(self):
        self.runner.configure()
        # If JobRunner configuration was successful, move any earlier jobs
        # out of the way if _attempt is specified. This should guarantee
        # that the task directory contains only valid jobs.
        if '_attempt' in self.argdict:
            currentAttempt = int(self.argdict['_attempt'])
            for i in range(-1, currentAttempt):
                # i==-1 covers the first attempt, whose job directory has no version suffix
                if i == -1:
                    d = '%s/%s/%s/%s' % (self.prodDir, self.dataset, self.taskname, os.path.basename(self.outputfile))
                else:
                    d = '%s/%s/%s/%s.v%s' % (self.prodDir, self.dataset, self.taskname, os.path.basename(self.outputfile), i)
                if os.path.exists(d):
                    retriedJobDir = '%s/%s/%s/RETRIED_JOBS' % (self.prodDir, self.dataset, self.taskname)
                    print('\nMoving previous job directory %s to %s' % (d, retriedJobDir))
                    os.system('mkdir -p %s' % retriedJobDir)
                    os.system('mv -f %s %s' % (d, retriedJobDir))

    def addTaskToDatabase(self, comment=''):
        if self.prodTaskDb:
            try:
                with TaskManager(self.prodTaskDb) as taskman:
                    taskman.addTask(self.dataset,
                                    self.taskname,
                                    self.runner.getParam('joboptionpath'),
                                    self.runner.getParam('release'),
                                    self.runner.getNJobs(),
                                    self.runner.getParam('taskpostprocsteps'),
                                    comment=comment)
            except Exception as e:
                print('ERROR: Unable to add task to task manager database ' + self.prodTaskDb)
                print('DEBUG: Exception =', e)
        else:
            print('WARNING: No task manager database configured')

    def run(self):
        self.runner.run()
        #print(self.runner.jobStatus)

    def go(self, commentForTaskDb=''):
        """Show parameters, configure job, update task database, run job and produce report.
        This method will ensure that a job report is always produced, independently of any errors."""
        try:
            self.showParams()
            self.configure()
        except Exception as e:
            self.report('JOBRUNNER_CONFIGURE_ERROR', 'Unable to configure JobRunner job - perhaps the same job was already configured / run before?')
            print("ERROR: Unable to configure JobRunner job - perhaps the same job was already configured / run before?")
            print('DEBUG: Exception =', e)
        else:
            try:
                self.addTaskToDatabase(commentForTaskDb)
                self.run()
            finally:
                self.report()


    def report(self, errAcronym='', moreText=''):
        if errAcronym:
            jobStatus = 999
            jobStatusAcronym = errAcronym
        else:
            try:
                jobStatus = self.runner.jobStatus[0]   # Assume we always run single jobs
                jobStatusAcronym = 'OK' if jobStatus == 0 else 'ATHENA_ERROR'
            except Exception:
                jobStatus = 999
                jobStatusAcronym = 'NOJOBSTATUS_ERROR'
                moreText = 'JobRunner terminated abnormally and without a job status; athena job may or may not have run'

        jobStatusAcronym = jobStatusAcronym[:128]   # 128 character limit in the T0 DB
        report = {'exitCode': jobStatus,
                  'exitAcronym': jobStatusAcronym,
                  'files': {'output': []}
                  }
        if moreText:
            report['exitMsg'] = moreText

        # If there was no error, store outputs (request from Armin to not give any outputs for failed jobs).
        # Must also check that the output file indeed exists.
        if jobStatus == 0:
            for f in self.outputList:
                if os.path.exists(getFileName(f)):
                    report['files']['output'].append(getFileDescription(f))

        # Write the jobReport file
        writeJSON(self.reportName, report)

        # Copy the jobReport file to the job directory - note we do this only if there was
        # no error, otherwise we might overwrite an older report from an OK job
        if jobStatus == 0:
            try:
                os.system('cp %s %s/%s.%s' % (self.reportName, self.runner.getParam('jobdir'), self.jobname, self.reportName))
            except Exception as e:
                print('WARNING: Copying of job report file (%s) to job directory failed' % self.reportName)
                print('DEBUG: Exception =', e)

        # Nicely print the job report to stdout
        print('\n\nJob report (jobReport.json):\n')
        print(pprint.pformat(report))
        print('\n')
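
# Minimal usage sketch (illustrative only, not part of the original module):
# a transform script built on JobRunnerTransform. All parameter names, paths,
# file names and the job option template below are hypothetical. The
# constructor itself parses the command line, so the production system would
# invoke such a script roughly as:  myTrf.py --argJSON=myjob.argdict.json
#
#   trf = JobRunnerTransform('inputFiles', 'outputFiles')
#   trf.setProdDir('/path/to/prod/dir')                  # hypothetical path
#   trf.setProdTaskDatabase('taskdb_connect_string')     # hypothetical connect string
#   trf.getJobRunner(joboptionpath='MyJobOptions.py')    # hypothetical template value
#   trf.go('comment for the task database')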