#! /usr/bin/env python

# Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
#
# Recursive histogram adder, wrapping around hadd
# Author: Graeme A Stewart <graeme.andrew.stewart@cern.ch>
#
# usage: rhadd.py [-h] [-n BUNCHNUMBER] [-p PARALLELMERGE]
#                 outputFile inputFiles [inputFiles ...]
#
# Recursive wrapper around the ROOT hadd script.
#
# positional arguments:
#   outputFile            Single merged output file
#   inputFiles            Input files to merge
#
# optional arguments:
#   -h, --help            show this help message and exit
#   -n BUNCHNUMBER, --bunchNumber BUNCHNUMBER
#                         File batching number for single hadds
#   -p PARALLELMERGE, --parallelMerge PARALLELMERGE
#                         Number of merges to do in parallel (experimental,
#                         please do not use in production)
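#
# Hypothetical example invocations (the file names are illustrative only):
#   rhadd.py merged.root hist1.root hist2.root hist3.root
#   rhadd.py -n 20 merged.root hist-*.root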

from builtins import object
from builtins import range

import argparse
import logging
import os
import sys

from multiprocessing import Pool
from subprocess import Popen, STDOUT, PIPE
from tempfile import mkstemp

logging.basicConfig(level=logging.INFO)


class haddJob(object):
    '''A single hadd job'''
    def __init__(self, inputFiles, outputFile):
        self._inputFiles = inputFiles
        self._outputFile = outputFile
        self._exitCode = None

    @property
    def outputFile(self):
        return self._outputFile

    @property
    def exitCode(self):
        return self._exitCode

    @exitCode.setter
    # This is needed to pass the exit code from the parallel worker back to the 'parent' instance
    def exitCode(self, value):
        self._exitCode = value

    def exe(self):
        '''Execute the hadd command'''
        mergeCmd = ['hadd', '-f']  # -f because our output file was touched to be safely created
        mergeCmd.append(self._outputFile)
        mergeCmd.extend(self._inputFiles)

        logging.info('Will now execute merge: %s', ' '.join(mergeCmd))
        output = []
        # universal_newlines=True gives str (not bytes) output, which the os.linesep.join() below needs
        job = Popen(mergeCmd, stdout=PIPE, stderr=STDOUT, bufsize=1,
                    close_fds=True, universal_newlines=True)
        while job.poll() is None:
            output.append(job.stdout.readline().strip())
        # Drain any output still buffered when the process exited
        output.extend([line.strip() for line in job.stdout.readlines()])
        self._exitCode = job.returncode
        if self._exitCode != 0:
            logging.warning('Non zero return code from hadd. STDOUT/ERR follows:\n%s', os.linesep.join(output))

    def __str__(self):
        return str(self._inputFiles) + ' -> ' + str(self._outputFile)

    def __call__(self):
        '''Wrapper to call my own exe function and return the exit code of hadd'''
        self.exe()
        return self._exitCode


class haddStep(object):
    '''An hadd iteration - takes a bunch of input files and produces a bunch of output files'''
    def __init__(self, inputFiles, bunchNumber, finalOutputFile, level=None):
        self._inputFiles = inputFiles
        self._bunchNumber = bunchNumber
        self._finalOutputFile = finalOutputFile
        self._haddJobArray = []
        self._level = level
        self._defineMergeJobs()

    def _defineMergeJobs(self):
        # How many merges to do in this step?
        nMerges = (len(self._inputFiles) - 1) // self._bunchNumber + 1
        logging.debug('Need %d merges for level %d', nMerges, self._level)
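        # A worked example (illustrative numbers): 25 input files with a
        # bunchNumber of 10 give nMerges = (25-1)//10 + 1 = 3, and the
        # slicing below splits them into bunches of 8, 9 and 8 files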
        if nMerges == 1:
            logging.debug('Final merge job: %s -> %s', self._inputFiles, self._finalOutputFile)
            self._haddJobArray.append(haddJob(self._inputFiles, self._finalOutputFile))
            return

        # With >1 merge need temporary files
        nextFile = 0
        for job in range(nMerges):
            # Try to ensure we have ~equal numbers of files in each merge
            fileCounter = len(self._inputFiles) * float(job + 1) / nMerges
            # Add 0.5 to ensure that rounding errors don't lose a file off the back... (very unlikely!)
            lastFile = int(fileCounter + 0.5)
            tempOutput = mkstemp(dir='.', prefix='tmp.')
            os.close(tempOutput[0])
            logging.debug('Intermediate merge job %d: %s -> %s', job, self._inputFiles[nextFile:lastFile], tempOutput[1])
            self._haddJobArray.append(haddJob(self._inputFiles[nextFile:lastFile], tempOutput[1]))
            nextFile = lastFile

    def executeAll(self, parallel=1):
        if parallel > 1:
            # Funky parallel processing
            logging.info('Starting merge using up to %d hadd processes in parallel', parallel)
            logging.warning('Parallel merging is experimental')
            pool = Pool(processes=parallel)
            parallelResultsArray = []
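            # Each pool worker invokes the haddJob instance itself (its __call__
            # method), which runs hadd and hands the exit code back via the AsyncResult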
            for job in self._haddJobArray:
                parallelResultsArray.append(pool.apply_async(job, ()))
            pool.close()
            # The join blocks until all the worker processes are finished
            # Really one needs a progress loop monitor with a timeout...
            pool.join()

            # Update our hadd exit codes to the parallel processed return code, because the copy of the
            # instance held by the worker was the one where the exe method was actually called
            for i, job in enumerate(self._haddJobArray):
                job.exitCode = parallelResultsArray[i].get(timeout=0)

            for job in self._haddJobArray:
                if job.exitCode != 0:
                    logging.error('Merging job %s failed, exit code %s', job, job.exitCode)
                    sys.exit(1)
        else:
            # Safe and slow serial processing
            for job in self._haddJobArray:
                job.exe()
                if job.exitCode != 0:
                    logging.error('Merging job %s failed, exit code %s', job, job.exitCode)
                    sys.exit(1)

    @property
    def outputFiles(self):
        return [job.outputFile for job in self._haddJobArray]

    @property
    def numHadds(self):
        return len(self._haddJobArray)

    def __str__(self):
        return 'Merging level %s: %s' % (self._level, str([str(job) for job in self._haddJobArray]))


def main():
    parser = argparse.ArgumentParser(description='Recursive wrapper around the ROOT hadd script.',
                                     epilog='Return codes: 0 All OK; 1 Problem with hadd; 2 Invalid arguments')
    parser.add_argument('outputFile', help='Single merged output file')
    parser.add_argument('inputFiles', nargs='+', help='Input files to merge')
    parser.add_argument('-n', '--bunchNumber', type=int, help='File batching number for single hadds', default=10)
    parser.add_argument('-p', '--parallelMerge', type=int,
                        help='Number of merges to do in parallel (experimental, please do not use in production)', default=1)

    args = vars(parser.parse_args(sys.argv[1:]))

    logging.debug(args)

    # Sanity checks
    if args['bunchNumber'] <= 1:
        logging.error('bunchNumber parameter must be greater than 1')
        sys.exit(2)

    if args['parallelMerge'] < 1:
        logging.error('parallelMerge parameter must be at least 1')
        sys.exit(2)

    doRecursiveMerge(args)

    sys.exit(0)


def doRecursiveMerge(args):
    '''Setup the cascade of merge jobs and execute each level in turn'''
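    # A worked example (illustrative numbers): 100 input files with the default
    # bunchNumber of 10 give a level 0 step of 10 intermediate merges into
    # temporary files, then a level 1 step merging those 10 into the final output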
    # First setup the cascade of merge steps
    jobGraph = []
    level = 0
    jobGraph.append(haddStep(bunchNumber=args['bunchNumber'],
                             inputFiles=args['inputFiles'],
                             finalOutputFile=args['outputFile'], level=level))

    while jobGraph[-1].numHadds > 1:
        level += 1
        jobGraph.append(haddStep(bunchNumber=args['bunchNumber'],
                                 inputFiles=jobGraph[-1].outputFiles,
                                 finalOutputFile=args['outputFile'], level=level))
        logging.debug(jobGraph[-1])

    # Now execute each merge stage in turn
    for i, jobs in enumerate(jobGraph):
        logging.info('Executing merge iteration step %d', i)
        jobs.executeAll(args['parallelMerge'])

    logging.info('Final merge completed successfully.')


if __name__ == '__main__':
    main()