55def doPostProcessing(taskman,taskDict,postprocSteps,postprocLib,forceRun=False,jobName=None):
56 """Execute a series of postprocessing steps. The task status is updated before
57 and after the postprocessing. Log files are handled by each postprocessing step
58 individually. Returns a list of executed postprocessing steps."""
59
60
61 dsName = taskDict['DSNAME']
62 taskName = taskDict['TASKNAME']
63 prePostProcStatus = taskDict['STATUS']
64 executedSteps = []
65
66
67 if not postprocSteps:
68 return []
69
70
71 prePostProcStatus = taskman.getStatus(dsName,taskName)
72
73 if prePostProcStatus>=TaskManager.StatusCodes['POSTPROCRUNNING'] and not forceRun:
74 print ('Exiting postprocessing without doing anything: task %s/%s status is %s\n' % (dsName,taskName,getKey(TaskManager.StatusCodes,prePostProcStatus)))
75 return []
76
77
78 taskman.setStatus(dsName,taskName,TaskManager.StatusCodes['POSTPROCRUNNING'])
79 if jobName:
80 print ('Postprocessing for task %s/%s - job %s\nOld status: %s\n' % (dsName,taskName,jobName,getKey(TaskManager.StatusCodes,prePostProcStatus)))
81 else:
82 print ('Postprocessing for task %s/%s\nOld status: %s\n' % (dsName,taskName,getKey(TaskManager.StatusCodes,prePostProcStatus)))
83
84
85 if jobName:
86 postprocStamps = glob.glob('%s/%s/%s/*.status.POSTPROCESSING' % (dsName,taskName,jobName))
87 else:
88 postprocStamps = glob.glob('%s/%s/*/*.status.POSTPROCESSING' % (dsName,taskName))
89
90
91
92 try:
93 for step in postprocSteps:
94 executedSteps = runPostProcStep(taskman,taskDict,prePostProcStatus,executedSteps,step,postprocLib,jobName)
95
96 except PostponeProcessing as e:
97
98
99 if e.newStatus:
100 taskman.setStatus(dsName,taskName,e.newStatus)
101 print ('%i step(s) completed successfully: ' % len(executedSteps),executedSteps,'\n')
102 return executedSteps
103
104 except PostProcessingError as e:
105 print (e)
106 if e.newStatus:
107 taskman.setStatus(dsName,taskName,e.newStatus)
108 print ('Executed steps: ',e.executedSteps)
109 return e.executedSteps
110 else:
111 if (taskDict['NJOBS_SUBMITTED']+taskDict['NJOBS_RUNNING']) > 0:
112
113 pass
114
115 else:
116
117 taskman.setStatus(dsName,taskName,TaskManager.StatusCodes['POSTPROCFAILED'])
118 print ('Executed steps: ',e.executedSteps)
119 return e.executedSteps
120
121 except Exception as e:
122
123 print (e)
124 taskman.setStatus(dsName,taskName,TaskManager.StatusCodes['POSTPROCFAILED'])
125 print ('Executed steps: ',executedSteps)
126 return executedSteps
127
128 else:
129
130
131 for p in postprocStamps:
132 os.system('rm -f %s' % p)
133 basename = p[:-15]
134 os.system('touch %s.COMPLETED' % basename)
135
136
137 a = TaskAnalyzer('.',dsName,taskName)
138 if a.analyzeFiles():
139 a.updateStatus(taskman)
140 else:
141
142 taskman.setDiskStatus(dsName,taskName,TaskManager.OnDiskCodes['DELETED'])
143
144 print ('%i step(s) completed successfully: ' % len(executedSteps),executedSteps,'\n')
145 return executedSteps
146
147