ATLAS Offline Software
rhadd.haddStep Class Reference

Public Member Functions

 __init__ (self, inputFiles, bunchNumber, finalOutputFile, level=None)
 executeAll (self, parallel=1)
 outputFiles (self)
 numHadds (self)
 __str__ (self)

Protected Member Functions

 _defineMergeJobs (self)

Protected Attributes

 _inputFiles = inputFiles
 _bunchNumber = bunchNumber
 _finalOutputFile = finalOutputFile
list _haddJobArray = []
 _level = level

Detailed Description

A single hadd iteration: takes a set of input files and produces a set of merged output files.

Definition at line 84 of file rhadd.py.
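Because each haddStep merges at most `bunchNumber` inputs per job, merging a large file set requires several levels of haddStep objects, each consuming the previous level's outputs. A minimal sketch of that level count (the function name and signature here are illustrative assumptions, not part of the rhadd module):

```python
def merge_levels(n_files, bunch_number):
    """Number of haddStep levels needed to reduce n_files to one file,
    when each merge job accepts at most bunch_number inputs."""
    levels = 0
    while n_files > 1:
        # Same ceiling division used by _defineMergeJobs for nMerges
        n_files = (n_files - 1) // bunch_number + 1
        levels += 1
    return levels
```

For example, 100 input files with a bunch number of 10 need two levels: one level of ten intermediate merges, then one final merge.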


Constructor & Destructor Documentation

◆ __init__()

rhadd.haddStep.__init__(self, inputFiles, bunchNumber, finalOutputFile, level=None)

Definition at line 86 of file rhadd.py.

86 def __init__(self, inputFiles, bunchNumber, finalOutputFile, level = None):
87 self._inputFiles = inputFiles
88 self._bunchNumber = bunchNumber
89 self._finalOutputFile = finalOutputFile
90 self._haddJobArray = []
91 self._level = level
92 self._defineMergeJobs()
93
94

Member Function Documentation

◆ __str__()

rhadd.haddStep.__str__ ( self)

Definition at line 158 of file rhadd.py.

158 def __str__(self):
159 return 'Merging level %s: %s' % (self._level, str([ str(job) for job in self._haddJobArray ]))
160
161

◆ _defineMergeJobs()

rhadd.haddStep._defineMergeJobs ( self)
protected

Definition at line 95 of file rhadd.py.

95 def _defineMergeJobs(self):
96 # How many merges to do in this step?
97 nMerges = (len(self._inputFiles)-1) // self._bunchNumber + 1
98 logging.debug('Need %d merges for level %d', nMerges, self._level)
99 if nMerges == 1:
100 logging.debug('Final merge job: %s -> %s', self._inputFiles, self._finalOutputFile)
101 self._haddJobArray.append(haddJob(self._inputFiles, self._finalOutputFile))
102 return
103
104 # With >1 merge need temporary files
105 nextFile = 0
106 for job in range(nMerges):
107 # Try to ensure we have ~equal numbers of files in each merge
108 fileCounter = len(self._inputFiles) * float(job+1) / nMerges
109 # Add 0.5 to ensure that rounding errors don't lose a file off the back... (very unlikely!)
110 lastFile = int(fileCounter + 0.5)
111 tempOutput = mkstemp(dir='.', prefix='tmp.')
112 os.close(tempOutput[0])
113 logging.debug('Intermediate merge job %d: %s -> %s', job, self._inputFiles[nextFile:lastFile], tempOutput[1])
114 self._haddJobArray.append(haddJob(self._inputFiles[nextFile:lastFile], tempOutput[1]))
115 nextFile = lastFile
116
117
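The slice boundaries above spread the inputs as evenly as possible across the merges. A standalone sketch of that arithmetic (a hypothetical helper, not a method of the class) makes the behaviour easy to check:

```python
def merge_slices(n_files, bunch_number):
    """Return (start, end) index pairs splitting n_files inputs into
    roughly equal bunches of at most bunch_number files each, using the
    same arithmetic as _defineMergeJobs."""
    n_merges = (n_files - 1) // bunch_number + 1  # ceiling division
    slices = []
    next_file = 0
    for job in range(n_merges):
        # Spread files evenly; +0.5 guards against float rounding
        # dropping the last file off the back
        last_file = int(n_files * float(job + 1) / n_merges + 0.5)
        slices.append((next_file, last_file))
        next_file = last_file
    return slices
```

For example, 10 files with a bunch number of 3 yield four merges of sizes 3, 2, 3, 2, rather than three merges of 3 and a lone straggler.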

◆ executeAll()

rhadd.haddStep.executeAll(self, parallel=1)

Definition at line 118 of file rhadd.py.

118 def executeAll(self, parallel = 1):
119 if parallel > 1:
120 # Funky parallel processing
121 logging.info('Starting merge using up to %d hadd processes in parallel', parallel)
122 logging.warning('Parallel merging is experimental')
123 pool = Pool(processes = parallel)
124 parallelResultsArray = []
125 for job in self._haddJobArray:
126 parallelResultsArray.append(pool.apply_async(job, ()))
127 pool.close()
128 # The next two lines will stick until all the worker processes are finished
129 # Really one needs a progress loop monitor with a timeout...
130 pool.join()
131
132 # Update our hadd exit codes to the parallel processed return code, because the copy of the
133 # instance held by the worker was the one where the exe method was actually called
134 for i, job in enumerate(self._haddJobArray):
135 job.exitCode = parallelResultsArray[i].get(timeout=0)
136
137 for job in self._haddJobArray:
138 if job.exitCode != 0:
139 logging.error('Merging job %s failed, exit code %s', job, job.exitCode)
140 sys.exit(1)
141 else:
142 # Safe and slow serial processing
143 for job in self._haddJobArray:
144 job.exe()
145 if job.exitCode != 0:
146 logging.error('Merging job %s failed, exit code %s', job, job.exitCode)
147 sys.exit(1)
148
149

◆ numHadds()

rhadd.haddStep.numHadds ( self)

Definition at line 155 of file rhadd.py.

155 def numHadds(self):
156 return len(self._haddJobArray)
157

◆ outputFiles()

rhadd.haddStep.outputFiles ( self)

Definition at line 151 of file rhadd.py.

151 def outputFiles(self):
152 return [ job.outputFile for job in self._haddJobArray ]
153

Member Data Documentation

◆ _bunchNumber

rhadd.haddStep._bunchNumber = bunchNumber
protected

Definition at line 88 of file rhadd.py.

◆ _finalOutputFile

rhadd.haddStep._finalOutputFile = finalOutputFile
protected

Definition at line 89 of file rhadd.py.

◆ _haddJobArray

rhadd.haddStep._haddJobArray = []
protected

Definition at line 90 of file rhadd.py.

◆ _inputFiles

rhadd.haddStep._inputFiles = inputFiles
protected

Definition at line 87 of file rhadd.py.

◆ _level

rhadd.haddStep._level = level
protected

Definition at line 91 of file rhadd.py.


The documentation for this class was generated from the following file:
rhadd.py