55 def doPostProcessing(taskman,taskDict,postprocSteps,postprocLib,forceRun=False,jobName=None):
56 """Execute a series of postprocessing steps. The task status is updated before
57 and after the postprocessing. Log files are handled by each postprocessing step
58 individually. Returns a list of executed postprocessing steps."""
61 dsName = taskDict[
'DSNAME']
62 taskName = taskDict[
'TASKNAME']
63 prePostProcStatus = taskDict[
'STATUS']
71 prePostProcStatus = taskman.getStatus(dsName,taskName)
73 if prePostProcStatus>=TaskManager.StatusCodes[
'POSTPROCRUNNING']
and not forceRun:
74 print (
'Exiting postprocessing without doing anything: task %s/%s status is %s\n' % (dsName,taskName,
getKey(TaskManager.StatusCodes,prePostProcStatus)))
78 taskman.setStatus(dsName,taskName,TaskManager.StatusCodes[
'POSTPROCRUNNING'])
80 print (
'Postprocessing for task %s/%s - job %s\nOld status: %s\n' % (dsName,taskName,jobName,
getKey(TaskManager.StatusCodes,prePostProcStatus)))
82 print (
'Postprocessing for task %s/%s\nOld status: %s\n' % (dsName,taskName,
getKey(TaskManager.StatusCodes,prePostProcStatus)))
86 postprocStamps = glob.glob(
'%s/%s/%s/*.status.POSTPROCESSING' % (dsName,taskName,jobName))
88 postprocStamps = glob.glob(
'%s/%s/*/*.status.POSTPROCESSING' % (dsName,taskName))
93 for step
in postprocSteps:
94 executedSteps =
runPostProcStep(taskman,taskDict,prePostProcStatus,executedSteps,step,postprocLib,jobName)
96 except PostponeProcessing
as e:
100 taskman.setStatus(dsName,taskName,e.newStatus)
101 print (
'%i step(s) completed successfully: ' % len(executedSteps),executedSteps,
'\n')
104 except PostProcessingError
as e:
107 taskman.setStatus(dsName,taskName,e.newStatus)
108 print (
'Executed steps: ',e.executedSteps)
109 return e.executedSteps
111 if (taskDict[
'NJOBS_SUBMITTED']+taskDict[
'NJOBS_RUNNING']) > 0:
117 taskman.setStatus(dsName,taskName,TaskManager.StatusCodes[
'POSTPROCFAILED'])
118 print (
'Executed steps: ',e.executedSteps)
119 return e.executedSteps
121 except Exception
as e:
124 taskman.setStatus(dsName,taskName,TaskManager.StatusCodes[
'POSTPROCFAILED'])
125 print (
'Executed steps: ',executedSteps)
131 for p
in postprocStamps:
132 os.system(
'rm -f %s' % p)
134 os.system(
'touch %s.COMPLETED' % basename)
137 a = TaskAnalyzer(
'.',dsName,taskName)
139 a.updateStatus(taskman)
142 taskman.setDiskStatus(dsName,taskName,TaskManager.OnDiskCodes[
'DELETED'])
144 print (
'%i step(s) completed successfully: ' % len(executedSteps),executedSteps,
'\n')