A hadd iteration - takes a set of input files and produces a set of output files.
Definition at line 85 of file rhadd.py.
◆ __init__()
def rhadd.haddStep.__init__(self, inputFiles, bunchNumber, finalOutputFile, level = None)
Definition at line 87 of file rhadd.py.
87 def __init__(self, inputFiles, bunchNumber, finalOutputFile, level = None):
88 self._inputFiles = inputFiles
89 self._bunchNumber = bunchNumber
90 self._finalOutputFile = finalOutputFile
91 self._haddJobArray = []
93 self._defineMergeJobs()
◆ __str__()
def rhadd.haddStep.__str__(self)
Definition at line 159 of file rhadd.py.
160 return 'Merging level %s: %s' % (self._level, str([ str(job) for job in self._haddJobArray ]))
◆ _defineMergeJobs()
def rhadd.haddStep._defineMergeJobs(self)

private
Definition at line 96 of file rhadd.py.
96 def _defineMergeJobs(self):
98 nMerges = (len(self._inputFiles)-1) // self._bunchNumber + 1
99 logging.debug('Need %d merges for level %d' % (nMerges, self._level))
101 logging.debug('Final merge job: %s -> %s' % (self._inputFiles, self._inputFiles))
102 self._haddJobArray.append(haddJob(self._inputFiles, self._finalOutputFile))
107 for job in range(nMerges):
109 fileCounter = len(self._inputFiles) * float(job+1) / nMerges
111 lastFile = int(fileCounter + 0.5)
112 tempOutput = mkstemp(dir='.', prefix='tmp.')
113 os.close(tempOutput[0])
114 logging.debug('Intermediate merge job %d: %s -> %s' % (job, self._inputFiles[nextFile:lastFile], tempOutput[1]))
115 self._haddJobArray.append(haddJob(self._inputFiles[nextFile:lastFile], tempOutput[1]))
◆ executeAll()
def rhadd.haddStep.executeAll(self, parallel = 1)
Definition at line 119 of file rhadd.py.
119 def executeAll(self, parallel = 1):
122 logging.info('Starting merge using up to %d hadd processes in parallel' % parallel)
123 logging.warning('Parallel merging is experimental')
124 pool = Pool(processes = parallel)
125 parallelResultsArray = []
126 for job in self._haddJobArray:
127 parallelResultsArray.append(pool.apply_async(job, ()))
135 for i, job in enumerate(self._haddJobArray):
136 job.exitCode = parallelResultsArray[i].get(timeout=0)
138 for job in self._haddJobArray:
139 if job.exitCode != 0:
140 logging.error('Merging job %s failed, exit code %s' % (job, job.exitCode))
144 for job in self._haddJobArray:
146 if job.exitCode != 0:
147 logging.error('Merging job %s failed, exit code %s' % (job, job.exitCode))
◆ numHadds()
def rhadd.haddStep.numHadds(self)
Definition at line 156 of file rhadd.py.
157 return len(self._haddJobArray)
◆ outputFiles()
def rhadd.haddStep.outputFiles(self)
Definition at line 152 of file rhadd.py.
152 def outputFiles(self):
153 return [ job.outputFile for job in self._haddJobArray ]
◆ _bunchNumber
rhadd.haddStep._bunchNumber
|
private |
◆ _finalOutputFile
rhadd.haddStep._finalOutputFile
|
private |
◆ _haddJobArray
rhadd.haddStep._haddJobArray
|
private |
◆ _inputFiles
rhadd.haddStep._inputFiles
|
private |
◆ _level
The documentation for this class was generated from the following file: