ATLAS Offline Software
Classes | Functions | Variables
python.PostProcessing Namespace Reference

Classes

class  PostponeProcessing
 
class  PostProcessingError
 
class  PostProcessingStep
 

Functions

def runPostProcStep (taskman, taskDict, oldStatus, previousSteps, step, postprocLib, jobName)
 
def doPostProcessing (taskman, taskDict, postprocSteps, postprocLib, forceRun=False, jobName=None)
 

Variables

string __author__ = 'Juerg Beringer'
 
string __version__ = '$Id $'
 

Function Documentation

◆ doPostProcessing()

def python.PostProcessing.doPostProcessing (   taskman,
  taskDict,
  postprocSteps,
  postprocLib,
  forceRun = False,
  jobName = None 
)
Execute a series of postprocessing steps. The task status is updated before
   and after the postprocessing. Log files are handled by each postprocessing step
   individually. Returns a list of executed postprocessing steps.

Definition at line 55 of file PostProcessing.py.

def doPostProcessing(taskman,taskDict,postprocSteps,postprocLib,forceRun=False,jobName=None):
    """Execute a series of postprocessing steps. The task status is updated before
    and after the postprocessing. Log files are handled by each postprocessing step
    individually. Returns a list of executed postprocessing steps.

    Args:
        taskman: task manager used to read (getStatus, getTaskDict via the steps)
            and write (setStatus, setDiskStatus) task state.
        taskDict: task record; 'DSNAME' and 'TASKNAME' are always read, and
            'NJOBS_SUBMITTED' / 'NJOBS_RUNNING' are read when a step raises a
            PostProcessingError without a new status.
        postprocSteps: sequence of step names, run in order; each name is looked
            up as an attribute of postprocLib (see runPostProcStep).
        postprocLib: object exposing the postprocessing step classes by name.
        forceRun: if True, run even when the task status is already
            POSTPROCRUNNING or a higher status code.
        jobName: if given, only this job's POSTPROCESSING status files are
            handled; otherwise those of all jobs of the task.

    Returns:
        The list of executed postprocessing steps ([] if nothing was run).
    """
    # NOTE: At present, we don't use per-job postprocessing steps. Should this feature
    # be removed?
    dsName = taskDict['DSNAME']
    taskName = taskDict['TASKNAME']
    executedSteps = []

    # Check if there's anything to do
    if not postprocSteps:
        return [] # nothing to do

    # Take the status from the database rather than from taskDict, which may be stale
    prePostProcStatus = taskman.getStatus(dsName,taskName)
    # Don't run postprocessing if status is already postprocessing
    if prePostProcStatus>=TaskManager.StatusCodes['POSTPROCRUNNING'] and not forceRun:
        print ('Exiting postprocessing without doing anything: task %s/%s status is %s\n' % (dsName,taskName,getKey(TaskManager.StatusCodes,prePostProcStatus)))
        return []

    # Start postprocessing
    taskman.setStatus(dsName,taskName,TaskManager.StatusCodes['POSTPROCRUNNING'])
    if jobName:
        print ('Postprocessing for task %s/%s - job %s\nOld status: %s\n' % (dsName,taskName,jobName,getKey(TaskManager.StatusCodes,prePostProcStatus)))
    else:
        print ('Postprocessing for task %s/%s\nOld status: %s\n' % (dsName,taskName,getKey(TaskManager.StatusCodes,prePostProcStatus)))

    # Get list of postprocessing status files that we may have to remove later.
    # Name components are escaped so glob metacharacters in them match literally.
    if jobName:
        postprocStamps = glob.glob('%s/%s/%s/*.status.POSTPROCESSING' % (glob.escape(dsName),glob.escape(taskName),glob.escape(jobName)))
    else:
        postprocStamps = glob.glob('%s/%s/*/*.status.POSTPROCESSING' % (glob.escape(dsName),glob.escape(taskName)))

    # Do each specified processing step. Postprocessing may be interrupted either deliberately
    # by a postprocessing step or due to errors.
    try:
        for step in postprocSteps:
            executedSteps = runPostProcStep(taskman,taskDict,prePostProcStatus,executedSteps,step,postprocLib,jobName)

    except PostponeProcessing as e:
        # Stop postprocessing chain w/o error.
        # NOTE(review): if the exception carries no new status, nothing here changes
        # the status, so it stays POSTPROCRUNNING (set above) -- confirm this is intended.
        if e.newStatus:
            taskman.setStatus(dsName,taskName,e.newStatus)
        print ('%i step(s) completed successfully: ' % len(executedSteps),executedSteps,'\n')
        return executedSteps

    except PostProcessingError as e:
        print (e)
        if e.newStatus:
            taskman.setStatus(dsName,taskName,e.newStatus)
            print ('Executed steps: ',e.executedSteps)
            return e.executedSteps
        else:
            if (taskDict['NJOBS_SUBMITTED']+taskDict['NJOBS_RUNNING']) > 0:
                # Not all the jobs have run, so ignore error and postpone to later.
                # NOTE(review): the status is left at POSTPROCRUNNING here, which makes
                # later calls without forceRun exit early -- confirm this is intended.
                pass

            else:
                # All the jobs have run, so nothing new in the future
                taskman.setStatus(dsName,taskName,TaskManager.StatusCodes['POSTPROCFAILED'])
            print ('Executed steps: ',e.executedSteps)
            return e.executedSteps

    except Exception as e:
        # Any other postprocessing error. Task status becomes POSTPROCFAILED.
        import traceback
        print (e)
        traceback.print_exc()  # keep the stack trace, which print(e) alone loses
        taskman.setStatus(dsName,taskName,TaskManager.StatusCodes['POSTPROCFAILED'])
        print ('Executed steps: ',executedSteps)
        return executedSteps

    else:
        # After successful postprocessing, delete POSTPROCESSING status files
        # and mark corresponding jobs as completed. Filesystem calls are used
        # instead of shell commands so that paths are never interpreted by a shell.
        # Failures are reported but not raised, like the former 'rm -f' / 'touch'.
        stampSuffix = '.POSTPROCESSING'
        for p in postprocStamps:
            try:
                os.remove(p)
            except FileNotFoundError:
                pass # already gone, as with 'rm -f'
            except OSError as err:
                print ('WARNING: could not remove %s: %s' % (p,err))
            completedStamp = p[:-len(stampSuffix)] + '.COMPLETED'
            try:
                # Create the file if needed and update its timestamp (like 'touch')
                with open(completedStamp,'a'):
                    os.utime(completedStamp,None)
            except OSError as err:
                print ('WARNING: could not create %s: %s' % (completedStamp,err))

        # Redetermine current status from files on disk
        a = TaskAnalyzer('.',dsName,taskName)
        if a.analyzeFiles():
            a.updateStatus(taskman)
        else:
            # Postprocessing could have deleted the task and migrated it to tape
            taskman.setDiskStatus(dsName,taskName,TaskManager.OnDiskCodes['DELETED'])

        print ('%i step(s) completed successfully: ' % len(executedSteps),executedSteps,'\n')
        return executedSteps

◆ runPostProcStep()

def python.PostProcessing.runPostProcStep (   taskman,
  taskDict,
  oldStatus,
  previousSteps,
  step,
  postprocLib,
  jobName 
)
Instantiate and run a single postprocessing step.

Definition at line 42 of file PostProcessing.py.

def runPostProcStep(taskman,taskDict,oldStatus,previousSteps,step,postprocLib,jobName):
    """Instantiate and run a single postprocessing step.

    The step named ``step`` is fetched as an attribute of ``postprocLib`` and
    called with (taskman, refreshedTaskDict, oldStatus, previousSteps,
    postprocLib, jobName). Its ``run()`` method is then invoked.

    Returns:
        The ``executedSteps`` attribute of the step object after it has run.

    Raises:
        PostProcessingError: if ``postprocLib`` has no attribute named ``step``.
            The error carries ``previousSteps`` and the POSTPROCFAILED status code.
    """
    if not hasattr(postprocLib,step):
        raise PostProcessingError('ERROR: undefined postprocessing step: '+step,
                                  previousSteps,
                                  TaskManager.StatusCodes['POSTPROCFAILED'])

    print ('...',step,'\n')
    # Re-read the task record, because an earlier step may have modified it
    refreshedTaskDict = taskman.getTaskDict(taskDict['DSNAME'],taskDict['TASKNAME'])
    stepFactory = getattr(postprocLib,step)
    stepInstance = stepFactory(taskman,refreshedTaskDict,oldStatus,previousSteps,postprocLib,jobName)
    stepInstance.run()
    return stepInstance.executedSteps

Variable Documentation

◆ __author__

string python.PostProcessing.__author__ = 'Juerg Beringer'
private

Definition at line 8 of file PostProcessing.py.

◆ __version__

string python.PostProcessing.__version__ = '$Id $'
private

Definition at line 9 of file PostProcessing.py.

buildDatabase.getKey
def getKey(filename)
Definition: buildDatabase.py:545
python.PostProcessing.doPostProcessing
def doPostProcessing(taskman, taskDict, postprocSteps, postprocLib, forceRun=False, jobName=None)
Definition: PostProcessing.py:55
python.PostProcessing.runPostProcStep
def runPostProcStep(taskman, taskDict, oldStatus, previousSteps, step, postprocLib, jobName)
Definition: PostProcessing.py:42