ATLAS Offline Software
rhadd.py
#! /usr/bin/env python

from builtins import object
from builtins import range

# Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration
#
# $Id: rhadd.py 677431 2015-06-23 08:09:12Z graemes $
#
# Recursive histogram adder, wrapping around hadd
# Author: Graeme A Stewart <graeme.andrew.stewart@cern.ch>
#
# usage: rhadd.py [-h] [-n BUNCHNUMBER] [-p PARALLELMERGE]
#                 outputFile inputFiles [inputFiles ...]
#
# Recursive wrapper around the ROOT hadd script.
#
# positional arguments:
#   outputFile            Single merged output file
#   inputFiles            Input files to merge
#
# optional arguments:
#   -h, --help            show this help message and exit
#   -n BUNCHNUMBER, --bunchNumber BUNCHNUMBER
#                         File batching number for single hadds
#   -p PARALLELMERGE, --parallelMerge PARALLELMERGE
#                         Number of merges to do in parallel (experimental,
#                         please do not use in production)

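# Example invocation (a sketch; the file names below are hypothetical):
#
#   rhadd.py -n 5 merged.root run1.root run2.root ... run20.root
#
# With 20 inputs and --bunchNumber 5 this schedules 4 intermediate hadd merges
# of 5 files each, followed by a final hadd of the 4 temporary outputs into
# merged.root.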

import argparse
import logging
import os
import sys

from multiprocessing import Pool
from subprocess import Popen, STDOUT, PIPE
from tempfile import mkstemp

logging.basicConfig(level=logging.INFO)

class haddJob(object):
    '''A single hadd job'''
    def __init__(self, inputFiles, outputFile):
        self._inputFiles = inputFiles
        self._outputFile = outputFile
        self._exitCode = None

    @property
    def outputFile(self):
        return self._outputFile

    @property
    def exitCode(self):
        return self._exitCode

    @exitCode.setter
    # This is needed to paste the parallel executed exitCode back to the 'parent' instance
    def exitCode(self, value):
        self._exitCode = value

    def exe(self):
        '''Execute the hadd command'''
        mergeCmd = ['hadd', '-f']  # -f because our output file was touched to be safely created
        mergeCmd.append(self._outputFile)
        mergeCmd.extend(self._inputFiles)

        logging.info('Will now execute merge: %s' % ' '.join(mergeCmd))
        output = []
        # universal_newlines=True so stdout yields str (not bytes) and can be joined below
        job = Popen(mergeCmd, stdout=PIPE, stderr=STDOUT, bufsize=1,
                    close_fds=True, universal_newlines=True)
        while job.poll() is None:
            output.append(job.stdout.readline().strip())
        self._exitCode = job.returncode
        if self._exitCode != 0:
            logging.warning('Non zero return code from hadd. STDOUT/ERR follows:\n%s' % os.linesep.join(output))

    def __str__(self):
        return str(self._inputFiles) + ' -> ' + str(self._outputFile)

    def __call__(self):
        '''Wrapper to call my own exe function and return the exit code of hadd'''
        self.exe()
        return self._exitCode

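# A minimal usage sketch of haddJob (illustrative only; the .root file names
# are hypothetical and hadd must be available on the PATH):
#
#   job = haddJob(['a.root', 'b.root'], 'ab.root')
#   job.exe()                # runs: hadd -f ab.root a.root b.root
#   print(job.exitCode)      # 0 on success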

class haddStep(object):
    '''An hadd iteration - takes a bunch of input files and produces a bunch of output files'''
    def __init__(self, inputFiles, bunchNumber, finalOutputFile, level = None):
        self._inputFiles = inputFiles
        self._bunchNumber = bunchNumber
        self._finalOutputFile = finalOutputFile
        self._haddJobArray = []
        self._level = level
        self._defineMergeJobs()

    def _defineMergeJobs(self):
        # How many merges to do in this step?
        nMerges = (len(self._inputFiles)-1) // self._bunchNumber + 1
        logging.debug('Need %d merges for level %d' % (nMerges, self._level))
        if nMerges == 1:
            logging.debug('Final merge job: %s -> %s' % (self._inputFiles, self._finalOutputFile))
            self._haddJobArray.append(haddJob(self._inputFiles, self._finalOutputFile))
            return

        # With >1 merge need temporary files
        nextFile = 0
        for job in range(nMerges):
            # Try to ensure we have ~equal numbers of files in each merge
            fileCounter = len(self._inputFiles) * float(job+1) / nMerges
            # Add 0.5 to ensure that rounding errors don't lose a file off the back... (very unlikely!)
            lastFile = int(fileCounter + 0.5)
            tempOutput = mkstemp(dir='.', prefix='tmp.')
            os.close(tempOutput[0])
            logging.debug('Intermediate merge job %d: %s -> %s' % (job, self._inputFiles[nextFile:lastFile], tempOutput[1]))
            self._haddJobArray.append(haddJob(self._inputFiles[nextFile:lastFile], tempOutput[1]))
            nextFile = lastFile
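
    # Worked example of the batching above (illustrative numbers): with 25 input
    # files and bunchNumber 10, nMerges = (25-1)//10 + 1 = 3. The slice boundaries
    # come out as int(25*1/3 + 0.5) = 8, int(25*2/3 + 0.5) = 17 and
    # int(25*3/3 + 0.5) = 25, so the three intermediate hadds get 8, 9 and 8
    # files respectively and no input file is dropped.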

    def executeAll(self, parallel = 1):
        if parallel > 1:
            # Funky parallel processing
            logging.info('Starting merge using up to %d hadd processes in parallel' % parallel)
            logging.warning('Parallel merging is experimental')
            pool = Pool(processes = parallel)
            parallelResultsArray = []
            for job in self._haddJobArray:
                parallelResultsArray.append(pool.apply_async(job, ()))
            pool.close()
            # The next two lines will stick until all the worker processes are finished
            # Really one needs a progress loop monitor with a timeout...
            pool.join()

            # Update our hadd exit codes to the parallel processed return code, because the copy of the
            # instance held by the worker was the one where the exe method was actually called
            for i, job in enumerate(self._haddJobArray):
                job.exitCode = parallelResultsArray[i].get(timeout=0)

            for job in self._haddJobArray:
                if job.exitCode != 0:
                    logging.error('Merging job %s failed, exit code %s' % (job, job.exitCode))
                    sys.exit(1)
        else:
            # Safe and slow serial processing
            for job in self._haddJobArray:
                job.exe()
                if job.exitCode != 0:
                    logging.error('Merging job %s failed, exit code %s' % (job, job.exitCode))
                    sys.exit(1)

    @property
    def outputFiles(self):
        return [ job.outputFile for job in self._haddJobArray ]

    @property
    def numHadds(self):
        return len(self._haddJobArray)

    def __str__(self):
        return 'Merging level %s: %s' % (self._level, str([ str(job) for job in self._haddJobArray ]))

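# A minimal usage sketch of haddStep (illustrative only; the file names are
# hypothetical, and note that constructing a haddStep creates tmp.* files in
# the current directory via mkstemp):
#
#   step = haddStep(inputFiles=['f1.root', 'f2.root', 'f3.root'],
#                   bunchNumber=2, finalOutputFile='out.root', level=0)
#   step.numHadds       # -> 2 intermediate merges defined
#   step.outputFiles    # -> the two temporary files they will write
#   step.executeAll()   # run the merges serially
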
def main():
    parser = argparse.ArgumentParser(description='Recursive wrapper around the ROOT hadd script.',
                                     epilog='Return codes: 0 All OK; 1 Problem with hadd; 2 Invalid arguments')
    parser.add_argument('outputFile', help='Single merged output file')
    parser.add_argument('inputFiles', nargs='+', help='Input files to merge')
    parser.add_argument('-n', '--bunchNumber', type=int, help='File batching number for single hadds', default=10)
    parser.add_argument('-p', '--parallelMerge', type=int,
                        help='Number of merges to do in parallel (experimental, please do not use in production)', default=1)

    args = vars(parser.parse_args(sys.argv[1:]))

    logging.debug(args)

    # Sanity checks
    if args['bunchNumber'] <= 1:
        logging.error('bunchNumber parameter must be greater than 1')
        sys.exit(2)

    if args['parallelMerge'] < 1:
        logging.error('parallelMerge parameter must be at least 1')
        sys.exit(2)


    doRecursiveMerge(args)

    sys.exit(0)

def doRecursiveMerge(args):
    '''Setup the cascade of merge jobs and execute each level in turn'''
    # First setup the cascade of merge steps
    jobGraph = []
    level = 0
    jobGraph.append(haddStep(bunchNumber = args['bunchNumber'],
                             inputFiles = args['inputFiles'],
                             finalOutputFile = args['outputFile'], level = level))

    while jobGraph[-1].numHadds > 1:
        level += 1
        jobGraph.append(haddStep(bunchNumber = args['bunchNumber'],
                                 inputFiles = jobGraph[-1].outputFiles,
                                 finalOutputFile = args['outputFile'], level = level))
        logging.debug(jobGraph[-1])

    # Now execute each merge stage in turn
    for i, jobs in enumerate(jobGraph):
        logging.info('Executing merge iteration step %d' % i)
        jobs.executeAll(args['parallelMerge'])

    logging.info('Final merge completed successfully.')

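# Worked example of the cascade (illustrative numbers): merging 100 input files
# with the default bunchNumber of 10, level 0 defines 10 intermediate hadds of
# 10 files each into tmp.* files; the next level then has numHadds == 1, so it
# merges those 10 temporaries straight into outputFile and the while loop stops.
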

if __name__ == '__main__':
    main()