ATLAS Offline Software
rhadd.haddStep Class Reference

Public Member Functions

def __init__ (self, inputFiles, bunchNumber, finalOutputFile, level=None)
 
def executeAll (self, parallel=1)
 
def outputFiles (self)
 
def numHadds (self)
 
def __str__ (self)
 

Private Member Functions

def _defineMergeJobs (self)
 

Private Attributes

 _inputFiles
 
 _bunchNumber
 
 _finalOutputFile
 
 _haddJobArray
 
 _level
 

Detailed Description

An hadd iteration: takes a bunch of input files and merges them into a smaller bunch of output files (or the single final output).

Definition at line 85 of file rhadd.py.
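Each haddStep reduces the file count by roughly a factor of the bunch number, so a driver runs successive levels until a single output remains. A minimal sketch of that cascade arithmetic, reusing the `(n - 1) // bunch + 1` bunching formula from `_defineMergeJobs` (`merge_levels` is a hypothetical helper, not part of rhadd):

```python
def merge_levels(n_files, bunch):
    """Count the haddStep levels needed to reduce n_files to a single
    output, merging at most `bunch` files per hadd (hypothetical helper)."""
    levels = 0
    while n_files > 1:
        # Same bunching formula as _defineMergeJobs: ceil(n_files / bunch)
        n_files = (n_files - 1) // bunch + 1
        levels += 1
    return levels
```

For example, 101 input files with a bunch number of 10 need three levels: 101 → 11 → 2 → 1.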

Constructor & Destructor Documentation

◆ __init__()

def rhadd.haddStep.__init__ (   self,
  inputFiles,
  bunchNumber,
  finalOutputFile,
  level = None 
)

Definition at line 87 of file rhadd.py.

87  def __init__(self, inputFiles, bunchNumber, finalOutputFile, level = None):
88      self._inputFiles = inputFiles
89      self._bunchNumber = bunchNumber
90      self._finalOutputFile = finalOutputFile
91      self._haddJobArray = []
92      self._level = level
93      self._defineMergeJobs()

Member Function Documentation

◆ __str__()

def rhadd.haddStep.__str__ (   self)

Definition at line 159 of file rhadd.py.

159  def __str__(self):
160      return 'Merging level %s: %s' % (self._level, str([str(job) for job in self._haddJobArray]))

◆ _defineMergeJobs()

def rhadd.haddStep._defineMergeJobs (   self)
private

Definition at line 96 of file rhadd.py.

96  def _defineMergeJobs(self):
97      # How many merges to do in this step?
98      nMerges = (len(self._inputFiles)-1) // self._bunchNumber + 1
99      logging.debug('Need %d merges for level %s' % (nMerges, self._level))
100     if nMerges == 1:
101         logging.debug('Final merge job: %s -> %s' % (self._inputFiles, self._finalOutputFile))
102         self._haddJobArray.append(haddJob(self._inputFiles, self._finalOutputFile))
103         return
104 
105     # With >1 merge need temporary files
106     nextFile = 0
107     for job in range(nMerges):
108         # Try to ensure we have ~equal numbers of files in each merge
109         fileCounter = len(self._inputFiles) * float(job+1) / nMerges
110         # Add 0.5 to ensure that rounding errors don't lose a file off the back... (very unlikely!)
111         lastFile = int(fileCounter + 0.5)
112         tempOutput = mkstemp(dir='.', prefix='tmp.')
113         os.close(tempOutput[0])
114         logging.debug('Intermediate merge job %d: %s -> %s' % (job, self._inputFiles[nextFile:lastFile], tempOutput[1]))
115         self._haddJobArray.append(haddJob(self._inputFiles[nextFile:lastFile], tempOutput[1]))
116         nextFile = lastFile
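The slicing arithmetic above can be reproduced standalone. Here `partition` is a hypothetical helper mirroring the loop in `_defineMergeJobs`, returning the `(nextFile, lastFile)` slice bounds for each merge job:

```python
def partition(n_files, bunch):
    """Split n_files into nMerges roughly equal slices, each holding at
    most `bunch` files, mirroring _defineMergeJobs (hypothetical helper)."""
    n_merges = (n_files - 1) // bunch + 1
    slices, next_file = [], 0
    for job in range(n_merges):
        # Spread files evenly across merges; +0.5 guards against
        # float rounding dropping a file off the end
        last_file = int(n_files * float(job + 1) / n_merges + 0.5)
        slices.append((next_file, last_file))
        next_file = last_file
    return slices
```

With 10 files and a bunch number of 4 this yields three merges of 3, 4, and 3 files rather than a lopsided 4 + 4 + 2.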

◆ executeAll()

def rhadd.haddStep.executeAll (   self,
  parallel = 1 
)

Definition at line 119 of file rhadd.py.

119  def executeAll(self, parallel = 1):
120      if parallel > 1:
121          # Funky parallel processing
122          logging.info('Starting merge using up to %d hadd processes in parallel' % parallel)
123          logging.warning('Parallel merging is experimental')
124          pool = Pool(processes = parallel)
125          parallelResultsArray = []
126          for job in self._haddJobArray:
127              parallelResultsArray.append(pool.apply_async(job, ()))
128          pool.close()
129          # The next two lines will stick until all the worker processes are finished
130          # Really one needs a progress loop monitor with a timeout...
131          pool.join()
132 
133          # Update our hadd exit codes to the parallel processed return code, because the copy of the
134          # instance held by the worker was the one where the exe method was actually called
135          for i, job in enumerate(self._haddJobArray):
136              job.exitCode = parallelResultsArray[i].get(timeout=0)
137 
138          for job in self._haddJobArray:
139              if job.exitCode != 0:
140                  logging.error('Merging job %s failed, exit code %s' % (job, job.exitCode))
141                  sys.exit(1)
142      else:
143          # Safe and slow serial processing
144          for job in self._haddJobArray:
145              job.exe()
146              if job.exitCode != 0:
147                  logging.error('Merging job %s failed, exit code %s' % (job, job.exitCode))
148                  sys.exit(1)
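The dispatch-then-collect pattern above (submit every job with `apply_async`, `close` the pool, `join`, then pull the return codes back onto the local objects) can be sketched with the same `Pool` API. This sketch uses `multiprocessing.dummy`, whose thread-backed `Pool` has an identical interface but avoids pickling; `MergeTask` is a hypothetical stand-in for `haddJob`:

```python
from multiprocessing.dummy import Pool  # thread-backed Pool, same API as multiprocessing.Pool

class MergeTask:
    """Hypothetical stand-in for haddJob: a callable that records an exit code."""
    def __init__(self, name):
        self.name = name
        self.exitCode = None
    def __call__(self):
        return 0  # a real job would invoke hadd and return its exit status

def run_parallel(tasks, parallel=2):
    pool = Pool(processes=parallel)
    results = [pool.apply_async(task, ()) for task in tasks]
    pool.close()   # no further submissions accepted
    pool.join()    # block until every worker has finished
    # Copy return codes back onto the local objects; with real processes the
    # worker ran on a pickled copy, which is why executeAll needs this step.
    for task, res in zip(tasks, results):
        task.exitCode = res.get(timeout=0)
    return [t.exitCode for t in tasks]
```

Because `join` has already run, `get(timeout=0)` cannot block: every result is ready, so a zero timeout simply asserts that.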

◆ numHadds()

def rhadd.haddStep.numHadds (   self)

Definition at line 156 of file rhadd.py.

156  def numHadds(self):
157      return len(self._haddJobArray)

◆ outputFiles()

def rhadd.haddStep.outputFiles (   self)

Definition at line 152 of file rhadd.py.

152  def outputFiles(self):
153      return [ job.outputFile for job in self._haddJobArray ]

Member Data Documentation

◆ _bunchNumber

rhadd.haddStep._bunchNumber
private

Definition at line 89 of file rhadd.py.

◆ _finalOutputFile

rhadd.haddStep._finalOutputFile
private

Definition at line 90 of file rhadd.py.

◆ _haddJobArray

rhadd.haddStep._haddJobArray
private

Definition at line 91 of file rhadd.py.

◆ _inputFiles

rhadd.haddStep._inputFiles
private

Definition at line 88 of file rhadd.py.

◆ _level

rhadd.haddStep._level
private

Definition at line 92 of file rhadd.py.


The documentation for this class was generated from the following file:
rhadd.py