ATLAS Offline Software
PostProcessing.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 # Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
4 
5 """
6 This module defines the generic infrastructure for task postprocessing.
7 """
8 __author__ = 'Juerg Beringer'
9 __version__ = '$Id $'
10 
11 
12 import glob, time, sys, os, math
13 import subprocess
14 
15 from InDetBeamSpotExample.TaskManager import TaskAnalyzer, TaskManager, getKey
16 from InDetBeamSpotExample.Utils import getUserName
17 
18 
19 
20 
21 # Exception classes
class PostProcessingError(Exception):
    """Error raised when a postprocessing step fails.

    Attributes:
      message       -- text describing the failure; also the str() of the exception
      executedSteps -- list of the postprocessing steps executed so far
      newStatus     -- status code to assign to the task, or None if the
                       handler should decide
    """

    def __init__(self, message, executedSteps, newStatus=None):
        # Hand all three values to Exception so they also show up in .args.
        super().__init__(message, executedSteps, newStatus)
        self.message, self.executedSteps, self.newStatus = message, executedSteps, newStatus

    def __str__(self):
        # Only the message is shown; .args still holds the full triple.
        return self.message
31 
class PostponeProcessing(Exception):
    """Exception used to postpone postprocessing.

    Raised by a postprocessing step to stop the chain of steps without
    signalling an error.

    Attributes:
      message       -- text describing why processing is postponed; also the
                       str() of the exception
      executedSteps -- list of the postprocessing steps executed so far
      newStatus     -- status code to assign to the task, or None
    """

    def __init__(self,message,executedSteps,newStatus=None):
        self.message = message
        # Kept as an attribute (not only in .args) so that handlers can treat
        # this exception the same way as PostProcessingError.
        self.executedSteps = executedSteps
        self.newStatus = newStatus
        super().__init__(message,executedSteps,newStatus)

    def __str__(self):
        return self.message
40 
41 
def runPostProcStep(taskman,taskDict,oldStatus,previousSteps,step,postprocLib,jobName):
    """Look up the step class named `step` in postprocLib, instantiate it and run it.

    Returns the `executedSteps` attribute of the step instance after run().
    Raises PostProcessingError (with status POSTPROCFAILED) if postprocLib has
    no attribute called `step`.
    """
    if not hasattr(postprocLib, step):
        raise PostProcessingError('ERROR: undefined postprocessing step: '+step,
                                  previousSteps,
                                  TaskManager.StatusCodes['POSTPROCFAILED'])

    print('...', step, '\n')
    # An earlier step may have changed the task record, so fetch it again
    # instead of trusting the copy we were handed.
    currentTaskDict = taskman.getTaskDict(taskDict['DSNAME'], taskDict['TASKNAME'])
    stepClass = getattr(postprocLib, step)
    stepInstance = stepClass(taskman, currentTaskDict, oldStatus, previousSteps, postprocLib, jobName)
    stepInstance.run()
    return stepInstance.executedSteps
53 
54 
def doPostProcessing(taskman,taskDict,postprocSteps,postprocLib,forceRun=False,jobName=None):
    """Execute a series of postprocessing steps. The task status is updated before
    and after the postprocessing. Log files are handled by each postprocessing step
    individually. Returns a list of executed postprocessing steps.

    Arguments:
      taskman       -- task manager used to read and update the task status
      taskDict      -- task record; the keys DSNAME, TASKNAME, STATUS,
                       NJOBS_SUBMITTED and NJOBS_RUNNING are read here
      postprocSteps -- names of the steps to run, in order; each one is passed
                       to runPostProcStep() together with postprocLib
      postprocLib   -- object providing the postprocessing step classes
      forceRun      -- run even if the task status is already POSTPROCRUNNING
                       or higher
      jobName       -- if set, only this job's *.status.POSTPROCESSING files are
                       collected; otherwise those of all jobs of the task

    An empty list is returned when postprocSteps is empty, or when the task
    status is already at or above POSTPROCRUNNING and forceRun is not set.
    """
    # NOTE: At present, we don't use per-job postprocessing steps. Should this feature
    # be removed?
    # NOTE(review): the nesting inside the exception handlers below is
    # reconstructed from the accompanying comments ("New status will be
    # determined below ...", "postpone to later"): handlers that do not return
    # fall through to the status redetermination at the end of the function.
    # Confirm against the repository copy.
    dsName = taskDict['DSNAME']
    taskName = taskDict['TASKNAME']
    # This value is replaced by the status read from the task manager below.
    prePostProcStatus = taskDict['STATUS']
    executedSteps = []

    # Check if there's anything to do
    if not postprocSteps:
        return [] # nothing to do

    #update again from DataBase
    prePostProcStatus = taskman.getStatus(dsName,taskName)
    # Don't run postprocessing if status is already postprocessing
    if prePostProcStatus>=TaskManager.StatusCodes['POSTPROCRUNNING'] and not forceRun:
        print ('Exiting postprocessing without doing anything: task %s/%s status is %s\n' % (dsName,taskName,getKey(TaskManager.StatusCodes,prePostProcStatus)))
        return []

    # Start postprocessing
    taskman.setStatus(dsName,taskName,TaskManager.StatusCodes['POSTPROCRUNNING'])
    if jobName:
        print ('Postprocessing for task %s/%s - job %s\nOld status: %s\n' % (dsName,taskName,jobName,getKey(TaskManager.StatusCodes,prePostProcStatus)))
    else:
        print ('Postprocessing for task %s/%s\nOld status: %s\n' % (dsName,taskName,getKey(TaskManager.StatusCodes,prePostProcStatus)))

    # Get list of postprocessing status files that we may have to remove later
    # (paths are relative to the current working directory)
    if jobName:
        postprocStamps = glob.glob('%s/%s/%s/*.status.POSTPROCESSING' % (dsName,taskName,jobName))
    else:
        postprocStamps = glob.glob('%s/%s/*/*.status.POSTPROCESSING' % (dsName,taskName))

    # Do each specified processing step. Postprocessing may be interrupted either deliberately
    # by a postprocessing step or due to errors.
    try:
        for step in postprocSteps:
            # The same executedSteps list is passed on from step to step; each
            # call returns the list as updated by the step it ran.
            executedSteps = runPostProcStep(taskman,taskDict,prePostProcStatus,executedSteps,step,postprocLib,jobName)

    except PostponeProcessing as e:
        # Stop postprocessing chain w/o error. New status will be determined below if not
        # specified in the exception.
        if e.newStatus:
            taskman.setStatus(dsName,taskName,e.newStatus)
            print ('%i step(s) completed successfully: ' % len(executedSteps),executedSteps,'\n')
            return executedSteps

    except PostProcessingError as e:
        print (e)
        if e.newStatus:
            # The failing step dictated the new task status
            taskman.setStatus(dsName,taskName,e.newStatus)
            print ('Executed steps: ',e.executedSteps)
            return e.executedSteps
        else:
            if (taskDict['NJOBS_SUBMITTED']+taskDict['NJOBS_RUNNING']) > 0:
                # Not all the jobs have run, so ignore error and postpone to later
                pass

            else:
                # All the jobs have run, so nothing new in the future
                taskman.setStatus(dsName,taskName,TaskManager.StatusCodes['POSTPROCFAILED'])
                print ('Executed steps: ',e.executedSteps)
                return e.executedSteps

    except Exception as e:
        # Any other postprocessing error. Task status becomes POSTPROCFAILED.
        print (e)
        taskman.setStatus(dsName,taskName,TaskManager.StatusCodes['POSTPROCFAILED'])
        print ('Executed steps: ',executedSteps)
        return executedSteps

    else:
        # After successful postprocessing, delete POSTPROCESSING status files
        # and mark corresponding jobs as completed
        for p in postprocStamps:
            os.system('rm -f %s' % p)
            # 15 == len('.POSTPROCESSING'): strip that suffix so that basename
            # ends in '.status' and the new stamp is '<...>.status.COMPLETED'
            basename = p[:-15]
            os.system('touch %s.COMPLETED' % basename)

    # Redetermine current status from files on disk
    a = TaskAnalyzer('.',dsName,taskName)
    if a.analyzeFiles():
        a.updateStatus(taskman)
    else:
        # Postprocessing could have deleted the task and migrated it to tape
        taskman.setDiskStatus(dsName,taskName,TaskManager.OnDiskCodes['DELETED'])

    print ('%i step(s) completed successfully: ' % len(executedSteps),executedSteps,'\n')
    return executedSteps
146 
147 
149 
def __init__(self,taskman,taskDict,oldStatus,previousSteps,postprocLib,jobName=None):
    """Record the task context, open the step's log file and write its header.

    The step name is the name of the concrete class. It is appended to
    `previousSteps`, and that same list object (not a copy) becomes
    `self.executedSteps`, so the caller sees the appended name as well.
    The log file is '<DSNAME>/<TASKNAME>/<DSNAME>-<TASKNAME>.<step>.log',
    relative to the current working directory, opened for writing.
    """
    stepName = self.__class__.__name__
    self.stepName = stepName
    self.executedSteps = previousSteps
    previousSteps.append(stepName)

    self.taskman = taskman
    self.taskDict = taskDict
    dsName = taskDict['DSNAME']
    self.dsName = dsName
    taskName = taskDict['TASKNAME']
    self.taskName = taskName
    self.taskDir = '/'.join([dsName,taskName])
    self.oldStatus = oldStatus
    self.postprocLib = postprocLib
    self.jobName = jobName
    self.baseName = '%s-%s' % (dsName,taskName)
    self.baseNameStep = '%s-%s.%s' % (dsName,taskName,stepName)
    self.logfile = open('%s/%s.log' % (self.taskDir,self.baseNameStep), 'w')

    def header(line):
        # Header lines carry no timestamp of their own.
        self.log(line, noTime=True)

    self.log('Starting postprocessing: %s' % self.__class__.__name__)
    header('* Data set name = %s' % self.dsName)
    header('* Task name = %s' % self.taskName)
    header('* Old status = %s' % getKey(TaskManager.StatusCodes,self.oldStatus))
    header('* Job name = %s' % self.jobName)
    header('* User name = %s' % getUserName())
    header('* Host name = %s\n' % os.uname()[1])
173 
def __del__(self):
    """Write the final 'Done' entry and close the step's log file.

    Does nothing if the log file was never opened (for example because
    __init__ failed before reaching open()) or has already been closed,
    so that object finalization does not itself raise.
    """
    logfile = getattr(self, 'logfile', None)
    if logfile is None or logfile.closed:
        return
    try:
        self.log('Done')
    finally:
        # Close the file even if the final log write fails.
        logfile.close()
177 
def log(self,msg=None,text=None,noTime=False,doPrint=False):
    """Write a log entry to the step's log file and optionally to stdout.

    msg     -- headline; written with a leading timestamp, or, if noTime is
               set, indented by 31 blanks instead
    text    -- additional text written after the headline, followed by a newline
    noTime  -- suppress the timestamp on the headline
    doPrint -- also write the entry to sys.stdout

    Empty/None msg or text are skipped.
    """
    sinks = (self.logfile, sys.stdout) if doPrint else (self.logfile,)
    for sink in sinks:
        if msg and noTime:
            sink.writelines((' '*31, msg, '\n'))
        elif msg:
            stamp = time.strftime('%a %b %d %X %Z %Y')
            sink.write('\n%-30s %s\n' % (stamp,msg))
        if text:
            sink.writelines((text, '\n'))
194 
def logExec(self,cmd,doPrint=False,checkStatusCode=True,errorMsg='',abortOnError=True):
    """Run a shell command, log the command and its output, and return its exit code.

    cmd             -- command line, executed through the shell
    doPrint         -- also print the command output to stdout (the output is
                       always printed when the exit code is nonzero)
    checkStatusCode -- treat a nonzero exit code as an error
    errorMsg        -- message to log/raise on error; a default naming the
                       step and the command is used if empty
    abortOnError    -- raise PostProcessingError on error instead of returning

    Returns the exit code of the command.
    Raises PostProcessingError if the exit code is nonzero and both
    checkStatusCode and abortOnError are set.
    """
    # In Python 3 getstatusoutput() already returns the exit code (the same
    # value as Popen.returncode), not a wait()-style status word. It must not
    # be shifted right by 8 bits: that would turn every failure into 0.
    (status,output) = subprocess.getstatusoutput(cmd)
    self.log('Executing: %s' % cmd, output)
    if doPrint or status:
        print (output)
        print()
    if status and checkStatusCode:
        if not errorMsg:
            errorMsg = 'ERROR in postprocessing step %s while executing:\n\n%s\n' % (self.stepName,cmd)
        self.log(text='\nERROR: exit status = %i' % (status))
        self.log(text=errorMsg)
        if abortOnError:
            raise PostProcessingError(errorMsg,self.executedSteps)
    return status
210 
def getFileName(self,postFix,stepName='',absPath=False):
    """Build the name of a file belonging to this task.

    postFix  -- trailing part of the file name (appended as is)
    stepName -- if given, the name is '<baseName>.<stepName><postFix>';
                otherwise this step's own '<baseNameStep><postFix>'
    absPath  -- if set, prefix the name with '<taskDir>/'
    """
    if stepName:
        fileName = '%s.%s%s' % (self.baseName,stepName,postFix)
    else:
        fileName = self.baseNameStep+postFix
    return '%s/%s' % (self.taskDir,fileName) if absPath else fileName
219 
def jobList(self):
    """Return the names of the entries in the task directory.

    Any error while listing the directory (for example when it does not
    exist) yields an empty list.
    """
    try:
        return os.listdir(self.taskDir)
    except Exception:
        return []
226 
def taskFileList(self,fileNamePattern,statusName='POSTPROCESSING',jobName=None):
    """Get list of files with specific names from all jobs in a given state.

    fileNamePattern -- glob pattern matched inside each selected job directory
    statusName      -- a job is selected if its directory contains a file
                       matching '*.status.<statusName>'
    jobName         -- restrict the search to this job directory (glob
                       pattern); by default all job directories are searched

    Returns the sorted list of matching paths, relative to the task
    directory. The search temporarily changes the current working directory
    to the task directory; the previous directory is restored on exit, also
    when an exception is raised.
    """
    oldDir = os.getcwd()
    os.chdir(self.taskDir)
    try:
        if jobName:
            statusFiles = glob.glob('%s/*.status.%s' % (jobName,statusName))
        else:
            statusFiles = glob.glob('*/*.status.%s' % (statusName))
        files = []
        for s in statusFiles:
            # Directory part of the status file path, i.e. the job directory
            jobDir = s.rpartition('/')[0]
            files.extend(glob.glob('%s/%s' % (jobDir,fileNamePattern)))
    finally:
        # Never leave the process in the task directory
        os.chdir(oldDir)
    return sorted(files)
241 
def hadd(self,dir,outputFileName,inputFiles,maxFilesPerHadd=100):
    """Merge files with hadd, in stages if there are many of them.

    dir             -- directory to cd into before running hadd
    outputFileName  -- name of the merged output file
    inputFiles      -- list of input file names
    maxFilesPerHadd -- largest number of input files given to one hadd call

    Up to maxFilesPerHadd inputs are merged with a single hadd call. Larger
    lists are merged in slices into '<outputFileName>.part<i>' files, which
    are then merged into the output file and removed. All commands are run
    through self.logExec().
    """
    mergeCmd = 'cd %s; hadd -f %s %s'
    nInputs = len(inputFiles)
    if nInputs <= maxFilesPerHadd:
        self.logExec(mergeCmd % (dir,outputFileName,' '.join(inputFiles)))
        return

    nSlices = int(math.ceil(float(nInputs)/maxFilesPerHadd))
    intermediates = []
    for sliceNo in range(nSlices):
        first = sliceNo*maxFilesPerHadd
        last = min(first+maxFilesPerHadd,nInputs)
        partName = '%s.part%i' % (outputFileName,sliceNo)
        self.logExec(mergeCmd % (dir,partName,' '.join(inputFiles[first:last])))
        intermediates.append(partName)

    allParts = ' '.join(intermediates)
    self.logExec(mergeCmd % (dir,outputFileName,allParts))
    self.logExec('cd %s; rm -f %s' % (dir,allParts))
258 
def getJobConfig(self,jobName=None):
    """Return the `jobConfig` object defined by a job's configuration file.

    jobName -- job directory to look in (glob pattern); by default all job
               directories of the task are searched

    A '*.config.py.final.py' file is preferred; if none exists, a
    '*.config.py' file is used. If several files match, the first one in
    sorted order is taken. The file is executed and the value bound to the
    name `jobConfig` is returned.

    Raises KeyError if no configuration file is found or the file does not
    define `jobConfig`.
    """
    config = {}
    if not jobName:
        jobName = '*'
    configFile = glob.glob('%s/%s/%s' % (self.taskDir,jobName,'*.config.py.final.py'))
    if not configFile:
        configFile = glob.glob('%s/%s/%s' % (self.taskDir,jobName,'*.config.py'))
    if configFile:
        # glob order is arbitrary; sort so the choice is reproducible
        configPath = sorted(configFile)[0]
        # exec() needs source text or a code object, not a file object
        with open(configPath,'r') as f:
            source = f.read()
        # NOTE: this runs the configuration file as Python code; it must only
        # be used on job directories whose contents are trusted.
        exec(compile(source,configPath,'exec'),config) # Eval config file and put defs into config dict
    return config['jobConfig']
269 
def addResult(self,resultFileName):
    """Register a result file for this task.

    The blank-separated RESULTFILES value of the task record is extended by
    resultFileName unless it is already listed. The new value is stored via
    the task manager and mirrored in self.taskDict. An empty resultFileName
    is ignored.
    """
    # FIXME: strip leading dirs from resultFilename, so I can pass an absolute file name?
    if not resultFileName:
        return
    known = self.taskDict['RESULTFILES'] or ''
    if resultFileName in known.split():
        # Already registered: nothing to add
        return
    updated = ' '.join([known,resultFileName])
    self.taskman.setValue(self.dsName,self.taskName,'RESULTFILES',updated)
    self.taskDict['RESULTFILES'] = updated
python.PostProcessing.PostProcessingStep.getFileName
def getFileName(self, postFix, stepName='', absPath=False)
Definition: PostProcessing.py:211
DerivationFramework::TriggerMatchingUtils::sorted
std::vector< typename R::value_type > sorted(const R &r, PROJ proj={})
Helper function to create a sorted vector from an unsorted range.
python.PostProcessing.PostProcessingStep.addResult
def addResult(self, resultFileName)
Definition: PostProcessing.py:270
python.PostProcessing.PostProcessingStep.baseNameStep
baseNameStep
Definition: PostProcessing.py:163
python.PostProcessing.PostProcessingError.newStatus
newStatus
Definition: PostProcessing.py:27
python.PostProcessing.PostProcessingStep.taskFileList
def taskFileList(self, fileNamePattern, statusName='POSTPROCESSING', jobName=None)
Definition: PostProcessing.py:227
python.PostProcessing.PostProcessingStep.stepName
stepName
Definition: PostProcessing.py:151
python.PostProcessing.PostProcessingStep.taskDict
taskDict
Definition: PostProcessing.py:155
min
constexpr double min()
Definition: ap_fixedTest.cxx:26
buildDatabase.getKey
def getKey(filename)
Definition: buildDatabase.py:545
python.PostProcessing.PostProcessingStep.__init__
def __init__(self, taskman, taskDict, oldStatus, previousSteps, postprocLib, jobName=None)
Definition: PostProcessing.py:150
python.PostProcessing.PostProcessingStep.logExec
def logExec(self, cmd, doPrint=False, checkStatusCode=True, errorMsg='', abortOnError=True)
Definition: PostProcessing.py:195
dumpHVPathFromNtuple.append
bool append
Definition: dumpHVPathFromNtuple.py:91
python.PostProcessing.PostProcessingStep.oldStatus
oldStatus
Definition: PostProcessing.py:159
TaskManager
python.PostProcessing.PostProcessingStep.dsName
dsName
Definition: PostProcessing.py:156
python.PostProcessing.PostponeProcessing.__str__
def __str__(self)
Definition: PostProcessing.py:38
python.PostProcessing.PostProcessingError.__str__
def __str__(self)
Definition: PostProcessing.py:29
python.PostProcessing.PostProcessingError.message
message
Definition: PostProcessing.py:25
LArG4FSStartPointFilter.exec
exec
Definition: LArG4FSStartPointFilter.py:103
python.PostProcessing.PostProcessingStep.__del__
def __del__(self)
Definition: PostProcessing.py:174
python.PostProcessing.PostponeProcessing
Definition: PostProcessing.py:32
plotBeamSpotVxVal.range
range
Definition: plotBeamSpotVxVal.py:194
python.PostProcessing.PostProcessingStep.taskDir
taskDir
Definition: PostProcessing.py:158
python.PostProcessing.PostponeProcessing.__init__
def __init__(self, message, executedSteps, newStatus=None)
Definition: PostProcessing.py:34
python.PostProcessing.PostponeProcessing.message
message
Definition: PostProcessing.py:35
python.PostProcessing.PostProcessingStep
Definition: PostProcessing.py:148
print
void print(char *figname, TCanvas *c1)
Definition: TRTCalib_StrawStatusPlots.cxx:26
TCS::join
std::string join(const std::vector< std::string > &v, const char c=',')
Definition: Trigger/TrigT1/L1Topo/L1TopoCommon/Root/StringUtils.cxx:10
python.PostProcessing.PostProcessingStep.jobName
jobName
Definition: PostProcessing.py:161
Trk::open
@ open
Definition: BinningType.h:40
python.PostProcessing.PostProcessingStep.postprocLib
postprocLib
Definition: PostProcessing.py:160
python.CaloAddPedShiftConfig.int
int
Definition: CaloAddPedShiftConfig.py:45
python.PostProcessing.doPostProcessing
def doPostProcessing(taskman, taskDict, postprocSteps, postprocLib, forceRun=False, jobName=None)
Definition: PostProcessing.py:55
python.PostProcessing.runPostProcStep
def runPostProcStep(taskman, taskDict, oldStatus, previousSteps, step, postprocLib, jobName)
Definition: PostProcessing.py:42
python.PostProcessing.PostProcessingStep.log
def log(self, msg=None, text=None, noTime=False, doPrint=False)
Definition: PostProcessing.py:178
python.PostProcessing.PostProcessingStep.taskman
taskman
Definition: PostProcessing.py:154
python.PostProcessing.PostProcessingStep.taskName
taskName
Definition: PostProcessing.py:157
python.PostProcessing.PostProcessingStep.logfile
logfile
Definition: PostProcessing.py:164
python.PostProcessing.PostProcessingStep.hadd
def hadd(self, dir, outputFileName, inputFiles, maxFilesPerHadd=100)
Definition: PostProcessing.py:242
python.PostProcessing.PostProcessingError.executedSteps
executedSteps
Definition: PostProcessing.py:26
python.PostProcessing.PostProcessingError
Definition: PostProcessing.py:22
python.PostProcessing.PostProcessingStep.baseName
baseName
Definition: PostProcessing.py:162
python.PostProcessing.PostponeProcessing.newStatus
newStatus
Definition: PostProcessing.py:36
python.PostProcessing.PostProcessingError.__init__
def __init__(self, message, executedSteps, newStatus=None)
Definition: PostProcessing.py:24
python.PostProcessing.PostProcessingStep.executedSteps
executedSteps
Definition: PostProcessing.py:152
python.PostProcessing.PostProcessingStep.getJobConfig
def getJobConfig(self, jobName=None)
Definition: PostProcessing.py:259
python.Utils.getUserName
def getUserName(default='UNKNOWN')
Definition: InnerDetector/InDetExample/InDetBeamSpotExample/python/Utils.py:48
python.PostProcessing.PostProcessingStep.jobList
def jobList(self)
Definition: PostProcessing.py:220
python.LArMinBiasAlgConfig.float
float
Definition: LArMinBiasAlgConfig.py:65