ATLAS Offline Software
PostProcessing.py
#!/usr/bin/env python

# Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration

"""
This module defines the generic infrastructure for task postprocessing.
"""
__author__ = 'Juerg Beringer'
__version__ = '$Id $'


import glob, time, sys, os, math
import subprocess

from InDetBeamSpotExample.TaskManager import TaskAnalyzer, TaskManager, getKey
from InDetBeamSpotExample.Utils import getUserName



# Exception classes
class PostProcessingError(Exception):
    """Generic postprocessing exception class."""
    def __init__(self,message,executedSteps,newStatus=None):
        self.message = message
        self.executedSteps = executedSteps
        self.newStatus = newStatus
        super().__init__(message,executedSteps,newStatus)
    def __str__(self):
        return self.message

class PostponeProcessing(Exception):
    """Exception used to postpone postprocessing."""
    def __init__(self,message,executedSteps,newStatus=None):
        self.message = message
        self.newStatus = newStatus
        super().__init__(message,executedSteps,newStatus)
    def __str__(self):
        return self.message


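# Example (illustrative sketch, not part of the original module): a concrete
# postprocessing step can stop the chain without flagging an error by raising
# PostponeProcessing, e.g. while it is still waiting for job output. The step
# name and file pattern below are hypothetical; taskFileList() and
# executedSteps come from the PostProcessingStep base class defined further
# down in this file.
#
#   class WaitForNtuples(PostProcessingStep):
#       def run(self):
#           if not self.taskFileList('*-nt.root'):
#               raise PostponeProcessing('Ntuples not yet available',self.executedSteps)
#           # ... do the actual work here ...
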
def runPostProcStep(taskman,taskDict,oldStatus,previousSteps,step,postprocLib,jobName):
    """Instantiate and run a single postprocessing step."""
    if hasattr(postprocLib,step):
        print ('...',step,'\n')
        # First reload taskDict - a previous step might have updated the info
        taskDict = taskman.getTaskDict(taskDict['DSNAME'],taskDict['TASKNAME'])
        postprocClass = getattr(postprocLib,step)(taskman,taskDict,oldStatus,previousSteps,postprocLib,jobName)
        postprocClass.run()
        return postprocClass.executedSteps
    else:
        raise PostProcessingError('ERROR: undefined postprocessing step: '+step,previousSteps,TaskManager.StatusCodes['POSTPROCFAILED'])


def doPostProcessing(taskman,taskDict,postprocSteps,postprocLib,forceRun=False,jobName=None):
    """Execute a series of postprocessing steps. The task status is updated before
    and after the postprocessing. Log files are handled by each postprocessing step
    individually. Returns a list of executed postprocessing steps."""
    # NOTE: At present, we don't use per-job postprocessing steps. Should this feature
    # be removed?
    dsName = taskDict['DSNAME']
    taskName = taskDict['TASKNAME']
    prePostProcStatus = taskDict['STATUS']
    executedSteps = []

    # Check if there's anything to do
    if not postprocSteps:
        return []   # nothing to do

    # Update status again from the database
    prePostProcStatus = taskman.getStatus(dsName,taskName)
    # Don't run postprocessing if status is already postprocessing
    if prePostProcStatus>=TaskManager.StatusCodes['POSTPROCRUNNING'] and not forceRun:
        print ('Exiting postprocessing without doing anything: task %s/%s status is %s\n' % (dsName,taskName,getKey(TaskManager.StatusCodes,prePostProcStatus)))
        return []

    # Start postprocessing
    taskman.setStatus(dsName,taskName,TaskManager.StatusCodes['POSTPROCRUNNING'])
    if jobName:
        print ('Postprocessing for task %s/%s - job %s\nOld status: %s\n' % (dsName,taskName,jobName,getKey(TaskManager.StatusCodes,prePostProcStatus)))
    else:
        print ('Postprocessing for task %s/%s\nOld status: %s\n' % (dsName,taskName,getKey(TaskManager.StatusCodes,prePostProcStatus)))

    # Get list of postprocessing status files that we may have to remove later
    if jobName:
        postprocStamps = glob.glob('%s/%s/%s/*.status.POSTPROCESSING' % (dsName,taskName,jobName))
    else:
        postprocStamps = glob.glob('%s/%s/*/*.status.POSTPROCESSING' % (dsName,taskName))

    # Do each specified processing step. Postprocessing may be interrupted either deliberately
    # by a postprocessing step or due to errors.
    try:
        for step in postprocSteps:
            executedSteps = runPostProcStep(taskman,taskDict,prePostProcStatus,executedSteps,step,postprocLib,jobName)

    except PostponeProcessing as e:
        # Stop postprocessing chain w/o error. New status will be determined below if not
        # specified in the exception.
        if e.newStatus:
            taskman.setStatus(dsName,taskName,e.newStatus)
        print ('%i step(s) completed successfully: ' % len(executedSteps),executedSteps,'\n')
        return executedSteps

    except PostProcessingError as e:
        print (e)
        if e.newStatus:
            taskman.setStatus(dsName,taskName,e.newStatus)
            print ('Executed steps: ',e.executedSteps)
            return e.executedSteps
        else:
            if (taskDict['NJOBS_SUBMITTED']+taskDict['NJOBS_RUNNING']) > 0:
                # Not all the jobs have run, so ignore error and postpone to later
                pass

            else:
                # All the jobs have run, so nothing new in the future
                taskman.setStatus(dsName,taskName,TaskManager.StatusCodes['POSTPROCFAILED'])
                print ('Executed steps: ',e.executedSteps)
                return e.executedSteps

    except Exception as e:
        # Any other postprocessing error. Task status becomes POSTPROCFAILED.
        print (e)
        taskman.setStatus(dsName,taskName,TaskManager.StatusCodes['POSTPROCFAILED'])
        print ('Executed steps: ',executedSteps)
        return executedSteps

    else:
        # After successful postprocessing, delete POSTPROCESSING status files
        # and mark corresponding jobs as completed
        for p in postprocStamps:
            os.system('rm -f %s' % p)
            basename = p[:-15]
            os.system('touch %s.COMPLETED' % basename)

        # Redetermine current status from files on disk
        a = TaskAnalyzer('.',dsName,taskName)
        if a.analyzeFiles():
            a.updateStatus(taskman)
        else:
            # Postprocessing could have deleted the task and migrated it to tape
            taskman.setDiskStatus(dsName,taskName,TaskManager.OnDiskCodes['DELETED'])

    print ('%i step(s) completed successfully: ' % len(executedSteps),executedSteps,'\n')
    return executedSteps


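# Example (illustrative sketch, not part of the original module): a driver
# script could invoke doPostProcessing() roughly as follows. The dataset and
# task names, the step list, the library module MyPostProcLib and the
# TaskManager constructor arguments are hypothetical; getTaskDict() and the
# lookup of step classes by name are the interfaces used above.
#
#   from InDetBeamSpotExample.TaskManager import TaskManager
#   import MyPostProcLib                  # defines one PostProcessingStep subclass per step
#   taskman = TaskManager()               # assuming a default task database location
#   taskDict = taskman.getTaskDict('MyDataSet','MyTask')
#   executed = doPostProcessing(taskman,taskDict,['MergeNt','PlotResults'],MyPostProcLib)
#   print(executed)
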
# Base class for postprocessing steps
class PostProcessingStep:
    def __init__(self,taskman,taskDict,oldStatus,previousSteps,postprocLib,jobName=None):
        self.stepName = self.__class__.__name__
        self.executedSteps = previousSteps
        self.executedSteps.append( self.stepName )
        self.taskman = taskman
        self.taskDict = taskDict
        self.dsName = taskDict['DSNAME']
        self.taskName = taskDict['TASKNAME']
        self.taskDir = '/'.join([self.dsName,self.taskName])
        self.oldStatus = oldStatus
        self.postprocLib = postprocLib
        self.jobName = jobName
        self.baseName = '%s-%s' % (self.dsName,self.taskName)
        self.baseNameStep = '%s-%s.%s' % (self.dsName,self.taskName,self.stepName)
        self.logfile = open('%s/%s.log' % (self.taskDir,self.baseNameStep), 'w')
        self.log('Starting postprocessing: %s' % self.__class__.__name__)
        self.log('* Data set name = %s' % self.dsName, noTime=True)
        self.log('* Task name = %s' % self.taskName, noTime=True)
        self.log('* Old status = %s' % getKey(TaskManager.StatusCodes,self.oldStatus), noTime=True)
        self.log('* Job name = %s' % self.jobName, noTime=True)
        self.log('* User name = %s' % getUserName(), noTime=True)
        self.log('* Host name = %s\n' % os.uname()[1], noTime=True)
        pass

    def __del__(self):
        self.log('Done')
        self.logfile.close()

    def log(self,msg=None,text=None,noTime=False,doPrint=False):
        outputFiles = [self.logfile]
        if doPrint:
            outputFiles.append(sys.stdout)
        for out in outputFiles:
            if msg:
                if noTime:
                    out.write(' '*31)
                    out.write(msg)
                    out.write('\n')
                else:
                    out.write('\n%-30s %s\n' % (time.strftime('%a %b %d %X %Z %Y'),msg))
            if text:
                #out.write('\n')
                out.write(text)
                out.write('\n')

    def logExec(self,cmd,doPrint=False,checkStatusCode=True,errorMsg='',abortOnError=True):
        # Note: subprocess.getstatusoutput returns the command's exit code directly
        # (unlike the old commands.getstatusoutput, which returned the raw wait status)
        (status,output) = subprocess.getstatusoutput(cmd)
        self.log('Executing: %s' % cmd, output)
        if doPrint or status:
            print (output)
            print()
        if status and checkStatusCode:
            if not errorMsg:
                errorMsg = 'ERROR in postprocessing step %s while executing:\n\n%s\n' % (self.stepName,cmd)
            self.log(text='\nERROR: exit status = %i' % (status))
            self.log(text=errorMsg)
            if abortOnError:
                raise PostProcessingError(errorMsg,self.executedSteps)
        return status

    def getFileName(self,postFix,stepName='',absPath=False):
        if stepName:
            name = '%s.%s%s' % (self.baseName,stepName,postFix)
        else:
            name = self.baseNameStep+postFix
        if absPath:
            name = '%s/%s' % (self.taskDir,name)
        return name

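    # Example (illustrative, with hypothetical names): for dsName='MyDataSet',
    # taskName='MyTask' and a step class named MergeNt, getFileName behaves as
    #   self.getFileName('.root')                       -> 'MyDataSet-MyTask.MergeNt.root'
    #   self.getFileName('.log',stepName='PlotResults') -> 'MyDataSet-MyTask.PlotResults.log'
    #   self.getFileName('.root',absPath=True)          -> 'MyDataSet/MyTask/MyDataSet-MyTask.MergeNt.root'
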
    def jobList(self):
        try:
            l = os.listdir(self.taskDir)
        except Exception:
            l = []
        return l

    def taskFileList(self,fileNamePattern,statusName='POSTPROCESSING',jobName=None):
        """Get list of files with specific names from all jobs in a given state."""
        oldDir = os.getcwd()
        os.chdir(self.taskDir)
        if jobName:
            statusFiles = glob.glob('%s/*.status.%s' % (jobName,statusName))
        else:
            statusFiles = glob.glob('*/*.status.%s' % (statusName))
        files = []
        for s in statusFiles:
            b = '/'.join(s.split('/')[:-1])
            files.extend(glob.glob('%s/%s' % (b,fileNamePattern)))
        os.chdir(oldDir)
        return sorted(files)

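    # Example (illustrative, hypothetical job and file names): with jobs 0000
    # and 0001 currently in POSTPROCESSING state,
    #   self.taskFileList('*-nt.root')
    # would return something like
    #   ['0000/MyDataSet-MyTask-0000-nt.root', '0001/MyDataSet-MyTask-0001-nt.root']
    # i.e. paths relative to the task directory, sorted alphabetically.
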
    def hadd(self,dir,outputFileName,inputFiles,maxFilesPerHadd=100):
        """Execute staged hadd for many files."""
        if len(inputFiles)<=maxFilesPerHadd:
            self.logExec('cd %s; hadd -f %s %s' % (dir,outputFileName,' '.join(inputFiles)))
        else:
            nSteps = int(math.ceil(float(len(inputFiles))/maxFilesPerHadd))
            partFiles = []
            for index in range(nSteps):
                minIndex = index*maxFilesPerHadd
                maxIndex = min((index+1)*maxFilesPerHadd,len(inputFiles))
                stepInputFiles = inputFiles[minIndex:maxIndex]
                stepOutputFileName = '%s.part%i' % (outputFileName,index)
                self.logExec('cd %s; hadd -f %s %s' % (dir,stepOutputFileName,' '.join(stepInputFiles)))
                partFiles.append(stepOutputFileName)
            self.logExec('cd %s; hadd -f %s %s' % (dir,outputFileName,' '.join(partFiles)))
            self.logExec('cd %s; rm -f %s' % (dir,' '.join(partFiles)))

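    # Worked example of the staging above (hypothetical numbers): merging 250
    # input files with maxFilesPerHadd=100 gives nSteps=3, so files 0-99,
    # 100-199 and 200-249 are first merged into <output>.part0, <output>.part1
    # and <output>.part2; a final hadd then combines the three part files into
    # the requested output file, and the part files are removed.
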
    def getJobConfig(self,jobName=None):
        config = {}
        if not jobName:
            jobName = '*'
        configFile = glob.glob('%s/%s/%s' % (self.taskDir,jobName,'*.config.py.final.py'))
        if not configFile:
            configFile = glob.glob('%s/%s/%s' % (self.taskDir,jobName,'*.config.py'))
        if configFile:
            exec(open(configFile[0],'r').read(),config)   # Eval config file and put defs into config dict
        return config['jobConfig']

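    # Note: the job configuration file is executed with exec() and must define
    # a dictionary named jobConfig, e.g. (illustrative keys only):
    #   jobConfig = {'maxevents': -1, 'outputfileprefix': 'MyDataSet-MyTask'}
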
    def addResult(self,resultFileName):
        # FIXME: strip leading dirs from resultFileName, so I can pass an absolute file name?
        if not resultFileName:
            return
        resultFiles = self.taskDict['RESULTFILES']
        if resultFiles is None:
            resultFiles = ''
        if resultFileName not in resultFiles.split():
            resultFiles = ' '.join([resultFiles,resultFileName])
            self.taskman.setValue(self.dsName,self.taskName,'RESULTFILES',resultFiles)
            self.taskDict['RESULTFILES'] = resultFiles
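

# Example (illustrative sketch, not part of the original module): a minimal
# concrete step as it might appear in a postprocessing library module passed
# to doPostProcessing() as postprocLib. The class name, file pattern and merged
# file name are hypothetical; only run() is required by runPostProcStep(), the
# helper methods come from PostProcessingStep above.
#
#   class MergeNt(PostProcessingStep):
#       def run(self):
#           ntFiles = self.taskFileList('*-nt.root')
#           if not ntFiles:
#               raise PostponeProcessing('No ntuples to merge yet',self.executedSteps)
#           mergedName = self.getFileName('-nt.root')
#           self.hadd(self.taskDir,mergedName,ntFiles)
#           self.addResult(mergedName)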