53 def doPostProcessing(taskman,taskDict,postprocSteps,postprocLib,forceRun=False,jobName=None):
54 """Execute a series of postprocessing steps. The task status is updated before
55 and after the postprocessing. Log files are handled by each postprocessing step
56 individually. Returns a list of executed postprocessing steps."""
59 dsName = taskDict[
'DSNAME']
60 taskName = taskDict[
'TASKNAME']
61 prePostProcStatus = taskDict[
'STATUS']
69 prePostProcStatus = taskman.getStatus(dsName,taskName)
71 if prePostProcStatus>=TaskManager.StatusCodes[
'POSTPROCRUNNING']
and not forceRun:
72 print (
'Exiting postprocessing without doing anything: task %s/%s status is %s\n' % (dsName,taskName,
getKey(TaskManager.StatusCodes,prePostProcStatus)))
76 taskman.setStatus(dsName,taskName,TaskManager.StatusCodes[
'POSTPROCRUNNING'])
78 print (
'Postprocessing for task %s/%s - job %s\nOld status: %s\n' % (dsName,taskName,jobName,
getKey(TaskManager.StatusCodes,prePostProcStatus)))
80 print (
'Postprocessing for task %s/%s\nOld status: %s\n' % (dsName,taskName,
getKey(TaskManager.StatusCodes,prePostProcStatus)))
84 postprocStamps = glob.glob(
'%s/%s/%s/*.status.POSTPROCESSING' % (dsName,taskName,jobName))
86 postprocStamps = glob.glob(
'%s/%s/*/*.status.POSTPROCESSING' % (dsName,taskName))
91 for step
in postprocSteps:
92 executedSteps =
runPostProcStep(taskman,taskDict,prePostProcStatus,executedSteps,step,postprocLib,jobName)
94 except PostponeProcessing
as e:
98 taskman.setStatus(dsName,taskName,e.newStatus)
99 print (
'%i step(s) completed successfully: ' % len(executedSteps),executedSteps,
'\n')
102 except PostProcessingError
as e:
105 taskman.setStatus(dsName,taskName,e.newStatus)
106 print (
'Executed steps: ',e.executedSteps)
107 return e.executedSteps
109 if (taskDict[
'NJOBS_SUBMITTED']+taskDict[
'NJOBS_RUNNING']) > 0:
115 taskman.setStatus(dsName,taskName,TaskManager.StatusCodes[
'POSTPROCFAILED'])
116 print (
'Executed steps: ',e.executedSteps)
117 return e.executedSteps
119 except Exception
as e:
122 taskman.setStatus(dsName,taskName,TaskManager.StatusCodes[
'POSTPROCFAILED'])
123 print (
'Executed steps: ',executedSteps)
129 for p
in postprocStamps:
130 os.system(
'rm -f %s' % p)
132 os.system(
'touch %s.COMPLETED' % basename)
135 a = TaskAnalyzer(
'.',dsName,taskName)
137 a.updateStatus(taskman)
140 taskman.setDiskStatus(dsName,taskName,TaskManager.OnDiskCodes[
'DELETED'])
142 print (
'%i step(s) completed successfully: ' % len(executedSteps),executedSteps,
'\n')