6 This module defines the generic infrastructure for task postprocessing.
8 __author__ =
'Juerg Beringer'
12 import glob, time, sys, os, math
16 from InDetBeamSpotExample.Utils
import getUserName
23 """Generic postprocessing exception class."""
24 def __init__(self,message,executedSteps,newStatus=None):
32 """Exception used to postpone postprocessing."""
33 def __init__(self,message,executedSteps,newStatus=None):
40 def runPostProcStep(taskman,taskDict,oldStatus,previousSteps,step,postprocLib,jobName):
41 """Instantiate and run a single postprocessing step."""
42 if hasattr(postprocLib,step):
43 print (
'...',step,
'\n')
45 taskDict = taskman.getTaskDict(taskDict[
'DSNAME'],taskDict[
'TASKNAME'])
46 postprocClass = getattr(postprocLib,step)(taskman,taskDict,oldStatus,previousSteps,postprocLib,jobName)
48 return postprocClass.executedSteps
50 raise PostProcessingError(
'ERROR: undefined postprocessing step: '+step,previousSteps,TaskManager.StatusCodes[
'POSTPROCFAILED'])
53 def doPostProcessing(taskman,taskDict,postprocSteps,postprocLib,forceRun=False,jobName=None):
54 """Execute a series of postprocessing steps. The task status is updated before
55 and after the postprocessing. Log files are handled by each postprocessing step
56 individually. Returns a list of executed postprocessing steps."""
59 dsName = taskDict[
'DSNAME']
60 taskName = taskDict[
'TASKNAME']
61 prePostProcStatus = taskDict[
'STATUS']
69 prePostProcStatus = taskman.getStatus(dsName,taskName)
71 if prePostProcStatus>=TaskManager.StatusCodes[
'POSTPROCRUNNING']
and not forceRun:
72 print (
'Exiting postprocessing without doing anything: task %s/%s status is %s\n' % (dsName,taskName,
getKey(TaskManager.StatusCodes,prePostProcStatus)))
76 taskman.setStatus(dsName,taskName,TaskManager.StatusCodes[
'POSTPROCRUNNING'])
78 print (
'Postprocessing for task %s/%s - job %s\nOld status: %s\n' % (dsName,taskName,jobName,
getKey(TaskManager.StatusCodes,prePostProcStatus)))
80 print (
'Postprocessing for task %s/%s\nOld status: %s\n' % (dsName,taskName,
getKey(TaskManager.StatusCodes,prePostProcStatus)))
84 postprocStamps = glob.glob(
'%s/%s/%s/*.status.POSTPROCESSING' % (dsName,taskName,jobName))
86 postprocStamps = glob.glob(
'%s/%s/*/*.status.POSTPROCESSING' % (dsName,taskName))
91 for step
in postprocSteps:
92 executedSteps =
runPostProcStep(taskman,taskDict,prePostProcStatus,executedSteps,step,postprocLib,jobName)
94 except PostponeProcessing
as e:
98 taskman.setStatus(dsName,taskName,e.newStatus)
99 print (
'%i step(s) completed successfully: ' % len(executedSteps),executedSteps,
'\n')
102 except PostProcessingError
as e:
105 taskman.setStatus(dsName,taskName,e.newStatus)
106 print (
'Executed steps: ',e.executedSteps)
107 return e.executedSteps
109 if (taskDict[
'NJOBS_SUBMITTED']+taskDict[
'NJOBS_RUNNING']) > 0:
115 taskman.setStatus(dsName,taskName,TaskManager.StatusCodes[
'POSTPROCFAILED'])
116 print (
'Executed steps: ',e.executedSteps)
117 return e.executedSteps
119 except Exception
as e:
122 taskman.setStatus(dsName,taskName,TaskManager.StatusCodes[
'POSTPROCFAILED'])
123 print (
'Executed steps: ',executedSteps)
129 for p
in postprocStamps:
130 os.system(
'rm -f %s' % p)
132 os.system(
'touch %s.COMPLETED' % basename)
135 a = TaskAnalyzer(
'.',dsName,taskName)
137 a.updateStatus(taskman)
140 taskman.setDiskStatus(dsName,taskName,TaskManager.OnDiskCodes[
'DELETED'])
142 print (
'%i step(s) completed successfully: ' % len(executedSteps),executedSteps,
'\n')
148 def __init__(self,taskman,taskDict,oldStatus,previousSteps,postprocLib,jobName=None):
163 self.
log(
'Starting postprocessing: %s' % self.__class__.__name__)
164 self.
log(
'* Data set name = %s' % self.
dsName, noTime=
True)
165 self.
log(
'* Task name = %s' % self.
taskName, noTime=
True)
166 self.
log(
'* Old status = %s' %
getKey(TaskManager.StatusCodes,self.
oldStatus), noTime=
True)
167 self.
log(
'* Job name = %s' % self.
jobName, noTime=
True)
169 self.
log(
'* Host name = %s\n' % os.uname()[1], noTime=
True)
176 def log(self,msg=None,text=None,noTime=False,doPrint=False):
179 outputFiles.append(sys.stdout)
180 for out
in outputFiles:
187 out.write(
'\n%-30s %s\n' % (time.strftime(
'%a %b %d %X %Z %Y'),msg))
193 def logExec(self,cmd,doPrint=False,checkStatusCode=True,errorMsg='',abortOnError=True):
194 (status,output) = subprocess.getstatusoutput(cmd)
196 self.
log(
'Executing: %s' % cmd, output)
197 if doPrint
or status:
200 if status
and checkStatusCode:
202 errorMsg =
'ERROR in postprocessing step %s while executing:\n\n%s\n' % (self.
stepName,cmd)
203 self.
log(text=
'\nERROR: exit status = %i' % (status))
204 self.
log(text=errorMsg)
211 name =
'%s.%s%s' % (self.
baseName,stepName,postFix)
215 name =
'%s/%s' % (self.
taskDir,name)
225 def taskFileList(self,fileNamePattern,statusName='POSTPROCESSING',jobName=None):
226 """Get list of files with specific names from all jobs in a given state."""
230 statusFiles = glob.glob(
'%s/*.status.%s' % (jobName,statusName))
232 statusFiles = glob.glob(
'*/*.status.%s' % (statusName))
234 for s
in statusFiles:
235 b =
'/'.
join(s.split(
'/')[:-1])
236 files.extend(glob.glob(
'%s/%s' % (b,fileNamePattern)))
def hadd(self,dir,outputFileName,inputFiles,maxFilesPerHadd=100):
    """Execute staged hadd for many files.

    Every command is run through self.logExec from inside directory dir.
    If there are at most maxFilesPerHadd input files, they are merged with
    a single 'hadd -f' call. Otherwise the inputs are merged in consecutive
    chunks of up to maxFilesPerHadd files into intermediate files named
    '<outputFileName>.part<N>', the intermediate files are merged into
    outputFileName, and the intermediate files are then removed.

    Arguments:
      dir             -- directory to change into before running each command
      outputFileName  -- name of the merged output file
      inputFiles      -- sequence (supports len() and slicing) of input file names
      maxFilesPerHadd -- maximum number of input files per chunk (must be >= 1)

    Raises ValueError if maxFilesPerHadd is smaller than 1. Errors from the
    executed commands are handled by self.logExec.
    """
    if maxFilesPerHadd < 1:
        raise ValueError('maxFilesPerHadd must be at least 1, got %r' % (maxFilesPerHadd,))

    # Commands are chained with '&&' rather than ';' so that nothing is
    # merged, overwritten or removed in the wrong directory if the cd fails.
    if len(inputFiles) <= maxFilesPerHadd:
        self.logExec('cd %s && hadd -f %s %s' % (dir,outputFileName,' '.join(inputFiles)))
        return

    # Stage 1: merge each chunk of inputs into its own part file.
    partFiles = []
    for index,minIndex in enumerate(range(0,len(inputFiles),maxFilesPerHadd)):
        stepInputFiles = inputFiles[minIndex:minIndex+maxFilesPerHadd]
        stepOutputFileName = '%s.part%i' % (outputFileName,index)
        self.logExec('cd %s && hadd -f %s %s' % (dir,stepOutputFileName,' '.join(stepInputFiles)))
        partFiles.append(stepOutputFileName)

    # Stage 2: merge the part files, then clean them up.
    # NOTE: all part files are merged in one call, so this final step can
    # itself exceed maxFilesPerHadd when len(inputFiles) > maxFilesPerHadd**2.
    self.logExec('cd %s && hadd -f %s %s' % (dir,outputFileName,' '.join(partFiles)))
    self.logExec('cd %s && rm -f %s' % (dir,' '.join(partFiles)))
261 configFile = glob.glob(
'%s/%s/%s' % (self.
taskDir,jobName,
'*.config.py.final.py'))
263 configFile = glob.glob(
'%s/%s/%s' % (self.
taskDir,jobName,
'*.config.py'))
265 exec(
open(configFile[0],
'r'),config)
266 return config[
'jobConfig']
270 if not resultFileName:
272 resultFiles = self.
taskDict[
'RESULTFILES']
273 if resultFiles
is None:
275 if resultFileName
not in resultFiles.split():
276 resultFiles =
' '.
join([resultFiles,resultFileName])
278 self.
taskDict[
'RESULTFILES'] = resultFiles