12import glob, time, sys, os, math
def runPostProcStep(taskman,taskDict,oldStatus,previousSteps,step,postprocLib,jobName):
    """Instantiate a single postprocessing step and return its executed steps.

    Parameters:
        taskman: task manager object; its getTaskDict() is used to reload
            the task dictionary before the step is constructed.
        taskDict: task dictionary providing at least 'DSNAME' and 'TASKNAME'.
        oldStatus: passed through unchanged to the step class.
        previousSteps: steps executed so far; passed to the step class and
            attached to the error raised for an unknown step.
        step: name of the attribute of postprocLib that implements the step.
        postprocLib: object whose attributes are the available step classes.
        jobName: passed through unchanged to the step class.

    Returns:
        The ``executedSteps`` attribute of the instantiated step object.

    Raises:
        PostProcessingError: if postprocLib has no attribute named ``step``.
            The error carries previousSteps and the POSTPROCFAILED status code.
    """
    # Guard clause: an unknown step name is reported as a failed postprocessing.
    if not hasattr(postprocLib, step):
        raise PostProcessingError(
            'ERROR: undefined postprocessing step: '+step,
            previousSteps,
            TaskManager.StatusCodes['POSTPROCFAILED'])

    print ('...',step,'\n')

    # Reload the task dictionary through the task manager rather than using
    # the copy handed in by the caller.
    taskDict = taskman.getTaskDict(taskDict['DSNAME'],taskDict['TASKNAME'])
    postprocClass = getattr(postprocLib,step)(taskman,taskDict,oldStatus,previousSteps,postprocLib,jobName)

    # NOTE(review): the listing this block was restored from appears to have
    # dropped a line between construction and return, and its original
    # docstring said the step is instantiated *and run*. If the step class
    # does not do its work in __init__, an explicit run call belongs here -
    # confirm against the upstream file before relying on this block.
    return postprocClass.executedSteps
# NOTE(review): this listing still carries the original file's line numbers as
# a prefix on each statement and has lost its indentation. Gaps in that
# numbering indicate that several statements are not shown here. The comments
# below describe only the statements that are visible.
55def doPostProcessing(taskman,taskDict,postprocSteps,postprocLib,forceRun=False,jobName=None):
56 """Execute a series of postprocessing steps. The task status is updated before
57 and after the postprocessing. Log files are handled by each postprocessing step
58 individually. Returns a list of executed postprocessing steps."""
# Read the dataset name, task name and recorded status from the task dictionary.
61 dsName = taskDict[
'DSNAME']
62 taskName = taskDict[
'TASKNAME']
63 prePostProcStatus = taskDict[
'STATUS']
# Re-read the status through taskman.getStatus(); this replaces the value
# taken from taskDict just above.
71 prePostProcStatus = taskman.getStatus(dsName,taskName)
# When the status is already POSTPROCRUNNING or higher and forceRun is not set,
# report that nothing will be done.
# NOTE(review): the statement that leaves the function after this message is
# not visible in this listing.
73 if prePostProcStatus>=TaskManager.StatusCodes[
'POSTPROCRUNNING']
and not forceRun:
74 print (
'Exiting postprocessing without doing anything: task %s/%s status is %s\n' % (dsName,taskName,getKey(TaskManager.StatusCodes,prePostProcStatus)))
# Record POSTPROCRUNNING as the task status.
78 taskman.setStatus(dsName,taskName,TaskManager.StatusCodes[
'POSTPROCRUNNING'])
# Start-up message that includes the job name.
# NOTE(review): presumably selected by a test on jobName - the guard is not visible.
80 print (
'Postprocessing for task %s/%s - job %s\nOld status: %s\n' % (dsName,taskName,jobName,getKey(TaskManager.StatusCodes,prePostProcStatus)))
# Start-up message without a job name.
82 print (
'Postprocessing for task %s/%s\nOld status: %s\n' % (dsName,taskName,getKey(TaskManager.StatusCodes,prePostProcStatus)))
# Collect the *.status.POSTPROCESSING stamp files inside the given job's directory.
86 postprocStamps = glob.glob(
'%s/%s/%s/*.status.POSTPROCESSING' % (dsName,taskName,jobName))
# Collect the *.status.POSTPROCESSING stamp files in every sub-directory of the task.
88 postprocStamps = glob.glob(
'%s/%s/*/*.status.POSTPROCESSING' % (dsName,taskName))
# Run each requested step through runPostProcStep(); its return value becomes
# the new executedSteps list that is handed to the next step.
# NOTE(review): the initial assignment of executedSteps and the opening of the
# try block that the handlers below belong to are not visible here.
93 for step
in postprocSteps:
94 executedSteps =
runPostProcStep(taskman,taskDict,prePostProcStatus,executedSteps,step,postprocLib,jobName)
# PostponeProcessing: store the status carried by the exception and report the
# steps completed so far.
96 except PostponeProcessing
as e:
100 taskman.setStatus(dsName,taskName,e.newStatus)
101 print (
'%i step(s) completed successfully: ' % len(executedSteps),executedSteps,
'\n')
# PostProcessingError: store the status carried by the exception, print and
# return the steps recorded on the exception.
104 except PostProcessingError
as e:
107 taskman.setStatus(dsName,taskName,e.newStatus)
108 print (
'Executed steps: ',e.executedSteps)
109 return e.executedSteps
# Tests whether the number of submitted plus running jobs is greater than zero.
# NOTE(review): the statements that decide when this test is reached, and what
# runs on each outcome, are not visible; the visible lines below set
# POSTPROCFAILED and return the steps recorded on the exception.
111 if (taskDict[
'NJOBS_SUBMITTED']+taskDict[
'NJOBS_RUNNING']) > 0:
117 taskman.setStatus(dsName,taskName,TaskManager.StatusCodes[
'POSTPROCFAILED'])
118 print (
'Executed steps: ',e.executedSteps)
119 return e.executedSteps
# Any other exception: mark the task POSTPROCFAILED and print the steps
# executed so far.
121 except Exception
as e:
124 taskman.setStatus(dsName,taskName,TaskManager.StatusCodes[
'POSTPROCFAILED'])
125 print (
'Executed steps: ',executedSteps)
# For each stamp file found earlier: delete it, then create '<basename>.COMPLETED'.
# NOTE(review): 'basename' is not assigned in the visible lines. Both commands
# are run through a shell via os.system with the path interpolated unquoted -
# confirm these paths can never contain shell metacharacters.
131 for p
in postprocStamps:
132 os.system(
'rm -f %s' % p)
134 os.system(
'touch %s.COMPLETED' % basename)
# Build a TaskAnalyzer for this dataset/task, rooted at the current directory ('.').
137 a = TaskAnalyzer(
'.',dsName,taskName)
# Call the analyzer's updateStatus() with the task manager.
139 a.updateStatus(taskman)
# Set the task's on-disk status to DELETED.
# NOTE(review): the condition that selects between updateStatus() above and
# this call is not visible in this listing.
142 taskman.setDiskStatus(dsName,taskName,TaskManager.OnDiskCodes[
'DELETED'])
# Final report of the number and names of the executed steps.
144 print (
'%i step(s) completed successfully: ' % len(executedSteps),executedSteps,
'\n')
__init__(self, message, executedSteps, newStatus=None)
taskFileList(self, fileNamePattern, statusName='POSTPROCESSING', jobName=None)
log(self, msg=None, text=None, noTime=False, doPrint=False)
__init__(self, taskman, taskDict, oldStatus, previousSteps, postprocLib, jobName=None)
getFileName(self, postFix, stepName='', absPath=False)
getJobConfig(self, jobName=None)
logExec(self, cmd, doPrint=False, checkStatusCode=True, errorMsg='', abortOnError=True)
addResult(self, resultFileName)
hadd(self, dir, outputFileName, inputFiles, maxFilesPerHadd=100)
__init__(self, message, executedSteps, newStatus=None)
runPostProcStep(taskman, taskDict, oldStatus, previousSteps, step, postprocLib, jobName)
doPostProcessing(taskman, taskDict, postprocSteps, postprocLib, forceRun=False, jobName=None)