ATLAS Offline Software
Classes | Functions | Variables
python.PostProcessing Namespace Reference

Classes

class  PostponeProcessing
 
class  PostProcessingError
 
class  PostProcessingStep
 

Functions

def runPostProcStep (taskman, taskDict, oldStatus, previousSteps, step, postprocLib, jobName)
 
def doPostProcessing (taskman, taskDict, postprocSteps, postprocLib, forceRun=False, jobName=None)
 

Variables

string __author__ = 'Juerg Beringer'
 
string __version__ = '$Id $'
 

Function Documentation

◆ doPostProcessing()

def python.PostProcessing.doPostProcessing (   taskman,
  taskDict,
  postprocSteps,
  postprocLib,
  forceRun = False,
  jobName = None 
)
Execute a series of postprocessing steps. The task status is updated before
   and after the postprocessing. Log files are handled by each postprocessing step
   individually. Returns a list of executed postprocessing steps.

Definition at line 53 of file PostProcessing.py.

53 def doPostProcessing(taskman,taskDict,postprocSteps,postprocLib,forceRun=False,jobName=None):
54  """Execute a series of postprocessing steps. The task status is updated before
55  and after the postprocessing. Log files are handled by each postprocessing step
56  individually. Returns a list of executed postprocessing steps."""
57  # NOTE: At present, we don't use per-job postprocessing steps. Should this feature
58  # be removed?
59  dsName = taskDict['DSNAME']
60  taskName = taskDict['TASKNAME']
61  prePostProcStatus = taskDict['STATUS']
62  executedSteps = []
63 
64  # Check if there's anything to do
65  if not postprocSteps:
66  return [] # nothing to do
67 
68  #update again from DataBase
69  prePostProcStatus = taskman.getStatus(dsName,taskName)
70  # Don't run postprocessing if status is already postprocessing
71  if prePostProcStatus>=TaskManager.StatusCodes['POSTPROCRUNNING'] and not forceRun:
72  print ('Exiting postprocessing without doing anything: task %s/%s status is %s\n' % (dsName,taskName,getKey(TaskManager.StatusCodes,prePostProcStatus)))
73  return []
74 
75  # Start postprocessing
76  taskman.setStatus(dsName,taskName,TaskManager.StatusCodes['POSTPROCRUNNING'])
77  if jobName:
78  print ('Postprocessing for task %s/%s - job %s\nOld status: %s\n' % (dsName,taskName,jobName,getKey(TaskManager.StatusCodes,prePostProcStatus)))
79  else:
80  print ('Postprocessing for task %s/%s\nOld status: %s\n' % (dsName,taskName,getKey(TaskManager.StatusCodes,prePostProcStatus)))
81 
82  # Get list of postprocessing status files that we may have to remove later
83  if jobName:
84  postprocStamps = glob.glob('%s/%s/%s/*.status.POSTPROCESSING' % (dsName,taskName,jobName))
85  else:
86  postprocStamps = glob.glob('%s/%s/*/*.status.POSTPROCESSING' % (dsName,taskName))
87 
88  # Do each specified processing step. Postprocessing may be interrupted either deliberately
89  # by a postprocessing step or due to errors.
90  try:
91  for step in postprocSteps:
92  executedSteps = runPostProcStep(taskman,taskDict,prePostProcStatus,executedSteps,step,postprocLib,jobName)
93 
94  except PostponeProcessing as e:
95  # Stop postprocessing chain w/o error. New status will be determined below if not
96  # specified in the exception.
97  if e.newStatus:
98  taskman.setStatus(dsName,taskName,e.newStatus)
99  print ('%i step(s) completed successfully: ' % len(executedSteps),executedSteps,'\n')
100  return executedSteps
101 
102  except PostProcessingError as e:
103  print (e)
104  if e.newStatus:
105  taskman.setStatus(dsName,taskName,e.newStatus)
106  print ('Executed steps: ',e.executedSteps)
107  return e.executedSteps
108  else:
109  if (taskDict['NJOBS_SUBMITTED']+taskDict['NJOBS_RUNNING']) > 0:
110  # Not all the jobs have run, so ignore error and postpone to later
111  pass
112 
113  else:
114  # All the jobs have run, so nothing new in the future
115  taskman.setStatus(dsName,taskName,TaskManager.StatusCodes['POSTPROCFAILED'])
116  print ('Executed steps: ',e.executedSteps)
117  return e.executedSteps
118 
119  except Exception as e:
120  # Any other postprocessing error. Task status becomes POSTPROCFAILED.
121  print (e)
122  taskman.setStatus(dsName,taskName,TaskManager.StatusCodes['POSTPROCFAILED'])
123  print ('Executed steps: ',executedSteps)
124  return executedSteps
125 
126  else:
127  # After successful postprocessing, delete POSTPROCESSING status files
128  # and mark corresponding jobs as completed
129  for p in postprocStamps:
130  os.system('rm -f %s' % p)
131  basename = p[:-15]
132  os.system('touch %s.COMPLETED' % basename)
133 
134  # Redetermine current status from files on disk
135  a = TaskAnalyzer('.',dsName,taskName)
136  if a.analyzeFiles():
137  a.updateStatus(taskman)
138  else:
139  # Postprocessing could have deleted the task and migrated it to tape
140  taskman.setDiskStatus(dsName,taskName,TaskManager.OnDiskCodes['DELETED'])
141 
142  print ('%i step(s) completed successfully: ' % len(executedSteps),executedSteps,'\n')
143  return executedSteps
144 
145 

◆ runPostProcStep()

def python.PostProcessing.runPostProcStep (   taskman,
  taskDict,
  oldStatus,
  previousSteps,
  step,
  postprocLib,
  jobName 
)
Instantiate and run a single postprocessing step.

Definition at line 40 of file PostProcessing.py.

40 def runPostProcStep(taskman,taskDict,oldStatus,previousSteps,step,postprocLib,jobName):
41  """Instantiate and run a single postprocessing step."""
42  if hasattr(postprocLib,step):
43  print ('...',step,'\n')
44  # First reload taskDict - a previous step might have updated the info
45  taskDict = taskman.getTaskDict(taskDict['DSNAME'],taskDict['TASKNAME'])
46  postprocClass = getattr(postprocLib,step)(taskman,taskDict,oldStatus,previousSteps,postprocLib,jobName)
47  postprocClass.run()
48  return postprocClass.executedSteps
49  else:
50  raise PostProcessingError('ERROR: undefined postprocessing step: '+step,previousSteps,TaskManager.StatusCodes['POSTPROCFAILED'])
51 
52 

Variable Documentation

◆ __author__

string python.PostProcessing.__author__ = 'Juerg Beringer'
private

Definition at line 8 of file PostProcessing.py.

◆ __version__

string python.PostProcessing.__version__ = '$Id $'
private

Definition at line 9 of file PostProcessing.py.

python.PostProcessing.doPostProcessing
def doPostProcessing(taskman, taskDict, postprocSteps, postprocLib, forceRun=False, jobName=None)
Definition: PostProcessing.py:53
python.PostProcessing.runPostProcStep
def runPostProcStep(taskman, taskDict, oldStatus, previousSteps, step, postprocLib, jobName)
Definition: PostProcessing.py:40
pool::getKey
std::string getKey(const std::string &key, const std::string &encoded)