ATLAS Offline Software
Loading...
Searching...
No Matches
python.PostProcessing Namespace Reference

Classes

class  PostponeProcessing
class  PostProcessingError
class  PostProcessingStep

Functions

 runPostProcStep (taskman, taskDict, oldStatus, previousSteps, step, postprocLib, jobName)
 doPostProcessing (taskman, taskDict, postprocSteps, postprocLib, forceRun=False, jobName=None)

Variables

str __author__ = 'Juerg Beringer'
str __version__ = '$Id $'

Detailed Description

This module defines the generic infrastructure for task postprocessing.

Function Documentation

◆ doPostProcessing()

python.PostProcessing.doPostProcessing ( taskman,
taskDict,
postprocSteps,
postprocLib,
forceRun = False,
jobName = None )
Execute a series of postprocessing steps. The task status is updated before
   and after the postprocessing. Log files are handled by each postprocessing step
   individually. Returns a list of executed postprocessing steps.

Definition at line 55 of file PostProcessing.py.

def _markJobsCompleted(postprocStamps):
    """Replace each '*.status.POSTPROCESSING' stamp file by a '*.status.COMPLETED' one.

    Works directly on the file system instead of going through a shell, so
    paths containing spaces or shell metacharacters are handled correctly.
    Failures are reported but never abort postprocessing (best effort).
    """
    suffix = '.POSTPROCESSING'
    for stamp in postprocStamps:
        completedStamp = stamp[:-len(suffix)] + '.COMPLETED'

        try:
            os.remove(stamp)
        except FileNotFoundError:
            # Stamp file is already gone - nothing to remove
            pass
        except OSError as e:
            print ('WARNING: unable to remove %s: %s' % (stamp,e))

        try:
            # Create the file if necessary and refresh its timestamps
            with open(completedStamp,'a'):
                pass
            os.utime(completedStamp,None)
        except OSError as e:
            print ('WARNING: unable to create %s: %s' % (completedStamp,e))


def doPostProcessing(taskman,taskDict,postprocSteps,postprocLib,forceRun=False,jobName=None):
    """Execute a series of postprocessing steps. The task status is updated before
       and after the postprocessing. Log files are handled by each postprocessing step
       individually. Returns a list of executed postprocessing steps.

       Arguments:
         taskman       -- task manager used to read and update the task status
         taskDict      -- task information; 'DSNAME' and 'TASKNAME' are always read,
                          'NJOBS_SUBMITTED' and 'NJOBS_RUNNING' only when a step fails
                          without specifying a new status
         postprocSteps -- names of the postprocessing steps to run, in order; if empty,
                          nothing is done and an empty list is returned
         postprocLib   -- object whose attributes are the postprocessing step classes
         forceRun      -- run even if the task status is already POSTPROCRUNNING or higher
         jobName       -- if given, only the status files of this job are considered
    """
    # NOTE: At present, we don't use per-job postprocessing steps. Should this feature
    #       be removed?
    dsName = taskDict['DSNAME']
    taskName = taskDict['TASKNAME']
    executedSteps = []

    # Check if there's anything to do
    if not postprocSteps:
        return []   # nothing to do

    # Always take the current status from the database
    prePostProcStatus = taskman.getStatus(dsName,taskName)
    # Don't run postprocessing if status is already postprocessing
    if prePostProcStatus>=TaskManager.StatusCodes['POSTPROCRUNNING'] and not forceRun:
        print ('Exiting postprocessing without doing anything: task %s/%s status is %s\n' % (dsName,taskName,getKey(TaskManager.StatusCodes,prePostProcStatus)))
        return []

    # Start postprocessing
    taskman.setStatus(dsName,taskName,TaskManager.StatusCodes['POSTPROCRUNNING'])
    if jobName:
        print ('Postprocessing for task %s/%s - job %s\nOld status: %s\n' % (dsName,taskName,jobName,getKey(TaskManager.StatusCodes,prePostProcStatus)))
    else:
        print ('Postprocessing for task %s/%s\nOld status: %s\n' % (dsName,taskName,getKey(TaskManager.StatusCodes,prePostProcStatus)))

    # Get list of postprocessing status files that we may have to remove later
    if jobName:
        postprocStamps = glob.glob('%s/%s/%s/*.status.POSTPROCESSING' % (dsName,taskName,jobName))
    else:
        postprocStamps = glob.glob('%s/%s/*/*.status.POSTPROCESSING' % (dsName,taskName))

    # Do each specified processing step. Postprocessing may be interrupted either deliberately
    # by a postprocessing step or due to errors.
    try:
        for step in postprocSteps:
            executedSteps = runPostProcStep(taskman,taskDict,prePostProcStatus,executedSteps,step,postprocLib,jobName)

    except PostponeProcessing as e:
        # Stop postprocessing chain w/o error. New status will be determined below if not
        # specified in the exception.
        if e.newStatus:
            taskman.setStatus(dsName,taskName,e.newStatus)
            print ('%i step(s) completed successfully: ' % len(executedSteps),executedSteps,'\n')
            return executedSteps

    except PostProcessingError as e:
        print (e)
        if e.newStatus:
            taskman.setStatus(dsName,taskName,e.newStatus)
            print ('Executed steps: ',e.executedSteps)
            return e.executedSteps
        else:
            if (taskDict['NJOBS_SUBMITTED']+taskDict['NJOBS_RUNNING']) > 0:
                # Not all the jobs have run, so ignore error and postpone to later;
                # the status is redetermined from the files on disk below
                pass

            else:
                # All the jobs have run, so nothing new in the future
                taskman.setStatus(dsName,taskName,TaskManager.StatusCodes['POSTPROCFAILED'])
                print ('Executed steps: ',e.executedSteps)
                return e.executedSteps

    except Exception as e:
        # Any other postprocessing error. Task status becomes POSTPROCFAILED.
        print (e)
        taskman.setStatus(dsName,taskName,TaskManager.StatusCodes['POSTPROCFAILED'])
        print ('Executed steps: ',executedSteps)
        return executedSteps

    else:
        # After successful postprocessing, delete POSTPROCESSING status files
        # and mark corresponding jobs as completed
        _markJobsCompleted(postprocStamps)

    # Redetermine current status from files on disk
    a = TaskAnalyzer('.',dsName,taskName)
    if a.analyzeFiles():
        a.updateStatus(taskman)
    else:
        # Postprocessing could have deleted the task and migrated it to tape
        taskman.setDiskStatus(dsName,taskName,TaskManager.OnDiskCodes['DELETED'])

    print ('%i step(s) completed successfully: ' % len(executedSteps),executedSteps,'\n')
    return executedSteps

◆ runPostProcStep()

python.PostProcessing.runPostProcStep ( taskman,
taskDict,
oldStatus,
previousSteps,
step,
postprocLib,
jobName )
Instantiate and run a single postprocessing step.

Definition at line 42 of file PostProcessing.py.

def runPostProcStep(taskman,taskDict,oldStatus,previousSteps,step,postprocLib,jobName):
    """Instantiate and run a single postprocessing step.

       The step class is looked up by name as an attribute of postprocLib, instantiated
       with a freshly reloaded task dictionary and run. Returns the executedSteps
       attribute of the step instance after it has run.

       Raises PostProcessingError (with new status POSTPROCFAILED) if postprocLib does
       not define a step with the given name.
    """
    stepClass = getattr(postprocLib,step,None)
    if stepClass is None:
        raise PostProcessingError('ERROR: undefined postprocessing step: '+step,previousSteps,TaskManager.StatusCodes['POSTPROCFAILED'])

    print ('...',step,'\n')
    # First reload taskDict - a previous step might have updated the info
    taskDict = taskman.getTaskDict(taskDict['DSNAME'],taskDict['TASKNAME'])
    stepInstance = stepClass(taskman,taskDict,oldStatus,previousSteps,postprocLib,jobName)
    stepInstance.run()
    return stepInstance.executedSteps

Variable Documentation

◆ __author__

str python.PostProcessing.__author__ = 'Juerg Beringer'
private

Definition at line 8 of file PostProcessing.py.

◆ __version__

str python.PostProcessing.__version__ = '$Id $'
private

Definition at line 9 of file PostProcessing.py.