ATLAS Offline Software
TrfUtils.py
#!/usr/bin/env python

# Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration

"""
Utilities for writing job transforms for use at T0 and at the CAF Task Management System.
"""
__author__ = 'Juerg Beringer'
__version__ = 'TrfUtils.py atlas/athena'


import os, sys, pprint
import json, yaml
from InDetBeamSpotExample import JobRunner
from InDetBeamSpotExample.TaskManager import TaskManager
from optparse import OptionParser
import subprocess

def readJSON(fname):
    """Read a JSON file and return its data."""
    # JSON converts strings to unicode, so we parse with YAML instead
    with open(fname, 'r') as f:
        data = yaml.load(f, Loader=yaml.FullLoader)
    return data

def writeJSON(fname, data):
    """Serialize data and write to fname."""
    with open(fname, 'w') as f:
        print (json.dumps(data), file=f)

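# Illustrative usage sketch (not part of the original module); the file name
# is hypothetical. Data written with writeJSON round-trips through readJSON:
#
#   >>> writeJSON('argdict.json', {'_attempt': 1})
#   >>> readJSON('argdict.json')
#   {'_attempt': 1}
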
def getDataSetName(name, sep='#'):
    """Extract dataset name from a qualified file name."""
    if isinstance(name, str):
        # Old format of ds/file
        if sep in name:
            return name.split(sep)[0]
        else:
            return None
    elif isinstance(name, dict):
        # New format of ds/file
        return name['dsn']

def getFileName(name, sep='#'):
    """Extract file name from a qualified file name."""
    if isinstance(name, str):
        # Old format of ds/file
        if sep in name:
            return name.split(sep)[1]
        else:
            return name
    elif isinstance(name, dict):
        # New format of ds/file
        return name['lfn']

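# Illustrative sketch (not part of the original module) of the two accepted
# formats; the dataset and file names are made up:
#
#   >>> getDataSetName('data18_13TeV.00348885.myds#f1.root')
#   'data18_13TeV.00348885.myds'
#   >>> getFileName({'dsn': 'data18_13TeV.00348885.myds', 'lfn': 'f1.root'})
#   'f1.root'
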
def parseQualifiedFileNames(inputList, sep='#'):
    """Parse a list of qualified file names (file names that are prepended with
    their dataset name). Returns a tuple consisting of the dataset name and the
    list of unqualified file names."""
    if not inputList:
        return (None, [])
    dataset = getDataSetName(inputList[0], sep)
    files = [getFileName(f, sep) for f in inputList]
    return (dataset, files)
69 
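# Illustrative sketch (not part of the original module), with made-up names:
#
#   >>> parseQualifiedFileNames(['myds#f1.root', 'myds#f2.root'])
#   ('myds', ['f1.root', 'f2.root'])
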
def getGuid(name):
    """Get GUID of file name from local Pool file catalog (from Armin Nairz, as used at Tier-0)."""
    pfc = 'PoolFileCatalog.xml'
    if not os.path.isfile(pfc):
        # No local catalog - generate a fresh GUID
        try:
            (s, o) = subprocess.getstatusoutput('uuidgen')
            guid = o.strip()
        except Exception:
            guid = 'UUIDGENERROR'
    else:
        try:
            cmd = 'grep -B2 %s %s' % (name, pfc)
            cmd += ' | grep "File ID" | sed -e "s/ <File ID=\\\"//g" | sed -e "s/\\\">//g"'
            (s, o) = subprocess.getstatusoutput(cmd)
            guid = o.strip()
            # There might be a PFC from earlier processing steps that doesn't
            # list this file yet - generate a fresh GUID in that case
            if guid == '':
                (s, o) = subprocess.getstatusoutput('uuidgen')
                guid = o.strip()
        except Exception:
            guid = 'PFCPARSINGERROR'
    return guid
93 
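# For reference, the grep/sed pipeline above pulls the GUID out of catalog
# entries of the following (schematic, abbreviated) form:
#
#   <File ID="4C6BD545-...-A19F3E6BA79C">
#     <physical><pfn filetype="ROOT_All" name="myfile.root"/></physical>
#     <logical/>
#   </File>
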
def getFileDescription(fileName, dsname=''):
    """Return a dictionary with the standard file description required by T0 conventions.
    If no explicit dataset name is provided in dsname, the dataset name is
    assumed to be prepended to the fileName (first part, before '#')."""
    if not dsname:
        dsname = getDataSetName(fileName)
    d = {
        'dataset': dsname,
        'subFiles': [
            {
                'name': getFileName(fileName),
                'file_guid': getGuid(fileName),
                'file_size': 0,
                'nentries': 0,
                'checkSum': 0,
            }
        ]
    }
    return d
115 
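# Illustrative sketch (not part of the original module), with made-up names;
# the GUID comes from uuidgen when no PoolFileCatalog.xml is present:
#
#   >>> getFileDescription('myds#f1.root')
#   {'dataset': 'myds',
#    'subFiles': [{'name': 'f1.root', 'file_guid': '...', 'file_size': 0,
#                  'nentries': 0, 'checkSum': 0}]}
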

class JobRunnerTransform:
    """Job transform for running a JobRunner job at T0 or at the CAF Task Management
    System. Note that this class may abort execution by calling exit() in case of errors.
    Except in case of syntactical errors caught by OptionParser, a jobReport will always
    be produced."""

    def __init__(self, inputParamName, outputParamName, templateOutputName='outputfile', jobDirOutputName='',
                 mandatoryArgs=None, optionalArgs=None):
        self.inputParamName = inputParamName
        self.outputParamName = outputParamName
        # Copy the argument lists so that the (mutable) defaults are never
        # modified in place
        self.mandatoryArgs = list(mandatoryArgs) if mandatoryArgs else []
        if inputParamName not in self.mandatoryArgs:
            self.mandatoryArgs.append(inputParamName)
        if outputParamName not in self.mandatoryArgs:
            self.mandatoryArgs.append(outputParamName)
        self.optionalArgs = list(optionalArgs) if optionalArgs else []
        self.runner = None
        self.templateOutputName = templateOutputName
        self.jobDirOutputName = jobDirOutputName
        self.outputList = []   # List of qualified output files (i.e. file names including dataset name)
        self.reportName = 'jobReport.json'
        self.prodDir = '.'
        self.prodTaskDb = ''
140 
        # Process command line args and extract argdict
        parser = OptionParser(usage="%prog --argJSON=<JSON file>")
        parser.add_option('-a', '--argJSON', dest='argJSON', help='Local file with JSON-serialized dictionary of key/value pairs')
        (options, args) = parser.parse_args()
        if len(args) != 0:
            self.report('WRONGARGSNUMBER_ERROR', 'Wrong number of command line arguments')
            parser.error('wrong number of command line arguments')
        if not options.argJSON:
            self.report('NOARGDICT_ERROR', 'Must use --argJSON to specify argdict.json file')
            parser.error('option --argJSON is mandatory')
        try:
            self.argdictFileName = options.argJSON
            self.argdict = readJSON(options.argJSON)
        except Exception as e:
            self.report('ARGDICTNOTREADABLE_ERROR', 'File %s with JSON-serialized argdict cannot be read' % options.argJSON)
            print ('ERROR: file %s with JSON-serialized argdict cannot be read' % options.argJSON)
            print ('DEBUG: Exception =', e)
            sys.exit(1)

        # Print argdict
        print ('\nInput argdict (%s):\n' % options.argJSON)
        print (pprint.pformat(self.argdict))
        print ('\n')
164 
        # Check that all mandatory parameters are present
        missingArgs = [x for x in self.mandatoryArgs if x not in self.argdict]
        if missingArgs:
            self.report('MISSINGPARAM_ERROR', 'Mandatory parameter(s) missing from argdict: ' + str(missingArgs))
            print ('ERROR: mandatory parameter(s) missing from argdict:', missingArgs)
            sys.exit(1)
171 
        # Extract input and output dataset and file names
        # NOTE: inputs come from a list, output is a single file (but there might be others)
        self.inputds, self.inputfiles = parseQualifiedFileNames(self.argdict[inputParamName])
        if not self.inputfiles:
            self.report('NOINPUTFILE_ERROR', 'No input file specified (only dataset name?)')
            print ('ERROR: no input file specified')
            sys.exit(1)
        self.outputfile = getFileName(self.argdict[outputParamName])
        #self.outputList.append(self.argdict[outputParamName])
        self.outputds = getDataSetName(self.argdict[outputParamName])
        if not self.outputds:
            self.report('NODATASET_ERROR', 'No dataset given in parameter ' + outputParamName)
            print ('ERROR: No dataset given in parameter', outputParamName)
            sys.exit(1)
        splitDSName = self.outputds.split('.')
        if len(splitDSName) < 5:
            self.report('DATASETNAME_ERROR', "Output dataset name %s doesn't conform to standard naming convention" % self.outputds)
            print ("ERROR: Output dataset name %s doesn't conform to standard naming convention" % self.outputds)
            sys.exit(1)
        self.dataset = '.'.join(splitDSName[:-3])
        self.taskname = '.'.join(splitDSName[-2:])
        self.jobname = os.path.basename(self.outputfile)
        if '_attempt' in self.argdict:
            self.jobname = self.jobname + '.v%s' % self.argdict['_attempt']
196 
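    # Illustrative sketch (not part of the original module): a minimal
    # argdict.json file such a transform might be invoked with. All parameter
    # names and values are made up; real T0 argdicts carry additional keys:
    #
    #   {"inputESDFiles": ["myproj.0042.physics_Main.recon.ESD.c0#f1.ESD.root"],
    #    "outputDPDFile": "myproj.0042.physics_Main.merge.DPD.c0.v1#out.DPD.root",
    #    "_attempt": 1}
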
    def setProdDir(self, dir):
        if os.access(dir, os.W_OK):
            self.prodDir = dir
        else:
            # Continue anyway, falling back to the current working directory
            print ('ERROR: No write access to production directory', dir, '- will use current working directory instead:', os.getcwd())
            self.prodDir = os.getcwd()
205 
    def setProdTaskDatabase(self, taskdb):
        self.prodTaskDb = taskdb

    def getJobRunner(self, **jobRunnerArgs):
        if self.runner:
            print ('WARNING: Overwriting already configured JobRunner')
        self.runner = JobRunner.JobRunner(jobdir=self.prodDir + '/' + self.dataset + '/' + self.taskname + '/' + self.jobname,
                                          jobname=self.jobname,
                                          inputds=self.inputds,
                                          inputfiles=self.inputfiles,
                                          outputds=self.outputds,
                                          filesperjob=len(self.inputfiles),
                                          setuprelease=False,
                                          addinputtopoolcatalog=False,
                                          returnstatuscode=True)
        self.runner.appendParam('cmdjobpreprocessing',
                                'cp %s %s/%s.argdict.json' % (self.argdictFileName, self.runner.getParam('jobdir'), self.jobname))
        self.runner.setParam(self.templateOutputName, self.outputfile)
        if self.jobDirOutputName:
            self.runner.appendParam('cmdjobpostprocessing',
                                    'cp %s %s/%s-%s' % (self.outputfile, self.runner.getParam('jobdir'), self.jobname, self.jobDirOutputName))
        for k, v in jobRunnerArgs.items():
            self.runner.setParam(k, v)
        # Any argdict entries that are neither mandatory nor optional transform
        # arguments are passed on to the JobRunner as additional parameters
        for k, v in self.argdict.items():
            if k in self.mandatoryArgs:
                continue
            if k in self.optionalArgs:
                continue
            self.runner.setParam(k, v)
        return self.runner
234 
    def addOutput(self, paramName, templateName, jobDirName=''):
        """Add an additional output file to the output dataset. If jobDirName is set, the
        output file will also be copied under that name to the job directory."""
        #self.outputList.append(self.argdict[paramName])
        f = getFileName(self.argdict[paramName])
        self.runner.setParam(templateName, f)
        if jobDirName:
            self.runner.appendParam('cmdjobpostprocessing',
                                    'cp %s %s/%s-%s' % (f, self.runner.getParam('jobdir'), self.jobname, jobDirName))

    def showParams(self):
        print ('JobRunner parameters:\n')
        self.runner.showParams()
247 
    def configure(self):
        self.runner.configure()
        # If JobRunner configuration was successful, move any earlier jobs
        # out of the way if _attempt is specified. This should guarantee
        # that the task directory contains only valid jobs.
        if '_attempt' in self.argdict:
            currentAttempt = int(self.argdict['_attempt'])
            for i in range(-1, currentAttempt):
                if i == -1:
                    d = '%s/%s/%s/%s' % (self.prodDir, self.dataset, self.taskname, os.path.basename(self.outputfile))
                else:
                    d = '%s/%s/%s/%s.v%s' % (self.prodDir, self.dataset, self.taskname, os.path.basename(self.outputfile), i)
                if os.path.exists(d):
                    retriedJobDir = '%s/%s/%s/RETRIED_JOBS' % (self.prodDir, self.dataset, self.taskname)
                    print ('\nMoving previous job directory %s to %s' % (d, retriedJobDir))
                    os.system('mkdir -p %s' % (retriedJobDir))
                    os.system('mv -f %s %s' % (d, retriedJobDir))
265 
    def addTaskToDatabase(self, comment=''):
        if self.prodTaskDb:
            try:
                with TaskManager(self.prodTaskDb) as taskman:
                    taskman.addTask(self.dataset,
                                    self.taskname,
                                    self.runner.getParam('joboptionpath'),
                                    self.runner.getParam('release'),
                                    self.runner.getNJobs(),
                                    self.runner.getParam('taskpostprocsteps'),
                                    comment=comment)
            except Exception as e:
                print ('ERROR: Unable to add task to task manager database ' + self.prodTaskDb)
                print ('DEBUG: Exception =', e)
        else:
            print ('WARNING: No task manager database configured')
282 
    def run(self):
        self.runner.run()
        #print (self.runner.jobStatus)
286 
287 
    def go(self, commentForTaskDb=''):
        """Show parameters, configure job, update task database, run job and produce report.
        This method will ensure that a job report is always produced, independently of any errors."""
        try:
            self.showParams()
            self.configure()
        except Exception as e:
            self.report('JOBRUNNER_CONFIGURE_ERROR', 'Unable to configure JobRunner job - perhaps the same job was already configured or run before?')
            print ('ERROR: Unable to configure JobRunner job - perhaps the same job was already configured or run before?')
            print ('DEBUG: Exception =', e)
        else:
            try:
                self.addTaskToDatabase(commentForTaskDb)
                self.run()
            finally:
                self.report()
304 
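    # Illustrative sketch (not part of the original module) of how a transform
    # script typically drives this class; the parameter names, path and job
    # option value below are made up:
    #
    #   trf = JobRunnerTransform('inputESDFiles', 'outputDPDFile',
    #                            mandatoryArgs=['inputESDFiles', 'outputDPDFile'])
    #   trf.setProdDir('/my/prod/dir')
    #   trf.getJobRunner(joboptionpath='MyJobOptions.py')
    #   trf.go('submitted from T0')
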
305 
    def report(self, errAcronym='', moreText=''):
        if errAcronym:
            jobStatus = 999
            jobStatusAcronym = errAcronym
        else:
            try:
                jobStatus = self.runner.jobStatus[0]   # Assume we always run single jobs
                jobStatusAcronym = 'OK' if jobStatus == 0 else 'ATHENA_ERROR'
            except Exception:
                jobStatus = 999
                jobStatusAcronym = 'NOJOBSTATUS_ERROR'
                moreText = 'JobRunner terminated abnormally and without a job status; the athena job may or may not have run'

        jobStatusAcronym = jobStatusAcronym[:128]   # 128 character limit in T0 DB
        report = {'exitCode': jobStatus,
                  'exitAcronym': jobStatusAcronym,
                  'files': {'output': []}
                  }
        if moreText:
            report['exitMsg'] = moreText

        # If there was no error, store outputs (request from Armin to not give any outputs for failed jobs).
        # Must also check that the output file indeed exists.
        if jobStatus == 0:
            for f in self.outputList:
                if os.path.exists(getFileName(f)):
                    report['files']['output'].append(getFileDescription(f))

        # Write jobReport file
        writeJSON(self.reportName, report)

        # Copy jobReport file to the job directory - note we do this only if there was
        # no error, otherwise we might overwrite an older report from an OK job
        if jobStatus == 0:
            try:
                os.system('cp %s %s/%s.%s' % (self.reportName, self.runner.getParam('jobdir'), self.jobname, self.reportName))
            except Exception as e:
                print ('WARNING: Copying of job report file (%s) to job directory failed' % self.reportName)
                print ('DEBUG: Exception =', e)

        # Nicely print job report to stdout
        print ('\n\nJob report (jobReport.json):\n')
        print (pprint.pformat(report))
        print ('\n')
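
# Illustrative sketch (not part of the original module): for a successful job
# with no registered outputs, report() writes a jobReport.json of the shape
#
#   {"exitCode": 0, "exitAcronym": "OK", "files": {"output": []}}
#
# with one getFileDescription() entry per existing output file in files/output,
# and an "exitMsg" key only when additional text is supplied.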