|
def | __init__ (self, taskman, taskDict, oldStatus, previousSteps, postprocLib, jobName=None) |
|
def | __del__ (self) |
|
def | log (self, msg=None, text=None, noTime=False, doPrint=False) |
|
def | logExec (self, cmd, doPrint=False, checkStatusCode=True, errorMsg='', abortOnError=True) |
|
def | getFileName (self, postFix, stepName='', absPath=False) |
|
def | jobList (self) |
|
def | taskFileList (self, fileNamePattern, statusName='POSTPROCESSING', jobName=None) |
|
def | hadd (self, dir, outputFileName, inputFiles, maxFilesPerHadd=100) |
|
def | getJobConfig (self, jobName=None) |
|
def | addResult (self, resultFileName) |
|
Definition at line 146 of file PostProcessing.py.
◆ __init__()
def python.PostProcessing.PostProcessingStep.__init__ (self, taskman, taskDict, oldStatus, previousSteps, postprocLib, jobName = None)
Definition at line 148 of file PostProcessing.py.
148 def __init__(self,taskman,taskDict,oldStatus,previousSteps,postprocLib,jobName=None):
149 self.stepName = self.__class__.__name__
150 self.executedSteps = previousSteps
151 self.executedSteps.append( self.stepName )
152 self.taskman = taskman
153 self.taskDict = taskDict
154 self.dsName = taskDict['DSNAME']
155 self.taskName = taskDict['TASKNAME']
156 self.taskDir = '/'.join([self.dsName,self.taskName])
157 self.oldStatus = oldStatus
158 self.postprocLib = postprocLib
159 self.jobName = jobName
160 self.baseName = '%s-%s' % (self.dsName,self.taskName)
161 self.baseNameStep = '%s-%s.%s' % (self.dsName,self.taskName,self.stepName)
162 self.logfile = open('%s/%s.log' % (self.taskDir,self.baseNameStep),'w')
163 self.log('Starting postprocessing: %s' % self.__class__.__name__)
164 self.log('* Data set name = %s' % self.dsName, noTime=True)
165 self.log('* Task name = %s' % self.taskName, noTime=True)
166 self.log('* Old status = %s' % getKey(TaskManager.StatusCodes,self.oldStatus), noTime=True)
167 self.log('* Job name = %s' % self.jobName, noTime=True)
168 self.log('* User name = %s' % getUserName(), noTime=True)
169 self.log('* Host name = %s\n' % os.uname()[1], noTime=True)
◆ __del__()
def python.PostProcessing.PostProcessingStep.__del__ (self)
◆ addResult()
def python.PostProcessing.PostProcessingStep.addResult (self, resultFileName)
Definition at line 268 of file PostProcessing.py.
268 def addResult(self,resultFileName):
270 if not resultFileName:
272 resultFiles = self.taskDict['RESULTFILES']
273 if resultFiles is None:
275 if resultFileName not in resultFiles.split():
276 resultFiles = ' '.join([resultFiles,resultFileName])
277 self.taskman.setValue(self.dsName,self.taskName,'RESULTFILES',resultFiles)
278 self.taskDict['RESULTFILES'] = resultFiles
◆ getFileName()
def python.PostProcessing.PostProcessingStep.getFileName (self, postFix, stepName = '', absPath = False)
Definition at line 209 of file PostProcessing.py.
209 def getFileName(self,postFix,stepName='',absPath=False):
211 name = '%s.%s%s' % (self.baseName,stepName,postFix)
213 name = self.baseNameStep+postFix
215 name = '%s/%s' % (self.taskDir,name)
◆ getJobConfig()
def python.PostProcessing.PostProcessingStep.getJobConfig (self, jobName = None)
Definition at line 257 of file PostProcessing.py.
261 configFile = glob.glob('%s/%s/%s' % (self.taskDir,jobName,'*.config.py.final.py'))
263 configFile = glob.glob('%s/%s/%s' % (self.taskDir,jobName,'*.config.py'))
265 exec(open(configFile[0],'r'),config)
266 return config['jobConfig']
◆ hadd()
def python.PostProcessing.PostProcessingStep.hadd (self, dir, outputFileName, inputFiles, maxFilesPerHadd = 100)
Execute staged hadd for many files.
Definition at line 240 of file PostProcessing.py.
240 def hadd(self,dir,outputFileName,inputFiles,maxFilesPerHadd=100):
241 """Execute staged hadd for many files."""
242 if len(inputFiles)<=maxFilesPerHadd:
243 self.logExec('cd %s; hadd -f %s %s' % (dir,outputFileName,' '.join(inputFiles)))
245 nSteps = int(math.ceil(float(len(inputFiles))/maxFilesPerHadd))
247 for index in range(nSteps):
248 minIndex = index*maxFilesPerHadd
249 maxIndex = min((index+1)*maxFilesPerHadd,len(inputFiles))
250 stepInputFiles = inputFiles[minIndex:maxIndex]
251 stepOutputFileName = '%s.part%i' % (outputFileName,index)
252 self.logExec('cd %s; hadd -f %s %s' % (dir,stepOutputFileName,' '.join(stepInputFiles)))
253 partFiles.append(stepOutputFileName)
254 self.logExec('cd %s; hadd -f %s %s' % (dir,outputFileName,' '.join(partFiles)))
255 self.logExec('cd %s; rm -f %s' % (dir,' '.join(partFiles)))
◆ jobList()
def python.PostProcessing.PostProcessingStep.jobList (self)
◆ log()
def python.PostProcessing.PostProcessingStep.log (self, msg = None, text = None, noTime = False, doPrint = False)
Definition at line 176 of file PostProcessing.py.
176 def log(self,msg=None,text=None,noTime=False,doPrint=False):
177 outputFiles = [self.logfile]
179 outputFiles.append(sys.stdout)
180 for out in outputFiles:
187 out.write('\n%-30s %s\n' % (time.strftime('%a %b %d %X %Z %Y'),msg))
◆ logExec()
def python.PostProcessing.PostProcessingStep.logExec (self, cmd, doPrint = False, checkStatusCode = True, errorMsg = '', abortOnError = True)
Definition at line 193 of file PostProcessing.py.
193 def logExec(self,cmd,doPrint=False,checkStatusCode=True,errorMsg='',abortOnError=True):
194 (status,output) = subprocess.getstatusoutput(cmd)
196 self.log('Executing: %s' % cmd, output)
197 if doPrint or status:
200 if status and checkStatusCode:
202 errorMsg = 'ERROR in postprocessing step %s while executing:\n\n%s\n' % (self.stepName,cmd)
203 self.log(text='\nERROR: exit status = %i' % (status))
204 self.log(text=errorMsg)
206 raise PostProcessingError(errorMsg,self.executedSteps)
◆ taskFileList()
def python.PostProcessing.PostProcessingStep.taskFileList (self, fileNamePattern, statusName = 'POSTPROCESSING', jobName = None)
Get list of files with specific names from all jobs in a given state.
Definition at line 225 of file PostProcessing.py.
225 def taskFileList(self,fileNamePattern,statusName='POSTPROCESSING',jobName=None):
226 """Get list of files with specific names from all jobs in a given state."""
228 os.chdir(self.taskDir)
230 statusFiles = glob.glob('%s/*.status.%s' % (jobName,statusName))
232 statusFiles = glob.glob('*/*.status.%s' % (statusName))
234 for s in statusFiles:
235 b = '/'.join(s.split('/')[:-1])
236 files.extend(glob.glob('%s/%s' % (b,fileNamePattern)))
◆ baseName
python.PostProcessing.PostProcessingStep.baseName |
◆ baseNameStep
python.PostProcessing.PostProcessingStep.baseNameStep |
◆ dsName
python.PostProcessing.PostProcessingStep.dsName |
◆ executedSteps
python.PostProcessing.PostProcessingStep.executedSteps |
◆ jobName
python.PostProcessing.PostProcessingStep.jobName |
◆ logfile
python.PostProcessing.PostProcessingStep.logfile |
◆ oldStatus
python.PostProcessing.PostProcessingStep.oldStatus |
◆ postprocLib
python.PostProcessing.PostProcessingStep.postprocLib |
◆ stepName
python.PostProcessing.PostProcessingStep.stepName |
◆ taskDict
python.PostProcessing.PostProcessingStep.taskDict |
◆ taskDir
python.PostProcessing.PostProcessingStep.taskDir |
◆ taskman
python.PostProcessing.PostProcessingStep.taskman |
◆ taskName
python.PostProcessing.PostProcessingStep.taskName |
The documentation for this class was generated from the following file:
void hadd(const std::string &output_file, const std::vector< std::string > &input_files, unsigned max_files)
effects: perform the hadd functionality guarantee: basic failures: out of memory III failures: i/o er...