ATLAS Offline Software
rhadd.py
#! /usr/bin/env python

# Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration
#
# $Id: rhadd.py 677431 2015-06-23 08:09:12Z graemes $
#
# Recursive histogram adder, wrapping around hadd
# Author: Graeme A Stewart <graeme.andrew.stewart@cern.ch>
#
# usage: rhadd.py [-h] [-n BUNCHNUMBER] [-p PARALLELMERGE]
#                 outputFile inputFiles [inputFiles ...]
#
# Recursive wrapper around the ROOT hadd script.
#
# positional arguments:
#   outputFile            Single merged output file
#   inputFiles            Input files to merge
#
# optional arguments:
#   -h, --help            show this help message and exit
#   -n BUNCHNUMBER, --bunchNumber BUNCHNUMBER
#                         File batching number for single hadds
#   -p PARALLELMERGE, --parallelMerge PARALLELMERGE
#                         Number of merges to do in parallel (experimental,
#                         please do not use in production)
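#
# Illustrative example (file names invented here): with the default bunch
# size of 10, merging 100 input histogram files produces 10 intermediate
# files in a first pass and a single final merge in a second pass:
#
#   rhadd.py -n 10 merged.root hist_*.root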

from __future__ import division

from builtins import object
from builtins import range

import argparse
import logging
import os
import sys

from multiprocessing import Pool
from subprocess import Popen, STDOUT, PIPE
from tempfile import mkstemp

logging.basicConfig(level=logging.INFO)

class haddJob(object):
    '''A single hadd job'''
    def __init__(self, inputFiles, outputFile):
        self._inputFiles = inputFiles
        self._outputFile = outputFile
        self._exitCode = None

    @property
    def outputFile(self):
        return self._outputFile

    @property
    def exitCode(self):
        return self._exitCode

    # This setter is needed to paste the exit code from the parallel worker back onto the 'parent' instance
    @exitCode.setter
    def exitCode(self, value):
        self._exitCode = value

    def exe(self):
        '''Execute the hadd command'''
        mergeCmd = ['hadd', '-f']  # -f because our output file was touched to be safely created
        mergeCmd.append(self._outputFile)
        mergeCmd.extend(self._inputFiles)

        logging.info('Will now execute merge: %s' % ' '.join(mergeCmd))
        output = []
        # universal_newlines=True so that stdout comes back as str (not bytes) under Python 3
        job = Popen(mergeCmd, stdout=PIPE, stderr=STDOUT, bufsize=1, close_fds=True, universal_newlines=True)
        while job.poll() is None:
            output.append(job.stdout.readline().strip())
        self._exitCode = job.returncode
        if self._exitCode != 0:
            logging.warning('Non zero return code from hadd. STDOUT/ERR follows:\n%s' % os.linesep.join(output))

    def __str__(self):
        return str(self._inputFiles) + ' -> ' + str(self._outputFile)

    def __call__(self):
        '''Wrapper to call my own exe function and return the exit code of hadd'''
        self.exe()
        return self._exitCode

class haddStep(object):
    '''An hadd iteration - takes a bunch of input files and produces a bunch of output files'''
    def __init__(self, inputFiles, bunchNumber, finalOutputFile, level=None):
        self._inputFiles = inputFiles
        self._bunchNumber = bunchNumber
        self._finalOutputFile = finalOutputFile
        self._haddJobArray = []
        self._level = level
        self._defineMergeJobs()

    def _defineMergeJobs(self):
        # How many merges to do in this step?
        nMerges = (len(self._inputFiles) - 1) // self._bunchNumber + 1
        logging.debug('Need %d merges for level %d' % (nMerges, self._level))
        if nMerges == 1:
            # Only one merge left, so it goes straight to the final output file
            logging.debug('Final merge job: %s -> %s' % (self._inputFiles, self._finalOutputFile))
            self._haddJobArray.append(haddJob(self._inputFiles, self._finalOutputFile))
            return

        # With more than one merge we need temporary files for the intermediate outputs
        nextFile = 0
        for job in range(nMerges):
            # Try to ensure we have ~equal numbers of files in each merge
            fileCounter = len(self._inputFiles) * float(job + 1) / nMerges
            # Add 0.5 to ensure that rounding errors don't lose a file off the back... (very unlikely!)
            lastFile = int(fileCounter + 0.5)
            tempOutput = mkstemp(dir='.', prefix='tmp.')
            os.close(tempOutput[0])
            logging.debug('Intermediate merge job %d: %s -> %s' % (job, self._inputFiles[nextFile:lastFile], tempOutput[1]))
            self._haddJobArray.append(haddJob(self._inputFiles[nextFile:lastFile], tempOutput[1]))
            nextFile = lastFile

    def executeAll(self, parallel=1):
        if parallel > 1:
            # Funky parallel processing
            logging.info('Starting merge using up to %d hadd processes in parallel' % parallel)
            logging.warning('Parallel merging is experimental')
            pool = Pool(processes=parallel)
            parallelResultsArray = []
            for job in self._haddJobArray:
                parallelResultsArray.append(pool.apply_async(job, ()))
            pool.close()
            # pool.join() will block until all the worker processes are finished
            # Really one needs a progress loop monitor with a timeout...
            pool.join()

            # Copy the exit codes from the parallel results back onto our hadd jobs, because the copy
            # of the instance held by the worker was the one on which the exe method was actually called
            for i, job in enumerate(self._haddJobArray):
                job.exitCode = parallelResultsArray[i].get(timeout=0)

            for job in self._haddJobArray:
                if job.exitCode != 0:
                    logging.error('Merging job %s failed, exit code %s' % (job, job.exitCode))
                    sys.exit(1)
        else:
            # Safe and slow serial processing
            for job in self._haddJobArray:
                job.exe()
                if job.exitCode != 0:
                    logging.error('Merging job %s failed, exit code %s' % (job, job.exitCode))
                    sys.exit(1)

    @property
    def outputFiles(self):
        return [job.outputFile for job in self._haddJobArray]

    @property
    def numHadds(self):
        return len(self._haddJobArray)

    def __str__(self):
        return 'Merging level %s: %s' % (self._level, str([str(job) for job in self._haddJobArray]))

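# Illustrative programmatic use of the classes above (outside of main(); the
# file names are invented and hadd must be available on the PATH). A single
# haddStep whose input count is no more than bunchNumber merges directly into
# the final output file:
#
#   step = haddStep(inputFiles=['histA.root', 'histB.root'], bunchNumber=10,
#                   finalOutputFile='merged.root', level=0)
#   step.executeAll(parallel=1)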
def main():
    parser = argparse.ArgumentParser(description='Recursive wrapper around the ROOT hadd script.',
                                     epilog='Return codes: 0 All OK; 1 Problem with hadd; 2 Invalid arguments')
    parser.add_argument('outputFile', help='Single merged output file')
    parser.add_argument('inputFiles', nargs='+', help='Input files to merge')
    parser.add_argument('-n', '--bunchNumber', type=int, help='File batching number for single hadds', default=10)
    parser.add_argument('-p', '--parallelMerge', type=int,
                        help='Number of merges to do in parallel (experimental, please do not use in production)', default=1)

    args = vars(parser.parse_args(sys.argv[1:]))

    logging.debug(args)

    # Sanity checks
    if args['bunchNumber'] <= 1:
        logging.error('bunchNumber parameter must be greater than 1')
        sys.exit(2)

    if args['parallelMerge'] < 1:
        logging.error('parallelMerge parameter must be at least 1')
        sys.exit(2)

    doRecursiveMerge(args)

    sys.exit(0)

def doRecursiveMerge(args):
    '''Setup the cascade of merge jobs and execute each level in turn'''
    # First setup the cascade of merge steps
    jobGraph = []
    level = 0
    jobGraph.append(haddStep(bunchNumber=args['bunchNumber'],
                             inputFiles=args['inputFiles'],
                             finalOutputFile=args['outputFile'], level=level))

    while jobGraph[-1].numHadds > 1:
        level += 1
        jobGraph.append(haddStep(bunchNumber=args['bunchNumber'],
                                 inputFiles=jobGraph[-1].outputFiles,
                                 finalOutputFile=args['outputFile'], level=level))
        logging.debug(jobGraph[-1])

    # Now execute each merge stage in turn
    for i, jobs in enumerate(jobGraph):
        logging.info('Executing merge iteration step %d' % i)
        jobs.executeAll(args['parallelMerge'])

    logging.info('Final merge completed successfully.')


if __name__ == '__main__':
    main()
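As an illustration of how the recursion in doRecursiveMerge terminates, the following minimal standalone sketch (not part of rhadd.py; the helper name cascadeSizes is invented here) applies the same (len(inputFiles) - 1) // bunchNumber + 1 batching rule as haddStep._defineMergeJobs and reports how many hadd jobs each level of the cascade runs:

def cascadeSizes(nFiles, bunchNumber):
    '''Number of hadd jobs per merge level for nFiles inputs; the last level is the final merge.'''
    levels = []
    while True:
        nMerges = (nFiles - 1) // bunchNumber + 1
        levels.append(nMerges)
        if nMerges == 1:
            return levels
        nFiles = nMerges

print(cascadeSizes(100, 10))   # [10, 1]
print(cascadeSizes(200, 10))   # [20, 2, 1]

With the default bunchNumber of 10, even a few thousand input files therefore need only three or four merge levels.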