ATLAS Offline Software
rhadd.haddStep Class Reference

Public Member Functions

def __init__ (self, inputFiles, bunchNumber, finalOutputFile, level=None)
 
def executeAll (self, parallel=1)
 
def outputFiles (self)
 
def numHadds (self)
 
def __str__ (self)
 

Private Member Functions

def _defineMergeJobs (self)
 

Private Attributes

 _inputFiles
 
 _bunchNumber
 
 _finalOutputFile
 
 _haddJobArray
 
 _level
 

Detailed Description

One hadd iteration: takes a set of input files and merges them into a smaller set of output files.

Definition at line 86 of file rhadd.py.
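
Each haddStep is one level of an iterative merge: when more than one hadd is needed, the step writes intermediate files that a subsequent step consumes, until a single hadd can produce the final output. Below is a self-contained sketch of how successive levels shrink the file list (illustrative names only, not the actual rhadd driver):

files = ['f%d.root' % i for i in range(8)]
bunch = 3
level = 1
while True:
    nMerges = (len(files) - 1) // bunch + 1   # same formula as _defineMergeJobs
    print('Level %d: %d files -> %d merge job(s)' % (level, len(files), nMerges))
    if nMerges == 1:
        break                                 # the final merge writes the real output
    files = ['tmp.L%d.%d.root' % (level, j) for j in range(nMerges)]
    level += 1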

Constructor & Destructor Documentation

◆ __init__()

def rhadd.haddStep.__init__(self, inputFiles, bunchNumber, finalOutputFile, level=None)

Definition at line 88 of file rhadd.py.

def __init__(self, inputFiles, bunchNumber, finalOutputFile, level=None):
    self._inputFiles = inputFiles
    self._bunchNumber = bunchNumber
    self._finalOutputFile = finalOutputFile
    self._haddJobArray = []
    self._level = level
    self._defineMergeJobs()
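
A hypothetical usage sketch (assuming rhadd.py and its haddJob dependency are importable and the listed ROOT files exist; all file names are illustrative). Note that constructing the step already creates the intermediate temporary files via mkstemp:

from rhadd import haddStep

# Three inputs with a bunch size of 2 -> two merge jobs at this level
step = haddStep(['a.root', 'b.root', 'c.root'], 2, 'merged.root', level=1)
print(step)                 # 'Merging level 1: ...' via __str__
print(step.numHadds())      # 2
print(step.outputFiles())   # two tmp.* intermediate files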

Member Function Documentation

◆ __str__()

def rhadd.haddStep.__str__(self)

Definition at line 160 of file rhadd.py.

def __str__(self):
    return 'Merging level %s: %s' % (self._level, str([str(job) for job in self._haddJobArray]))

◆ _defineMergeJobs()

def rhadd.haddStep._defineMergeJobs(self)
private

Definition at line 97 of file rhadd.py.

def _defineMergeJobs(self):
    # How many merges to do in this step?
    nMerges = (len(self._inputFiles) - 1) // self._bunchNumber + 1
    logging.debug('Need %d merges for level %s' % (nMerges, self._level))
    if nMerges == 1:
        logging.debug('Final merge job: %s -> %s' % (self._inputFiles, self._finalOutputFile))
        self._haddJobArray.append(haddJob(self._inputFiles, self._finalOutputFile))
        return

    # With >1 merge we need temporary files
    nextFile = 0
    for job in range(nMerges):
        # Try to ensure we have ~equal numbers of files in each merge
        fileCounter = len(self._inputFiles) * float(job + 1) / nMerges
        # Add 0.5 so that rounding errors don't lose a file off the back... (very unlikely!)
        lastFile = int(fileCounter + 0.5)
        tempOutput = mkstemp(dir='.', prefix='tmp.')
        os.close(tempOutput[0])
        logging.debug('Intermediate merge job %d: %s -> %s' % (job, self._inputFiles[nextFile:lastFile], tempOutput[1]))
        self._haddJobArray.append(haddJob(self._inputFiles[nextFile:lastFile], tempOutput[1]))
        nextFile = lastFile
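
The slice boundaries come from rounding len(inputFiles) * (job + 1) / nMerges, which spreads any remainder evenly across the bunches instead of leaving one short merge at the end. A self-contained illustration of that arithmetic, using stand-in data rather than real files:

inputs = ['file%02d.root' % i for i in range(10)]   # 10 stand-in names
bunch = 4
nMerges = (len(inputs) - 1) // bunch + 1            # ceil(10/4) = 3
nextFile = 0
for job in range(nMerges):
    lastFile = int(len(inputs) * float(job + 1) / nMerges + 0.5)
    print('job %d: %d files %s' % (job, lastFile - nextFile, inputs[nextFile:lastFile]))
    nextFile = lastFile
# -> slices of 3, 4 and 3 files, each within the bunch limit of 4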

◆ executeAll()

def rhadd.haddStep.executeAll(self, parallel=1)

Definition at line 120 of file rhadd.py.

def executeAll(self, parallel=1):
    if parallel > 1:
        # Funky parallel processing
        logging.info('Starting merge using up to %d hadd processes in parallel' % parallel)
        logging.warning('Parallel merging is experimental')
        pool = Pool(processes=parallel)
        parallelResultsArray = []
        for job in self._haddJobArray:
            parallelResultsArray.append(pool.apply_async(job, ()))
        pool.close()
        # The join() call blocks until all the worker processes are finished.
        # Really one needs a progress monitor loop with a timeout...
        pool.join()

        # Update our hadd exit codes from the parallel results, because the copy of
        # the instance held by the worker was the one where the exe method was
        # actually called
        for i, job in enumerate(self._haddJobArray):
            job.exitCode = parallelResultsArray[i].get(timeout=0)

        for job in self._haddJobArray:
            if job.exitCode != 0:
                logging.error('Merging job %s failed, exit code %s' % (job, job.exitCode))
                sys.exit(1)
    else:
        # Safe and slow serial processing
        for job in self._haddJobArray:
            job.exe()
            if job.exitCode != 0:
                logging.error('Merging job %s failed, exit code %s' % (job, job.exitCode))
                sys.exit(1)
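
The parallel branch relies on pool.apply_async(job, ()) pickling the callable job object into a worker process, so the parent's copy is never mutated and the exit code must be read back from the AsyncResult. A minimal self-contained sketch of the same pattern, with a stand-in job class in place of haddJob:

from multiprocessing import Pool

class StubJob:
    """Stand-in for haddJob: a callable object that records an exit code."""
    def __init__(self, name):
        self.name = name
        self.exitCode = None

    def __call__(self):
        # Runs in the worker; the parent's copy of self is untouched
        self.exitCode = 0
        return self.exitCode

if __name__ == '__main__':
    jobs = [StubJob('job%d' % i) for i in range(4)]
    pool = Pool(processes=2)
    results = [pool.apply_async(job, ()) for job in jobs]
    pool.close()
    pool.join()
    # Copy the workers' return codes back onto the parent's job objects
    for job, result in zip(jobs, results):
        job.exitCode = result.get(timeout=0)
        print(job.name, 'exit code', job.exitCode)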

◆ numHadds()

def rhadd.haddStep.numHadds(self)

Definition at line 157 of file rhadd.py.

def numHadds(self):
    return len(self._haddJobArray)

◆ outputFiles()

def rhadd.haddStep.outputFiles(self)

Definition at line 153 of file rhadd.py.

def outputFiles(self):
    return [job.outputFile for job in self._haddJobArray]

Member Data Documentation

◆ _bunchNumber

rhadd.haddStep._bunchNumber
private

Definition at line 90 of file rhadd.py.

◆ _finalOutputFile

rhadd.haddStep._finalOutputFile
private

Definition at line 91 of file rhadd.py.

◆ _haddJobArray

rhadd.haddStep._haddJobArray
private

Definition at line 92 of file rhadd.py.

◆ _inputFiles

rhadd.haddStep._inputFiles
private

Definition at line 89 of file rhadd.py.

◆ _level

rhadd.haddStep._level
private

Definition at line 93 of file rhadd.py.


The documentation for this class was generated from the following file:
rhadd.py