|
| def | __init__ (self, taskman, taskDict, oldStatus, previousSteps, postprocLib, jobName=None) |
| |
| def | __del__ (self) |
| |
| def | log (self, msg=None, text=None, noTime=False, doPrint=False) |
| |
| def | logExec (self, cmd, doPrint=False, checkStatusCode=True, errorMsg='', abortOnError=True) |
| |
| def | getFileName (self, postFix, stepName='', absPath=False) |
| |
| def | jobList (self) |
| |
| def | taskFileList (self, fileNamePattern, statusName='POSTPROCESSING', jobName=None) |
| |
| def | hadd (self, dir, outputFileName, inputFiles, maxFilesPerHadd=100) |
| |
| def | getJobConfig (self, jobName=None) |
| |
| def | addResult (self, resultFileName) |
| |
Definition at line 148 of file PostProcessing.py.
◆ __init__()
| def python.PostProcessing.PostProcessingStep.__init__ |
( |
|
self, |
|
|
|
taskman, |
|
|
|
taskDict, |
|
|
|
oldStatus, |
|
|
|
previousSteps, |
|
|
|
postprocLib, |
|
|
|
jobName = None |
|
) |
| |
Definition at line 150 of file PostProcessing.py.
150 def __init__(self,taskman,taskDict,oldStatus,previousSteps,postprocLib,jobName=None):
151 self.stepName = self.__class__.__name__
152 self.executedSteps = previousSteps
153 self.executedSteps.
append( self.stepName )
154 self.taskman = taskman
155 self.taskDict = taskDict
156 self.dsName = taskDict[
'DSNAME']
157 self.taskName = taskDict[
'TASKNAME']
158 self.taskDir =
'/'.
join([self.dsName,self.taskName])
159 self.oldStatus = oldStatus
160 self.postprocLib = postprocLib
161 self.jobName = jobName
162 self.baseName =
'%s-%s' % (self.dsName,self.taskName)
163 self.baseNameStep =
'%s-%s.%s' % (self.dsName,self.taskName,self.stepName)
164 self.logfile =
open(
'%s/%s.log' % (self.taskDir,self.baseNameStep),
'w')
165 self.log(
'Starting postprocessing: %s' % self.__class__.__name__)
166 self.log(
'* Data set name = %s' % self.dsName, noTime=
True)
167 self.log(
'* Task name = %s' % self.taskName, noTime=
True)
168 self.log(
'* Old status = %s' %
getKey(TaskManager.StatusCodes,self.oldStatus), noTime=
True)
169 self.log(
'* Job name = %s' % self.jobName, noTime=
True)
170 self.log(
'* User name = %s' %
getUserName(), noTime=
True)
171 self.log(
'* Host name = %s\n' % os.uname()[1], noTime=
True)
◆ __del__()
| def python.PostProcessing.PostProcessingStep.__del__ |
( |
|
self | ) |
|
◆ addResult()
| def python.PostProcessing.PostProcessingStep.addResult |
( |
|
self, |
|
|
|
resultFileName |
|
) |
| |
Definition at line 270 of file PostProcessing.py.
270 def addResult(self,resultFileName):
272 if not resultFileName:
274 resultFiles = self.taskDict[
'RESULTFILES']
275 if resultFiles
is None:
277 if resultFileName
not in resultFiles.split():
278 resultFiles =
' '.
join([resultFiles,resultFileName])
279 self.taskman.setValue(self.dsName,self.taskName,
'RESULTFILES',resultFiles)
280 self.taskDict[
'RESULTFILES'] = resultFiles
◆ getFileName()
| def python.PostProcessing.PostProcessingStep.getFileName |
( |
|
self, |
|
|
|
postFix, |
|
|
|
stepName = '', |
|
|
|
absPath = False |
|
) |
| |
Definition at line 211 of file PostProcessing.py.
211 def getFileName(self,postFix,stepName='',absPath=False):
213 name =
'%s.%s%s' % (self.baseName,stepName,postFix)
215 name = self.baseNameStep+postFix
217 name =
'%s/%s' % (self.taskDir,name)
◆ getJobConfig()
| def python.PostProcessing.PostProcessingStep.getJobConfig |
( |
|
self, |
|
|
|
jobName = None |
|
) |
| |
Definition at line 259 of file PostProcessing.py.
263 configFile = glob.glob(
'%s/%s/%s' % (self.taskDir,jobName,
'*.config.py.final.py'))
265 configFile = glob.glob(
'%s/%s/%s' % (self.taskDir,jobName,
'*.config.py'))
267 exec(
open(configFile[0],
'r'),config)
268 return config[
'jobConfig']
◆ hadd()
| def python.PostProcessing.PostProcessingStep.hadd |
( |
|
self, |
|
|
|
dir, |
|
|
|
outputFileName, |
|
|
|
inputFiles, |
|
|
|
maxFilesPerHadd = 100 |
|
) |
| |
Execute staged hadd for many files.
Definition at line 242 of file PostProcessing.py.
242 def hadd(self,dir,outputFileName,inputFiles,maxFilesPerHadd=100):
243 """Execute staged hadd for many files."""
244 if len(inputFiles)<=maxFilesPerHadd:
245 self.logExec(
'cd %s; hadd -f %s %s' % (dir,outputFileName,
' '.
join(inputFiles)))
247 nSteps =
int(math.ceil(
float(len(inputFiles))/maxFilesPerHadd))
249 for index
in range(nSteps):
250 minIndex = index*maxFilesPerHadd
251 maxIndex =
min((index+1)*maxFilesPerHadd,len(inputFiles))
252 stepInputFiles = inputFiles[minIndex:maxIndex]
253 stepOutputFileName =
'%s.part%i' % (outputFileName,index)
254 self.logExec(
'cd %s; hadd -f %s %s' % (dir,stepOutputFileName,
' '.
join(stepInputFiles)))
255 partFiles.append(stepOutputFileName)
256 self.logExec(
'cd %s; hadd -f %s %s' % (dir,outputFileName,
' '.
join(partFiles)))
257 self.logExec(
'cd %s; rm -f %s' % (dir,
' '.
join(partFiles)))
◆ jobList()
| def python.PostProcessing.PostProcessingStep.jobList |
( |
|
self | ) |
|
◆ log()
| def python.PostProcessing.PostProcessingStep.log |
( |
|
self, |
|
|
|
msg = None, |
|
|
|
text = None, |
|
|
|
noTime = False, |
|
|
|
doPrint = False |
|
) |
| |
Definition at line 178 of file PostProcessing.py.
178 def log(self,msg=None,text=None,noTime=False,doPrint=False):
179 outputFiles = [self.logfile]
181 outputFiles.append(sys.stdout)
182 for out
in outputFiles:
189 out.write(
'\n%-30s %s\n' % (time.strftime(
'%a %b %d %X %Z %Y'),msg))
◆ logExec()
| def python.PostProcessing.PostProcessingStep.logExec |
( |
|
self, |
|
|
|
cmd, |
|
|
|
doPrint = False, |
|
|
|
checkStatusCode = True, |
|
|
|
errorMsg = '', |
|
|
|
abortOnError = True |
|
) |
| |
Definition at line 195 of file PostProcessing.py.
195 def logExec(self,cmd,doPrint=False,checkStatusCode=True,errorMsg='',abortOnError=True):
196 (status,output) = subprocess.getstatusoutput(cmd)
198 self.log(
'Executing: %s' % cmd, output)
199 if doPrint
or status:
202 if status
and checkStatusCode:
204 errorMsg =
'ERROR in postprocessing step %s while executing:\n\n%s\n' % (self.stepName,cmd)
205 self.log(text=
'\nERROR: exit status = %i' % (status))
206 self.log(text=errorMsg)
208 raise PostProcessingError(errorMsg,self.executedSteps)
◆ taskFileList()
| def python.PostProcessing.PostProcessingStep.taskFileList |
( |
|
self, |
|
|
|
fileNamePattern, |
|
|
|
statusName = 'POSTPROCESSING', |
|
|
|
jobName = None |
|
) |
| |
Get list of files with specific names from all jobs in a given state.
Definition at line 227 of file PostProcessing.py.
227 def taskFileList(self,fileNamePattern,statusName='POSTPROCESSING',jobName=None):
228 """Get list of files with specific names from all jobs in a given state."""
230 os.chdir(self.taskDir)
232 statusFiles = glob.glob(
'%s/*.status.%s' % (jobName,statusName))
234 statusFiles = glob.glob(
'*/*.status.%s' % (statusName))
236 for s
in statusFiles:
237 b =
'/'.
join(s.split(
'/')[:-1])
238 files.extend(glob.glob(
'%s/%s' % (b,fileNamePattern)))
◆ baseName
| python.PostProcessing.PostProcessingStep.baseName |
◆ baseNameStep
| python.PostProcessing.PostProcessingStep.baseNameStep |
◆ dsName
| python.PostProcessing.PostProcessingStep.dsName |
◆ executedSteps
| python.PostProcessing.PostProcessingStep.executedSteps |
◆ jobName
| python.PostProcessing.PostProcessingStep.jobName |
◆ logfile
| python.PostProcessing.PostProcessingStep.logfile |
◆ oldStatus
| python.PostProcessing.PostProcessingStep.oldStatus |
◆ postprocLib
| python.PostProcessing.PostProcessingStep.postprocLib |
◆ stepName
| python.PostProcessing.PostProcessingStep.stepName |
◆ taskDict
| python.PostProcessing.PostProcessingStep.taskDict |
◆ taskDir
| python.PostProcessing.PostProcessingStep.taskDir |
◆ taskman
| python.PostProcessing.PostProcessingStep.taskman |
◆ taskName
| python.PostProcessing.PostProcessingStep.taskName |
The documentation for this class was generated from the following file:
void hadd(const std::string &output_file, const std::vector< std::string > &input_files, unsigned max_files)
effects: perform the hadd functionality guarantee: basic failures: out of memory III failures: i/o er...