ATLAS Offline Software
rhadd.py
#! /usr/bin/env python

# Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
#
#
# Recursive histogram adder, wrapping around hadd
# Author: Graeme A Stewart <graeme.andrew.stewart@cern.ch>
#
# usage: rhadd.py [-h] [-n BUNCHNUMBER] [-p PARALLELMERGE]
#                 outputFile inputFiles [inputFiles ...]
#
# Recursive wrapper around the ROOT hadd script.
#
# positional arguments:
#   outputFile            Single merged output file
#   inputFiles            Input files to merge
#
# optional arguments:
#   -h, --help            show this help message and exit
#   -n BUNCHNUMBER, --bunchNumber BUNCHNUMBER
#                         File batching number for single hadds
#   -p PARALLELMERGE, --parallelMerge PARALLELMERGE
#                         Number of merges to do in parallel (experimental,
#                         please do not use in production)
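#
# Example invocation (illustrative file names):
#   rhadd.py -n 20 merged.root hist.000.root hist.001.root hist.002.root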

from builtins import object
from builtins import range

import argparse
import logging
import os
import sys

from multiprocessing import Pool
from subprocess import Popen, STDOUT, PIPE
from tempfile import mkstemp

logging.basicConfig(level=logging.INFO)

class haddJob(object):
    '''A single hadd job'''
    def __init__(self, inputFiles, outputFile):
        self._inputFiles = inputFiles
        self._outputFile = outputFile
        self._exitCode = None

    @property
    def outputFile(self):
        return self._outputFile

    @property
    def exitCode(self):
        return self._exitCode

    # The setter is needed to propagate the exit code from the worker's copy of the
    # instance back to the 'parent' instance after a parallel merge
    @exitCode.setter
    def exitCode(self, value):
        self._exitCode = value

    def exe(self):
        '''Execute the hadd command'''
        mergeCmd = ['hadd', '-f']  # -f because our output file was touched to be safely created
        mergeCmd.append(self._outputFile)
        mergeCmd.extend(self._inputFiles)

        logging.info('Will now execute merge: %s', ' '.join(mergeCmd))
        output = []
        # universal_newlines=True so that stdout lines arrive as str rather than bytes
        job = Popen(mergeCmd, stdout=PIPE, stderr=STDOUT, bufsize=1, close_fds=True, universal_newlines=True)
        while job.poll() is None:
            output.append(job.stdout.readline().strip())
        self._exitCode = job.returncode
        if self._exitCode != 0:
            logging.warning('Non-zero return code from hadd. STDOUT/ERR follows:\n%s', os.linesep.join(output))

    def __str__(self):
        return str(self._inputFiles) + ' -> ' + str(self._outputFile)

    def __call__(self):
        '''Wrapper to call my own exe function and return the exit code of hadd'''
        self.exe()
        return self._exitCode


class haddStep(object):
    '''An hadd iteration - takes a bunch of input files and produces a bunch of output files'''
    def __init__(self, inputFiles, bunchNumber, finalOutputFile, level = None):
        self._inputFiles = inputFiles
        self._bunchNumber = bunchNumber
        self._finalOutputFile = finalOutputFile
        self._haddJobArray = []
        self._level = level
        self._defineMergeJobs()

    def _defineMergeJobs(self):
        # How many merges to do in this step?
        nMerges = (len(self._inputFiles)-1) // self._bunchNumber + 1
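        # i.e. the integer ceiling of len(inputFiles) / bunchNumber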
        logging.debug('Need %d merges for level %d', nMerges, self._level)
        if nMerges == 1:
            logging.debug('Final merge job: %s -> %s', self._inputFiles, self._finalOutputFile)
            self._haddJobArray.append(haddJob(self._inputFiles, self._finalOutputFile))
            return

        # With >1 merge need temporary files
        nextFile = 0
        for job in range(nMerges):
            # Try to ensure we have ~equal numbers of files in each merge
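            # (for example, 25 inputs with bunchNumber=10 give nMerges=3 and slices of 8, 9 and 8 files)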
            fileCounter = len(self._inputFiles) * float(job+1) / nMerges
            # Add 0.5 to ensure that rounding errors don't lose a file off the back... (very unlikely!)
            lastFile = int(fileCounter + 0.5)
            tempOutput = mkstemp(dir='.', prefix='tmp.')
            os.close(tempOutput[0])
            logging.debug('Intermediate merge job %d: %s -> %s', job, self._inputFiles[nextFile:lastFile], tempOutput[1])
            self._haddJobArray.append(haddJob(self._inputFiles[nextFile:lastFile], tempOutput[1]))
            nextFile = lastFile

    def executeAll(self, parallel = 1):
        if parallel > 1:
            # Funky parallel processing
            logging.info('Starting merge using up to %d hadd processes in parallel', parallel)
            logging.warning('Parallel merging is experimental')
            pool = Pool(processes = parallel)
            parallelResultsArray = []
            for job in self._haddJobArray:
                parallelResultsArray.append(pool.apply_async(job, ()))
            pool.close()
            # pool.join() will stick until all the worker processes are finished
            # Really one needs a progress loop monitor with a timeout...
            pool.join()

            # Update our hadd exit codes to the parallel processed return code, because the copy of the
            # instance held by the worker was the one where the exe method was actually called
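            # (pool.join() has already returned, so every result is ready and get(timeout=0) will not block)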
            for i, job in enumerate(self._haddJobArray):
                job.exitCode = parallelResultsArray[i].get(timeout=0)

            for job in self._haddJobArray:
                if job.exitCode != 0:
                    logging.error('Merging job %s failed, exit code %s', job, job.exitCode)
                    sys.exit(1)
        else:
            # Safe and slow serial processing
            for job in self._haddJobArray:
                job.exe()
                if job.exitCode != 0:
                    logging.error('Merging job %s failed, exit code %s', job, job.exitCode)
                    sys.exit(1)

    @property
    def outputFiles(self):
        return [ job.outputFile for job in self._haddJobArray ]

    @property
    def numHadds(self):
        return len(self._haddJobArray)

    def __str__(self):
        return 'Merging level %s: %s' % (self._level, str([ str(job) for job in self._haddJobArray ]))

def main():
    parser = argparse.ArgumentParser(description='Recursive wrapper around the ROOT hadd script.',
                                     epilog='Return codes: 0 All OK; 1 Problem with hadd; 2 Invalid arguments')
    parser.add_argument('outputFile', help='Single merged output file')
    parser.add_argument('inputFiles', nargs='+', help='Input files to merge')
    parser.add_argument('-n', '--bunchNumber', type=int, help='File batching number for single hadds', default=10)
    parser.add_argument('-p', '--parallelMerge', type=int,
                        help='Number of merges to do in parallel (experimental, please do not use in production)', default=1)

    args = vars(parser.parse_args(sys.argv[1:]))

    logging.debug(args)

    # Sanity checks
    if args['bunchNumber'] <= 1:
        logging.error('bunchNumber parameter must be greater than 1')
        sys.exit(2)

    if args['parallelMerge'] < 1:
        logging.error('parallelMerge parameter must be at least 1')
        sys.exit(2)


    doRecursiveMerge(args)

    sys.exit(0)


def doRecursiveMerge(args):
    '''Setup the cascade of merge jobs and execute each level in turn'''
    # First setup the cascade of merge steps
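    # (for example, with the default bunchNumber of 10, 100 input files become 10
    # intermediate merges at level 0 and a single final merge at level 1)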
    jobGraph = []
    level = 0
    jobGraph.append(haddStep(bunchNumber = args['bunchNumber'],
                             inputFiles = args['inputFiles'],
                             finalOutputFile = args['outputFile'], level = level))

    while jobGraph[-1].numHadds > 1:
        level += 1
        jobGraph.append(haddStep(bunchNumber = args['bunchNumber'],
                                 inputFiles = jobGraph[-1].outputFiles,
                                 finalOutputFile = args['outputFile'], level = level))
        logging.debug(jobGraph[-1])

    # Now execute each merge stage in turn
    for i, jobs in enumerate(jobGraph):
        logging.info('Executing merge iteration step %d', i)
        jobs.executeAll(args['parallelMerge'])

    logging.info('Final merge completed successfully.')


if __name__ == '__main__':
    main()