A hadd iteration — takes a bunch of input files and produces a bunch of output files.
Definition at line 86 of file rhadd.py.
◆ __init__()

def rhadd.haddStep.__init__(self, inputFiles, bunchNumber, finalOutputFile, level = None)

Definition at line 88 of file rhadd.py.
88 def __init__(self, inputFiles, bunchNumber, finalOutputFile, level = None):
89 self._inputFiles = inputFiles
90 self._bunchNumber = bunchNumber
91 self._finalOutputFile = finalOutputFile
92 self._haddJobArray = []
94 self._defineMergeJobs()
◆ __str__()

def rhadd.haddStep.__str__(self)

Definition at line 160 of file rhadd.py.
161         return 'Merging level %s: %s' % (self._level, str([str(job) for job in self._haddJobArray]))
◆ _defineMergeJobs()

def rhadd.haddStep._defineMergeJobs(self)

private

Definition at line 97 of file rhadd.py.
97     def _defineMergeJobs(self):
99         nMerges = (len(self._inputFiles)-1) // self._bunchNumber + 1
100        logging.debug('Need %d merges for level %d' % (nMerges, self._level))
102        logging.debug('Final merge job: %s -> %s' % (self._inputFiles, self._inputFiles))
103        self._haddJobArray.append(haddJob(self._inputFiles, self._finalOutputFile))
108        for job in range(nMerges):
110            fileCounter = len(self._inputFiles) * float(job+1) / nMerges
112            lastFile = int(fileCounter + 0.5)
113            tempOutput = mkstemp(dir='.', prefix='tmp.')
114            os.close(tempOutput[0])
115            logging.debug('Intermediate merge job %d: %s -> %s' % (job, self._inputFiles[nextFile:lastFile], tempOutput[1]))
116            self._haddJobArray.append(haddJob(self._inputFiles[nextFile:lastFile], tempOutput[1]))
◆ executeAll()

def rhadd.haddStep.executeAll(self, parallel = 1)

Definition at line 120 of file rhadd.py.
120    def executeAll(self, parallel = 1):
123        logging.info('Starting merge using up to %d hadd processes in parallel' % parallel)
124        logging.warning('Parallel merging is experimental')
125        pool = Pool(processes = parallel)
126        parallelResultsArray = []
127        for job in self._haddJobArray:
128            parallelResultsArray.append(pool.apply_async(job, ()))
136        for i, job in enumerate(self._haddJobArray):
137            job.exitCode = parallelResultsArray[i].get(timeout=0)
139        for job in self._haddJobArray:
140            if job.exitCode != 0:
141                logging.error('Merging job %s failed, exit code %s' % (job, job.exitCode))
145        for job in self._haddJobArray:
147            if job.exitCode != 0:
148                logging.error('Merging job %s failed, exit code %s' % (job, job.exitCode))
◆ numHadds()

def rhadd.haddStep.numHadds(self)

Definition at line 157 of file rhadd.py.
158        return len(self._haddJobArray)
◆ outputFiles()

def rhadd.haddStep.outputFiles(self)

Definition at line 153 of file rhadd.py.
153    def outputFiles(self):
154        return [job.outputFile for job in self._haddJobArray]
◆ _bunchNumber

rhadd.haddStep._bunchNumber

private

◆ _finalOutputFile

rhadd.haddStep._finalOutputFile

private

◆ _haddJobArray

rhadd.haddStep._haddJobArray

private

◆ _inputFiles

rhadd.haddStep._inputFiles

private
◆ _level

rhadd.haddStep._level

private

The documentation for this class was generated from the following file: