6 This module defines the generic infrastructure for task postprocessing.
8 __author__ =
'Juerg Beringer'
12 import glob, time, sys, os, math
16 from InDetBeamSpotExample.Utils
import getUserName
23 """Generic postprocessing exception class."""
24 def __init__(self,message,executedSteps,newStatus=None):
28 super().
__init__(message,executedSteps,newStatus)
33 """Exception used to postpone postprocessing."""
34 def __init__(self,message,executedSteps,newStatus=None):
37 super().
__init__(message,executedSteps,newStatus)
42 def runPostProcStep(taskman,taskDict,oldStatus,previousSteps,step,postprocLib,jobName):
43 """Instantiate and run a single postprocessing step."""
44 if hasattr(postprocLib,step):
45 print (
'...',step,
'\n')
47 taskDict = taskman.getTaskDict(taskDict[
'DSNAME'],taskDict[
'TASKNAME'])
48 postprocClass = getattr(postprocLib,step)(taskman,taskDict,oldStatus,previousSteps,postprocLib,jobName)
50 return postprocClass.executedSteps
52 raise PostProcessingError(
'ERROR: undefined postprocessing step: '+step,previousSteps,TaskManager.StatusCodes[
'POSTPROCFAILED'])
55 def doPostProcessing(taskman,taskDict,postprocSteps,postprocLib,forceRun=False,jobName=None):
56 """Execute a series of postprocessing steps. The task status is updated before
57 and after the postprocessing. Log files are handled by each postprocessing step
58 individually. Returns a list of executed postprocessing steps."""
61 dsName = taskDict[
'DSNAME']
62 taskName = taskDict[
'TASKNAME']
63 prePostProcStatus = taskDict[
'STATUS']
71 prePostProcStatus = taskman.getStatus(dsName,taskName)
73 if prePostProcStatus>=TaskManager.StatusCodes[
'POSTPROCRUNNING']
and not forceRun:
74 print (
'Exiting postprocessing without doing anything: task %s/%s status is %s\n' % (dsName,taskName,
getKey(TaskManager.StatusCodes,prePostProcStatus)))
78 taskman.setStatus(dsName,taskName,TaskManager.StatusCodes[
'POSTPROCRUNNING'])
80 print (
'Postprocessing for task %s/%s - job %s\nOld status: %s\n' % (dsName,taskName,jobName,
getKey(TaskManager.StatusCodes,prePostProcStatus)))
82 print (
'Postprocessing for task %s/%s\nOld status: %s\n' % (dsName,taskName,
getKey(TaskManager.StatusCodes,prePostProcStatus)))
86 postprocStamps = glob.glob(
'%s/%s/%s/*.status.POSTPROCESSING' % (dsName,taskName,jobName))
88 postprocStamps = glob.glob(
'%s/%s/*/*.status.POSTPROCESSING' % (dsName,taskName))
93 for step
in postprocSteps:
94 executedSteps =
runPostProcStep(taskman,taskDict,prePostProcStatus,executedSteps,step,postprocLib,jobName)
96 except PostponeProcessing
as e:
100 taskman.setStatus(dsName,taskName,e.newStatus)
101 print (
'%i step(s) completed successfully: ' % len(executedSteps),executedSteps,
'\n')
104 except PostProcessingError
as e:
107 taskman.setStatus(dsName,taskName,e.newStatus)
108 print (
'Executed steps: ',e.executedSteps)
109 return e.executedSteps
111 if (taskDict[
'NJOBS_SUBMITTED']+taskDict[
'NJOBS_RUNNING']) > 0:
117 taskman.setStatus(dsName,taskName,TaskManager.StatusCodes[
'POSTPROCFAILED'])
118 print (
'Executed steps: ',e.executedSteps)
119 return e.executedSteps
121 except Exception
as e:
124 taskman.setStatus(dsName,taskName,TaskManager.StatusCodes[
'POSTPROCFAILED'])
125 print (
'Executed steps: ',executedSteps)
131 for p
in postprocStamps:
132 os.system(
'rm -f %s' % p)
134 os.system(
'touch %s.COMPLETED' % basename)
137 a = TaskAnalyzer(
'.',dsName,taskName)
139 a.updateStatus(taskman)
142 taskman.setDiskStatus(dsName,taskName,TaskManager.OnDiskCodes[
'DELETED'])
144 print (
'%i step(s) completed successfully: ' % len(executedSteps),executedSteps,
'\n')
150 def __init__(self,taskman,taskDict,oldStatus,previousSteps,postprocLib,jobName=None):
165 self.
log(
'Starting postprocessing: %s' % self.__class__.__name__)
166 self.
log(
'* Data set name = %s' % self.
dsName, noTime=
True)
167 self.
log(
'* Task name = %s' % self.
taskName, noTime=
True)
168 self.
log(
'* Old status = %s' %
getKey(TaskManager.StatusCodes,self.
oldStatus), noTime=
True)
169 self.
log(
'* Job name = %s' % self.
jobName, noTime=
True)
171 self.
log(
'* Host name = %s\n' % os.uname()[1], noTime=
True)
178 def log(self,msg=None,text=None,noTime=False,doPrint=False):
181 outputFiles.append(sys.stdout)
182 for out
in outputFiles:
189 out.write(
'\n%-30s %s\n' % (time.strftime(
'%a %b %d %X %Z %Y'),msg))
195 def logExec(self,cmd,doPrint=False,checkStatusCode=True,errorMsg='',abortOnError=True):
196 (status,output) = subprocess.getstatusoutput(cmd)
198 self.
log(
'Executing: %s' % cmd, output)
199 if doPrint
or status:
202 if status
and checkStatusCode:
204 errorMsg =
'ERROR in postprocessing step %s while executing:\n\n%s\n' % (self.
stepName,cmd)
205 self.
log(text=
'\nERROR: exit status = %i' % (status))
206 self.
log(text=errorMsg)
213 name =
'%s.%s%s' % (self.
baseName,stepName,postFix)
217 name =
'%s/%s' % (self.
taskDir,name)
227 def taskFileList(self,fileNamePattern,statusName='POSTPROCESSING',jobName=None):
228 """Get list of files with specific names from all jobs in a given state."""
232 statusFiles = glob.glob(
'%s/*.status.%s' % (jobName,statusName))
234 statusFiles = glob.glob(
'*/*.status.%s' % (statusName))
236 for s
in statusFiles:
237 b =
'/'.
join(s.split(
'/')[:-1])
238 files.extend(glob.glob(
'%s/%s' % (b,fileNamePattern)))
242 def hadd(self,dir,outputFileName,inputFiles,maxFilesPerHadd=100):
243 """Execute staged hadd for many files."""
244 if len(inputFiles)<=maxFilesPerHadd:
245 self.
logExec(
'cd %s; hadd -f %s %s' % (dir,outputFileName,
' '.
join(inputFiles)))
247 nSteps =
int(math.ceil(
float(len(inputFiles))/maxFilesPerHadd))
249 for index
in range(nSteps):
250 minIndex = index*maxFilesPerHadd
251 maxIndex =
min((index+1)*maxFilesPerHadd,len(inputFiles))
252 stepInputFiles = inputFiles[minIndex:maxIndex]
253 stepOutputFileName =
'%s.part%i' % (outputFileName,index)
254 self.
logExec(
'cd %s; hadd -f %s %s' % (dir,stepOutputFileName,
' '.
join(stepInputFiles)))
255 partFiles.append(stepOutputFileName)
256 self.
logExec(
'cd %s; hadd -f %s %s' % (dir,outputFileName,
' '.
join(partFiles)))
257 self.
logExec(
'cd %s; rm -f %s' % (dir,
' '.
join(partFiles)))
263 configFile = glob.glob(
'%s/%s/%s' % (self.
taskDir,jobName,
'*.config.py.final.py'))
265 configFile = glob.glob(
'%s/%s/%s' % (self.
taskDir,jobName,
'*.config.py'))
267 exec(
open(configFile[0],
'r'),config)
268 return config[
'jobConfig']
272 if not resultFileName:
274 resultFiles = self.
taskDict[
'RESULTFILES']
275 if resultFiles
is None:
277 if resultFileName
not in resultFiles.split():
278 resultFiles =
' '.
join([resultFiles,resultFileName])
280 self.
taskDict[
'RESULTFILES'] = resultFiles