from __future__ import division
from builtins import object
from builtins import range

import argparse
import logging
import os
import sys

from multiprocessing import Pool
from subprocess import Popen, STDOUT, PIPE
from tempfile import mkstemp
logging.basicConfig(level=logging.INFO)
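

# NOTE: the class statement and constructor below are reconstructed; only the
# docstrings survive in this fragment, so the name 'haddJob' and its attribute
# names are assumptions based on how the objects are used later in the script.
class haddJob(object):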
    '''A single hadd job'''
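
    def __init__(self, inputFiles, outputFile):
        # Assumed constructor: remember what to merge and where to put it;
        # exitCode is filled in by exe() and read back by the caller.
        self._inputFiles = inputFiles
        self._outputFile = outputFile
        self.exitCode = None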

    def exe(self):
        '''Execute the hadd command'''
        mergeCmd = ['hadd', '-f']
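        # Reconstructed gap: hadd expects the target file first, then the
        # sources; 'output' collects the child's stdout/stderr lines.
        mergeCmd.append(self._outputFile)
        mergeCmd.extend(self._inputFiles)
        output = []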
        logging.info('Will now execute merge: %s' % ' '.join(mergeCmd))
        # universal_newlines=True gives text-mode pipes, so the lines read
        # below are str on Python 3 (and line buffering via bufsize=1 works).
        job = Popen(mergeCmd, stdout=PIPE, stderr=STDOUT, bufsize=1,
                    close_fds=True, universal_newlines=True)
        while job.poll() is None:
            output.append(job.stdout.readline().strip())
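        # Reconstructed gap: drain anything left in the pipe, then record the
        # child's return code on the attribute that callers inspect.
        output.extend(line.strip() for line in job.stdout.readlines())
        self.exitCode = job.returncode
        if self.exitCode != 0: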
            logging.warning('Non zero return code from hadd. STDOUT/ERR follows:\n%s' %
                            os.linesep.join(output))

    def __call__(self):
        '''Wrapper to call my own exe function and return the exit code of hadd'''
        # Called by Pool.apply_async() in the parallel merge path below.
        self.exe()
        return self.exitCode


class haddStep(object):
    '''An hadd iteration - takes a bunch of input files and produces a bunch of output files'''

    def __init__(self, inputFiles, bunchNumber, finalOutputFile, level=None):
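        # Reconstructed gap: store the arguments, then work out how many
        # bunchNumber-sized merges this level needs (ceiling division); the
        # attribute names are assumptions taken from their later use.
        self._inputFiles = inputFiles
        self._bunchNumber = bunchNumber
        self._finalOutputFile = finalOutputFile
        self._level = level
        self._haddJobs = []
        nMerges = (len(self._inputFiles) - 1) // self._bunchNumber + 1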
        logging.debug('Need %d merges for level %d' % (nMerges, self._level))
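        if nMerges == 1:
            # Last level of the cascade: one merge straight into the final
            # output file (reconstructed from the cascade logic below).
            self._haddJobs.append(haddJob(self._inputFiles, self._finalOutputFile))
            return
        # Spread the inputs as evenly as possible over the merges using a
        # fractional stride, rounded per job via lastFile below.
        filesPerMerge = len(self._inputFiles) / nMerges
        fileCounter = 0.0
        nextFile = 0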
        for job in range(nMerges):
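            # Advance the fractional counter; int(... + 0.5) rounds it to
            # pick this job's last input file.
            fileCounter += filesPerMerge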
            lastFile = int(fileCounter + 0.5)
            tempOutput = mkstemp(dir='.', prefix='tmp.')
            os.close(tempOutput[0])
            logging.debug('Intermediate merge job %d: %s -> %s' %
                          (job, self._inputFiles[nextFile:lastFile], tempOutput[1]))
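            self._haddJobs.append(haddJob(self._inputFiles[nextFile:lastFile], tempOutput[1]))
            nextFile = lastFile

    def executeAll(self, parallel=1):
        '''Execute all merge jobs for this level, serially or in parallel'''
        # The method name and the parallel/serial split are taken from the
        # call sites and log messages that survive in this fragment.
        if parallel > 1: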
            logging.info('Starting merge using up to %d hadd processes in parallel' % parallel)
            logging.warning('Parallel merging is experimental')
            pool = Pool(processes=parallel)
            parallelResultsArray = []
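            # Reconstructed loop header: each haddJob is callable, so it can
            # be submitted to the pool directly.
            for job in self._haddJobs: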
                parallelResultsArray.append(pool.apply_async(job, ()))
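            # Reconstructed gap: wait for all workers, then copy each exit
            # code back onto its job (apply_async ran the jobs in child
            # processes, so the parent's haddJob instances were not updated).
            pool.close()
            pool.join()
            for i, job in enumerate(self._haddJobs):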
                job.exitCode = parallelResultsArray[i].get(timeout=0)
                if job.exitCode != 0:
                    logging.error('Merging job %s failed, exit code %s' % (job, job.exitCode))
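                    # Exit code 1 per the epilog: 'Problem with hadd'.
                    sys.exit(1)
        else:
            # Serial fallback: run each merge in this process, one by one.
            for job in self._haddJobs:
                job.exe()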
                if job.exitCode != 0:
                    logging.error('Merging job %s failed, exit code %s' % (job, job.exitCode))
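                    sys.exit(1)

    @property
    def outputFiles(self):
        '''Output files of this level - the inputs for the next level'''
        # Assumed accessor, reconstructed from jobGraph[-1].outputFiles below.
        return [j._outputFile for j in self._haddJobs]

    @property
    def numHadds(self):
        '''Number of merge jobs at this level'''
        # Assumed accessor, reconstructed from jobGraph[-1].numHadds below.
        return len(self._haddJobs)


# Reconstructed entry point: only the argument parsing survives here, so the
# main() wrapper itself is an assumption.
def main():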
    parser = argparse.ArgumentParser(description='Recursive wrapper around the ROOT hadd script.',
                                     epilog='Return codes: 0 All OK; 1 Problem with hadd; 2 Invalid arguments')
    parser.add_argument('outputFile', help='Single merged output file')
    parser.add_argument('inputFiles', nargs='+', help='Input files to merge')
    parser.add_argument('-n', '--bunchNumber', type=int, help='File batching number for single hadds', default=10)
    parser.add_argument('-p', '--parallelMerge', type=int,
                        help='Number of merges to do in parallel (experimental, please do not use in production)', default=1)
    args = vars(parser.parse_args(sys.argv[1:]))
    if args['bunchNumber'] <= 1:
        logging.error('bunchNumber parameter must be greater than 1')
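        # Exit code 2 per the epilog: 'Invalid arguments'.
        sys.exit(2)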
    if args['parallelMerge'] < 1:
        logging.error('parallelMerge parameter must be at least 1')
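        sys.exit(2)
    # Hand over to the merge cascade; the function name is an assumption.
    doMerge(args)


def doMerge(args):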
    '''Setup the cascade of merge jobs and execute each level in turn'''
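    # Reconstructed setup, with names taken from their later use.
    jobGraph = []
    level = 0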
    jobGraph.append(haddStep(bunchNumber=args['bunchNumber'],
                             inputFiles=args['inputFiles'],
                             finalOutputFile=args['outputFile'], level=level))
    while jobGraph[-1].numHadds > 1:
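        # Each extra cascade level merges the previous level's outputs.
        level += 1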
        jobGraph.append(haddStep(bunchNumber=args['bunchNumber'],
                                 inputFiles=jobGraph[-1].outputFiles,
                                 finalOutputFile=args['outputFile'], level=level))
        logging.debug(jobGraph[-1])

    for i, jobs in enumerate(jobGraph):
        logging.info('Executing merge iteration step %d' % i)
        jobs.executeAll(args['parallelMerge'])

    logging.info('Final merge completed successfully.')


if __name__ == '__main__':
    main()