ATLAS Offline Software
PostProcessing.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 # Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration
4 
5 """
6 This module defines the generic infrastructure for task postprocessing.
7 """
8 __author__ = 'Juerg Beringer'
9 __version__ = '$Id $'
10 
11 
12 import glob, time, sys, os, math
13 import subprocess
14 
15 from InDetBeamSpotExample.TaskManager import TaskAnalyzer, TaskManager, getKey
16 from InDetBeamSpotExample.Utils import getUserName
17 
18 
19 
20 
21 # Exception classes
class PostProcessingError(Exception):
    """Generic postprocessing exception class.

    Attributes:
        message:       text returned by str() on the exception.
        executedSteps: list of postprocessing steps executed so far.
        newStatus:     task status to be set by the handler, or None.
    """

    def __init__(self, message, executedSteps, newStatus=None):
        # Keep the three constructor arguments available to the handler.
        (self.message,
         self.executedSteps,
         self.newStatus) = (message, executedSteps, newStatus)

    def __str__(self):
        text = self.message
        return text
30 
class PostponeProcessing(Exception):
    """Exception used to postpone postprocessing.

    Attributes:
        message:       text returned by str() on the exception.
        executedSteps: list of postprocessing steps executed so far.
        newStatus:     task status to be set by the handler, or None.
    """

    def __init__(self, message, executedSteps, newStatus=None):
        self.message = message
        # Store the executed steps as PostProcessingError does, so that both
        # exception types expose the same attributes to their handlers.
        self.executedSteps = executedSteps
        self.newStatus = newStatus

    def __str__(self):
        return self.message
38 
39 
def runPostProcStep(taskman,taskDict,oldStatus,previousSteps,step,postprocLib,jobName):
    """Instantiate and run a single postprocessing step.

    The step class is looked up by name (step) in postprocLib. Returns the
    executedSteps list of the step instance after its run() method finished.
    Raises PostProcessingError if postprocLib has no attribute named step.
    """
    # Guard clause: unknown step names are reported as failed postprocessing
    if not hasattr(postprocLib,step):
        failed = TaskManager.StatusCodes['POSTPROCFAILED']
        raise PostProcessingError('ERROR: undefined postprocessing step: '+step,previousSteps,failed)

    print ('...',step,'\n')
    # Reload the task info first - a previous step might have updated it
    freshTaskDict = taskman.getTaskDict(taskDict['DSNAME'],taskDict['TASKNAME'])
    stepClass = getattr(postprocLib,step)
    stepInstance = stepClass(taskman,freshTaskDict,oldStatus,previousSteps,postprocLib,jobName)
    stepInstance.run()
    return stepInstance.executedSteps
51 
52 
def _markJobsCompleted(postprocStamps):
    """Replace each '<name>.status.POSTPROCESSING' file by '<name>.status.COMPLETED'."""
    suffixLength = len('.POSTPROCESSING')
    for stamp in postprocStamps:
        # Same effect as 'rm -f', but without passing the path through a shell
        try:
            os.remove(stamp)
        except FileNotFoundError:
            pass
        except OSError as e:
            print ('WARNING: unable to remove %s: %s' % (stamp,e))
        # Same effect as 'touch': create the file if needed and update its time stamp
        completed = '%s.COMPLETED' % stamp[:-suffixLength]
        try:
            with open(completed,'a'):
                os.utime(completed,None)
        except OSError as e:
            print ('WARNING: unable to create %s: %s' % (completed,e))


def doPostProcessing(taskman,taskDict,postprocSteps,postprocLib,forceRun=False,jobName=None):
    """Execute a series of postprocessing steps.

    The task status is updated before and after the postprocessing. Log files are
    handled by each postprocessing step individually.

    Arguments:
        taskman:       task manager used to read and update the task status.
        taskDict:      task information; 'DSNAME' and 'TASKNAME' are always read,
                       'NJOBS_SUBMITTED' and 'NJOBS_RUNNING' only after a
                       PostProcessingError without new status.
        postprocSteps: names of the steps to run, looked up in postprocLib.
        postprocLib:   object providing the postprocessing step classes.
        forceRun:      run even if the task status is already POSTPROCRUNNING or higher.
        jobName:       if given, only the status files of this job are considered.

    Returns the list of executed postprocessing steps (empty if nothing was done).
    """
    # NOTE: At present, we don't use per-job postprocessing steps. Should this feature
    #       be removed?
    dsName = taskDict['DSNAME']
    taskName = taskDict['TASKNAME']
    executedSteps = []

    # Check if there's anything to do
    if not postprocSteps:
        return []    # nothing to do

    # Always take the current status from the database
    prePostProcStatus = taskman.getStatus(dsName,taskName)
    # Don't run postprocessing if status is already postprocessing
    if prePostProcStatus>=TaskManager.StatusCodes['POSTPROCRUNNING'] and not forceRun:
        print ('Exiting postprocessing without doing anything: task %s/%s status is %s\n' % (dsName,taskName,getKey(TaskManager.StatusCodes,prePostProcStatus)))
        return []

    # Start postprocessing
    taskman.setStatus(dsName,taskName,TaskManager.StatusCodes['POSTPROCRUNNING'])
    if jobName:
        print ('Postprocessing for task %s/%s - job %s\nOld status: %s\n' % (dsName,taskName,jobName,getKey(TaskManager.StatusCodes,prePostProcStatus)))
    else:
        print ('Postprocessing for task %s/%s\nOld status: %s\n' % (dsName,taskName,getKey(TaskManager.StatusCodes,prePostProcStatus)))

    # Get list of postprocessing status files that we may have to remove later
    if jobName:
        postprocStamps = glob.glob('%s/%s/%s/*.status.POSTPROCESSING' % (dsName,taskName,jobName))
    else:
        postprocStamps = glob.glob('%s/%s/*/*.status.POSTPROCESSING' % (dsName,taskName))

    # Do each specified processing step. Postprocessing may be interrupted either deliberately
    # by a postprocessing step or due to errors.
    try:
        for step in postprocSteps:
            executedSteps = runPostProcStep(taskman,taskDict,prePostProcStatus,executedSteps,step,postprocLib,jobName)

    except PostponeProcessing as e:
        # Stop postprocessing chain w/o error.
        # NOTE(review): if the exception carries no new status, the status set above
        #               (POSTPROCRUNNING) is left untouched - confirm this is intended.
        if e.newStatus:
            taskman.setStatus(dsName,taskName,e.newStatus)
        print ('%i step(s) completed successfully: ' % len(executedSteps),executedSteps,'\n')
        return executedSteps

    except PostProcessingError as e:
        print (e)
        if e.newStatus:
            taskman.setStatus(dsName,taskName,e.newStatus)
        elif (taskDict['NJOBS_SUBMITTED']+taskDict['NJOBS_RUNNING']) > 0:
            # Not all the jobs have run, so ignore error and postpone to later
            # NOTE(review): the status stays at POSTPROCRUNNING here - confirm this is intended.
            pass
        else:
            # All the jobs have run, so nothing new in the future
            taskman.setStatus(dsName,taskName,TaskManager.StatusCodes['POSTPROCFAILED'])
        # Every error path reports and returns the steps recorded in the exception
        print ('Executed steps: ',e.executedSteps)
        return e.executedSteps

    except Exception as e:
        # Any other postprocessing error. Task status becomes POSTPROCFAILED.
        print (e)
        taskman.setStatus(dsName,taskName,TaskManager.StatusCodes['POSTPROCFAILED'])
        print ('Executed steps: ',executedSteps)
        return executedSteps

    else:
        # After successful postprocessing, delete POSTPROCESSING status files
        # and mark corresponding jobs as completed
        _markJobsCompleted(postprocStamps)

        # Redetermine current status from files on disk
        a = TaskAnalyzer('.',dsName,taskName)
        if a.analyzeFiles():
            a.updateStatus(taskman)
        else:
            # Postprocessing could have deleted the task and migrated it to tape
            taskman.setDiskStatus(dsName,taskName,TaskManager.OnDiskCodes['DELETED'])

        print ('%i step(s) completed successfully: ' % len(executedSteps),executedSteps,'\n')
        return executedSteps
144 
145 
147 
148  def __init__(self,taskman,taskDict,oldStatus,previousSteps,postprocLib,jobName=None):
149  self.stepName = self.__class__.__name__
150  self.executedSteps = previousSteps
151  self.executedSteps.append( self.stepName )
152  self.taskman = taskman
153  self.taskDict = taskDict
154  self.dsName = taskDict['DSNAME']
155  self.taskName = taskDict['TASKNAME']
156  self.taskDir = '/'.join([self.dsName,self.taskName])
157  self.oldStatus = oldStatus
158  self.postprocLib = postprocLib
159  self.jobName = jobName
160  self.baseName = '%s-%s' % (self.dsName,self.taskName)
161  self.baseNameStep = '%s-%s.%s' % (self.dsName,self.taskName,self.stepName)
162  self.logfile = open('%s/%s.log' % (self.taskDir,self.baseNameStep), 'w')
163  self.log('Starting postprocessing: %s' % self.__class__.__name__)
164  self.log('* Data set name = %s' % self.dsName, noTime=True)
165  self.log('* Task name = %s' % self.taskName, noTime=True)
166  self.log('* Old status = %s' % getKey(TaskManager.StatusCodes,self.oldStatus), noTime=True)
167  self.log('* Job name = %s' % self.jobName, noTime=True)
168  self.log('* User name = %s' % getUserName(), noTime=True)
169  self.log('* Host name = %s\n' % os.uname()[1], noTime=True)
170  pass
171 
172  def __del__(self):
173  self.log('Done')
174  self.logfile.close()
175 
176  def log(self,msg=None,text=None,noTime=False,doPrint=False):
177  outputFiles = [self.logfile]
178  if doPrint:
179  outputFiles.append(sys.stdout)
180  for out in outputFiles:
181  if msg:
182  if noTime:
183  out.write(' '*31)
184  out.write(msg)
185  out.write('\n')
186  else:
187  out.write('\n%-30s %s\n' % (time.strftime('%a %b %d %X %Z %Y'),msg))
188  if text:
189  #out.write('\n')
190  out.write(text)
191  out.write('\n')
192 
193  def logExec(self,cmd,doPrint=False,checkStatusCode=True,errorMsg='',abortOnError=True):
194  (status,output) = subprocess.getstatusoutput(cmd)
195  status = status >> 8 # Convert to standard Unix exit code
196  self.log('Executing: %s' % cmd, output)
197  if doPrint or status:
198  print (output)
199  print()
200  if status and checkStatusCode:
201  if not errorMsg:
202  errorMsg = 'ERROR in postprocessing step %s while executing:\n\n%s\n' % (self.stepName,cmd)
203  self.log(text='\nERROR: exit status = %i' % (status))
204  self.log(text=errorMsg)
205  if abortOnError:
206  raise PostProcessingError(errorMsg,self.executedSteps)
207  return status
208 
209  def getFileName(self,postFix,stepName='',absPath=False):
210  if stepName:
211  name = '%s.%s%s' % (self.baseName,stepName,postFix)
212  else:
213  name = self.baseNameStep+postFix
214  if absPath:
215  name = '%s/%s' % (self.taskDir,name)
216  return name
217 
218  def jobList(self):
219  try:
220  l = os.listdir(self.taskDir)
221  except Exception:
222  l = []
223  return l
224 
225  def taskFileList(self,fileNamePattern,statusName='POSTPROCESSING',jobName=None):
226  """Get list of files with specific names from all jobs in a given state."""
227  oldDir = os.getcwd()
228  os.chdir(self.taskDir)
229  if jobName:
230  statusFiles = glob.glob('%s/*.status.%s' % (jobName,statusName))
231  else:
232  statusFiles = glob.glob('*/*.status.%s' % (statusName))
233  files = []
234  for s in statusFiles:
235  b = '/'.join(s.split('/')[:-1])
236  files.extend(glob.glob('%s/%s' % (b,fileNamePattern)))
237  os.chdir(oldDir)
238  return sorted(files)
239 
240  def hadd(self,dir,outputFileName,inputFiles,maxFilesPerHadd=100):
241  """Execute staged hadd for many files."""
242  if len(inputFiles)<=maxFilesPerHadd:
243  self.logExec('cd %s; hadd -f %s %s' % (dir,outputFileName,' '.join(inputFiles)))
244  else:
245  nSteps = int(math.ceil(float(len(inputFiles))/maxFilesPerHadd))
246  partFiles = []
247  for index in range(nSteps):
248  minIndex = index*maxFilesPerHadd
249  maxIndex = min((index+1)*maxFilesPerHadd,len(inputFiles))
250  stepInputFiles = inputFiles[minIndex:maxIndex]
251  stepOutputFileName = '%s.part%i' % (outputFileName,index)
252  self.logExec('cd %s; hadd -f %s %s' % (dir,stepOutputFileName,' '.join(stepInputFiles)))
253  partFiles.append(stepOutputFileName)
254  self.logExec('cd %s; hadd -f %s %s' % (dir,outputFileName,' '.join(partFiles)))
255  self.logExec('cd %s; rm -f %s' % (dir,' '.join(partFiles)))
256 
257  def getJobConfig(self,jobName=None):
258  config = {}
259  if not jobName:
260  jobName = '*'
261  configFile = glob.glob('%s/%s/%s' % (self.taskDir,jobName,'*.config.py.final.py'))
262  if not configFile:
263  configFile = glob.glob('%s/%s/%s' % (self.taskDir,jobName,'*.config.py'))
264  if configFile:
265  exec(open(configFile[0],'r'),config) # Eval config file and put defs into config dict
266  return config['jobConfig']
267 
268  def addResult(self,resultFileName):
269  # FIXME: strip leading dirs from resultFilename, so I can pass an absolute file name?
270  if not resultFileName:
271  return
272  resultFiles = self.taskDict['RESULTFILES']
273  if resultFiles is None:
274  resultFiles = ''
275  if resultFileName not in resultFiles.split():
276  resultFiles = ' '.join([resultFiles,resultFileName])
277  self.taskman.setValue(self.dsName,self.taskName,'RESULTFILES',resultFiles)
278  self.taskDict['RESULTFILES'] = resultFiles
python.PostProcessing.PostProcessingStep.getFileName
def getFileName(self, postFix, stepName='', absPath=False)
Definition: PostProcessing.py:209
python.PostProcessing.PostProcessingStep.addResult
def addResult(self, resultFileName)
Definition: PostProcessing.py:268
python.PostProcessing.PostProcessingStep.baseNameStep
baseNameStep
Definition: PostProcessing.py:161
python.PostProcessing.PostProcessingError.newStatus
newStatus
Definition: PostProcessing.py:27
python.PostProcessing.PostProcessingStep.taskFileList
def taskFileList(self, fileNamePattern, statusName='POSTPROCESSING', jobName=None)
Definition: PostProcessing.py:225
CaloCellPos2Ntuple.int
int
Definition: CaloCellPos2Ntuple.py:24
python.PostProcessing.PostProcessingStep.stepName
stepName
Definition: PostProcessing.py:149
python.PostProcessing.PostProcessingStep.taskDict
taskDict
Definition: PostProcessing.py:153
python.PostProcessing.PostProcessingStep.__init__
def __init__(self, taskman, taskDict, oldStatus, previousSteps, postprocLib, jobName=None)
Definition: PostProcessing.py:148
python.PostProcessing.PostProcessingStep.logExec
def logExec(self, cmd, doPrint=False, checkStatusCode=True, errorMsg='', abortOnError=True)
Definition: PostProcessing.py:193
dumpHVPathFromNtuple.append
bool append
Definition: dumpHVPathFromNtuple.py:91
python.PostProcessing.PostProcessingStep.oldStatus
oldStatus
Definition: PostProcessing.py:157
TaskManager
python.PostProcessing.PostProcessingStep.dsName
dsName
Definition: PostProcessing.py:154
python.PostProcessing.PostponeProcessing.__str__
def __str__(self)
Definition: PostProcessing.py:36
python.PostProcessing.PostProcessingError.__str__
def __str__(self)
Definition: PostProcessing.py:28
python.PostProcessing.PostProcessingError.message
message
Definition: PostProcessing.py:25
LArG4FSStartPointFilter.exec
exec
Definition: LArG4FSStartPointFilter.py:103
python.PostProcessing.PostProcessingStep.__del__
def __del__(self)
Definition: PostProcessing.py:172
python.PostProcessing.PostponeProcessing
Definition: PostProcessing.py:31
plotBeamSpotVxVal.range
range
Definition: plotBeamSpotVxVal.py:195
python.PostProcessing.PostProcessingStep.taskDir
taskDir
Definition: PostProcessing.py:156
python.PostProcessing.PostponeProcessing.__init__
def __init__(self, message, executedSteps, newStatus=None)
Definition: PostProcessing.py:33
DerivationFramework::TriggerMatchingUtils::sorted
std::vector< typename T::value_type > sorted(T begin, T end)
Helper function to create a sorted vector from an unsorted one.
min
#define min(a, b)
Definition: cfImp.cxx:40
python.PostProcessing.PostponeProcessing.message
message
Definition: PostProcessing.py:34
python.PostProcessing.PostProcessingStep
Definition: PostProcessing.py:146
TCS::join
std::string join(const std::vector< std::string > &v, const char c=',')
Definition: Trigger/TrigT1/L1Topo/L1TopoCommon/Root/StringUtils.cxx:10
python.PostProcessing.PostProcessingStep.jobName
jobName
Definition: PostProcessing.py:159
Trk::open
@ open
Definition: BinningType.h:40
python.PostProcessing.PostProcessingStep.postprocLib
postprocLib
Definition: PostProcessing.py:158
python.PostProcessing.doPostProcessing
def doPostProcessing(taskman, taskDict, postprocSteps, postprocLib, forceRun=False, jobName=None)
Definition: PostProcessing.py:53
python.PostProcessing.runPostProcStep
def runPostProcStep(taskman, taskDict, oldStatus, previousSteps, step, postprocLib, jobName)
Definition: PostProcessing.py:40
python.PostProcessing.PostProcessingStep.log
def log(self, msg=None, text=None, noTime=False, doPrint=False)
Definition: PostProcessing.py:176
python.PostProcessing.PostProcessingStep.taskman
taskman
Definition: PostProcessing.py:152
Muon::print
std::string print(const MuPatSegment &)
Definition: MuonTrackSteering.cxx:28
python.PostProcessing.PostProcessingStep.taskName
taskName
Definition: PostProcessing.py:155
python.PostProcessing.PostProcessingStep.logfile
logfile
Definition: PostProcessing.py:162
python.PostProcessing.PostProcessingStep.hadd
def hadd(self, dir, outputFileName, inputFiles, maxFilesPerHadd=100)
Definition: PostProcessing.py:240
python.PostProcessing.PostProcessingError.executedSteps
executedSteps
Definition: PostProcessing.py:26
python.PostProcessing.PostProcessingError
Definition: PostProcessing.py:22
python.PostProcessing.PostProcessingStep.baseName
baseName
Definition: PostProcessing.py:160
pool::getKey
std::string getKey(const std::string &key, const std::string &encoded)
python.PostProcessing.PostponeProcessing.newStatus
newStatus
Definition: PostProcessing.py:35
python.PostProcessing.PostProcessingError.__init__
def __init__(self, message, executedSteps, newStatus=None)
Definition: PostProcessing.py:24
python.PostProcessing.PostProcessingStep.executedSteps
executedSteps
Definition: PostProcessing.py:150
python.PostProcessing.PostProcessingStep.getJobConfig
def getJobConfig(self, jobName=None)
Definition: PostProcessing.py:257
readCCLHist.float
float
Definition: readCCLHist.py:83
python.Utils.getUserName
def getUserName(default='UNKNOWN')
Definition: InnerDetector/InDetExample/InDetBeamSpotExample/python/Utils.py:48
python.PostProcessing.PostProcessingStep.jobList
def jobList(self)
Definition: PostProcessing.py:218