from builtins import object
from builtins import range

import argparse
import logging
import os
import sys

from multiprocessing import Pool
from subprocess import Popen, STDOUT, PIPE
from tempfile import mkstemp
logging.basicConfig(level=logging.INFO)

class haddJob(object):  # class statement elided in the source; the name haddJob is assumed
    '''A single hadd job'''
    def exe(self):
        '''Execute the hadd command'''
        mergeCmd = ['hadd', '-f']
        # (elided) the output file and this job's input files are appended to mergeCmd
        logging.info('Will now execute merge: %s' % ' '.join(mergeCmd))
        output = []
        # universal_newlines gives text-mode output, so the strings join cleanly below
        job = Popen(mergeCmd, stdout=PIPE, stderr=STDOUT, bufsize=1, close_fds=True,
                    universal_newlines=True)
        while job.poll() is None:
            output.append(job.stdout.readline().strip())
        self.exitCode = job.returncode
        if self.exitCode != 0:
            logging.warning('Non zero return code from hadd. STDOUT/ERR follows:\n%s' %
                            os.linesep.join(output))
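    # Note: '-f' makes hadd overwrite an existing target file. Draining the combined
    # stdout/stderr pipe inside the poll() loop keeps the child from blocking on a
    # full pipe buffer before it can exit.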
    def __call__(self):
        '''Wrapper to call my own exe function and return the exit code of hadd'''
        self.exe()
        return self.exitCode
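    # The __call__ wrapper exists for the parallel path: multiprocessing.Pool executes
    # the job in a worker process, so attributes set there (such as self.exitCode) never
    # reach the parent. Returning the code lets the parent read it from the AsyncResult.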

class haddStep(object):
    '''An hadd iteration - takes a bunch of input files and produces a bunch of output files'''

    def __init__(self, inputFiles, bunchNumber, finalOutputFile, level=None):
        # (elided) the arguments are stored on self; nMerges, the number of hadd jobs
        # needed at this level, is derived from len(inputFiles) and bunchNumber
        logging.debug('Need %d merges for level %d' % (nMerges, self._level))
        for job in range(nMerges):
            # (elided) nextFile and the fractional running total fileCounter are
            # advanced by the number of files per merge
            lastFile = int(fileCounter + 0.5)  # round to the nearest whole file index
            tempOutput = mkstemp(dir='.', prefix='tmp.')
            os.close(tempOutput[0])  # only the temporary file's name is needed, not the fd
            logging.debug('Intermediate merge job %d: %s -> %s' %
                          (job, self._inputFiles[nextFile:lastFile], tempOutput[1]))
            # (elided) a haddJob for this slice is created and its output file recorded
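        # Worked example of the rounding above (numbers assumed for illustration):
        # merging 95 inputs in 10 bunches gives 9.5 files per merge; accumulating the
        # fractional counter and rounding with int(fileCounter + 0.5) produces
        # alternating slices of 10 and 9 files, keeping the bunches evenly sized.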
    def executeAll(self, parallel=1):  # signature assumed from the call site below
        if parallel > 1:
            logging.info('Starting merge using up to %d hadd processes in parallel' % parallel)
            logging.warning('Parallel merging is experimental')
            pool = Pool(processes=parallel)
            parallelResultsArray = []
            for job in self._jobs:  # self._jobs is assumed to hold this step's haddJob instances
                parallelResultsArray.append(pool.apply_async(job, ()))
            # (elided) the pool is closed and the AsyncResults are polled until all are
            # ready, so the zero-timeout get() below cannot block
            for i, job in enumerate(self._jobs):
                job.exitCode = parallelResultsArray[i].get(timeout=0)
                if job.exitCode != 0:
                    logging.error('Merging job %s failed, exit code %s' % (job, job.exitCode))
                    sys.exit(1)  # return code 1: problem with hadd (see the epilog below)
        else:
            for job in self._jobs:
                job.exe()
                if job.exitCode != 0:
                    logging.error('Merging job %s failed, exit code %s' % (job, job.exitCode))
                    sys.exit(1)
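    # A minimal sketch of the elided readiness wait (assumed implementation; would need
    # an import of time), after which get(timeout=0) returns immediately instead of
    # raising multiprocessing.TimeoutError:
    #
    #   while not all(result.ready() for result in parallelResultsArray):
    #       time.sleep(1)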

def main():  # the enclosing function header is elided in the source; the name main is assumed
    parser = argparse.ArgumentParser(description='Recursive wrapper around the ROOT hadd script.',
                                     epilog='Return codes: 0 All OK; 1 Problem with hadd; 2 Invalid arguments')
    parser.add_argument('outputFile', help='Single merged output file')
    parser.add_argument('inputFiles', nargs='+', help='Input files to merge')
    parser.add_argument('-n', '--bunchNumber', type=int,
                        help='File batching number for single hadds', default=10)
    parser.add_argument('-p', '--parallelMerge', type=int,
                        help='Number of merges to do in parallel (experimental, please do not use in production)',
                        default=1)

    args = vars(parser.parse_args(sys.argv[1:]))
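    # Example invocation (script and file names hypothetical), merging in bunches of 20
    # with four parallel hadd processes:
    #
    #   hadd_recursive.py merged.root data_*.root -n 20 -p 4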
    if args['bunchNumber'] <= 1:
        logging.error('bunchNumber parameter must be greater than 1')
        sys.exit(2)  # (assumed) invalid arguments exit with code 2, as the epilog promises

    if args['parallelMerge'] < 1:
        logging.error('parallelMerge parameter must be at least 1')
        sys.exit(2)

    doMerging(args)  # (assumed) hand over to the merge cascade defined below

def doMerging(args):  # function header elided in the source; name and signature assumed
    '''Setup the cascade of merge jobs and execute each level in turn'''
    level = 0
    jobGraph = []
    jobGraph.append(haddStep(bunchNumber=args['bunchNumber'],
                             inputFiles=args['inputFiles'],
                             finalOutputFile=args['outputFile'], level=level))

    # Add merge levels until a single hadd suffices to produce the final output file;
    # each new level consumes the previous level's output files
    while jobGraph[-1].numHadds > 1:
        level += 1
        jobGraph.append(haddStep(bunchNumber=args['bunchNumber'],
                                 inputFiles=jobGraph[-1].outputFiles,
                                 finalOutputFile=args['outputFile'], level=level))
        logging.debug(jobGraph[-1])
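    # Illustration of the cascade (numbers assumed): with 1000 inputs and bunchNumber=10,
    # level 0 runs 100 hadd jobs into temporary files, level 1 reduces those to 10, and
    # level 2 performs the single final merge into outputFile, so the number of levels
    # grows only logarithmically with the number of inputs.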
    for i, jobs in enumerate(jobGraph):
        logging.info('Executing merge iteration step %d' % i)
        jobs.executeAll(args['parallelMerge'])

    logging.info('Final merge completed successfully.')

if __name__ == '__main__':
    main()  # (assumed) the guarded call is elided in the source