ATLAS Offline Software
IoTestsLib.py
Go to the documentation of this file.
1 # Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration
2 
3 
5 
6 from __future__ import print_function
7 
8 __author__ = "Sebastien Binet <binet@cern.ch>"
9 __version__ = "$Revision: 1.1 $"
10 __doc__ = """
11 a set of simple minded functions to test ROOT I/O (from python)
12 """
13 
14 from array import array as carray
15 from builtins import range
16 
17 import random
18 # set some dummy seed, for reproducibility
19 random.seed(20080910) # first LHC startup :)
20 
21 from os import sysconf
22 _pagesz = sysconf('SC_PAGE_SIZE') // 1024 # in kb
23 
24 _py_dtype_to_root = {
25  'i' : 'I',
26  'f' : 'F',
27  }
28 """translates the usual python 'dtype' codes to the ROOT/CINT ones
29 """
30 
31 from PyUtils.Decorators import forking
32 
33 def pymon():
34  """returns (cpu[ms], vmem[kb], rss[kb])
35  """
36  from resource import getrusage, RUSAGE_SELF
37  from string import split as ssplit
38  cpu = getrusage(RUSAGE_SELF)
39  mem = open('/proc/self/statm','r')
40  cpu = (cpu.ru_utime+cpu.ru_stime) * 1e3 # in milliseconds
41  mem = ssplit(mem.readlines()[0])
42  vmem = int(mem[0])*_pagesz
43  rss = int(mem[1])*_pagesz
44  return cpu,vmem,rss
45 
46 def comp_delta(d, verbose=False):
47  assert 'start' in d
48  assert 'stop' in d
49  assert len(d['start']) == 3
50  assert len(d['stop']) == 3
51  if verbose:
52  print (repr(d))
53  delta = { 'cpu' : d['stop'][0] - d['start'][0],
54  'vmem': d['stop'][1] - d['start'][1],
55  'rss' : d['stop'][2] - d['start'][2],
56  'nbytes': -1
57  }
58  if 'nbytes' in d:
59  delta['nbytes'] = d['nbytes']
60  print ("==> cpu: %(cpu)8.3f ms vmem: %(vmem)i kB rss: %(rss)i kB nbytes: %(nbytes)i kB" % delta)
61  return delta
62 
64  import sys
65  # for ROOT...
66  if '-b' not in sys.argv:
67  sys.argv.insert(1, '-b')
68  import ROOT
69  return ROOT
70 
71 ROOT = import_ROOT()
72 
73 @forking
74 def io_test1_write(fname, nevts=1000, sz=1000, dtype='i'):
75  """testing writing 1000 evts with arrays of 1000- integers
76  """
77  f = ROOT.TFile.Open(fname, 'RECREATE')
78  t = ROOT.TTree('t', 't')
79 
80  nevts= nevts
81  imax = sz
82  data = carray(dtype, imax*[ 0 ] )
83  #t.Branch( 'mynum', n, 'mynum/I' )
84  t.Branch( 'i', data, 'data[%d]/%s'%(imax, _py_dtype_to_root[dtype]) )
85 
86  from random import randint
87 
88  fill = t.Fill
89  for i in range(nevts):
90  for j in range(sz):
91  data[j] = randint(0, sz)
92  fill()
93 
94  f.Write()
95  f.Close()
96  return
97 
98 @forking
99 def io_test1_read(fname, verbose=False):
100  f = ROOT.TFile.Open(fname, 'READ')
101  t = f.Get('t')
102  assert t, "could not find tree 't'"
103  nevts = t.GetEntries()
104  if verbose:
105  print ("::: reading [%s] (%i events) [sz=%s kB]" % (fname, nevts,
106  f.GetSize()//1024))
107  tot_bytes = 0
108  get_entry = t.GetEntry
109  start = pymon()
110  for ievt in range(nevts):
111  # copy next entry into memory and verify
112  nb = get_entry(ievt)
113  if nb <= 0:
114  continue
115  tot_bytes += nb
116  # use the values directly from the tree
117  assert len(t.data) > 0
118  stop = pymon()
119 
120  del t
121  f.Close()
122 
123  return {'start' : start,
124  'stop' : stop,
125  'nbytes': tot_bytes//1024}
126 
127 @forking
128 def io_test2_write(fname, nevts=1000, sz=1000, dtype='i'):
129  """testing writing 1000 evts with arrays of (variable length) 1000- ints
130  """
131  f = ROOT.TFile.Open(fname, 'RECREATE')
132  t = ROOT.TTree('t', 't')
133 
134  nevts= nevts
135  imax = sz
136 
137  n = carray( 'i', [ 0 ] )
138  data = carray( dtype, imax*[ 0 ] )
139  t.Branch( 'sz', n, 'sz/I' )
140  t.Branch( 'data', data, 'data[sz]/%s'%_py_dtype_to_root[dtype])
141 
142  from random import randint
143 
144  fill = t.Fill
145  for i in range(nevts):
146  jmax = randint(1, sz)
147  n[0] = jmax
148  for j in range(jmax):
149  data[j] = randint(0, sz)
150  fill()
151 
152  f.Write()
153  f.Close()
154  return
155 
156 @forking
157 def io_test2_read(fname, verbose=False):
158  f = ROOT.TFile.Open(fname, 'READ')
159  t = f.Get('t')
160  assert t, "could not find tree 't'"
161  nevts = t.GetEntries()
162  if verbose:
163  print ("::: reading [%s] (%i events) [sz=%s kB]" % (fname, nevts,
164  f.GetSize()//1024))
165  tot_bytes = 0
166  get_entry = t.GetEntry
167  start = pymon()
168  for ievt in range(nevts):
169  # copy next entry into memory and verify
170  nb = get_entry(ievt)
171  if nb <= 0:
172  continue
173  tot_bytes += nb
174  # use the values directly from the tree
175  assert len(t.data) > 0
176  stop = pymon()
177 
178  del t
179  f.Close()
180 
181  return {'start' : start,
182  'stop' : stop,
183  'nbytes': tot_bytes//1024}
184 
185 
186 
187 if __name__ == "__main__":
188  # FIXME: use 'nose' instead... for automatical test discovery
189  print ("::: running all tests...")
190 
191  nreads = 10 # nbr of times to repeat each 'read' test
192  mon_data = {}
193 
194  # -----
195  # io_test1
196  # -----
197 
198  # io_test1 - ints
199  fname = '/tmp/out_test1_ints.root'
200  w = io_test1_write(fname=fname,
201  nevts=100000, sz=1000,
202  dtype='i')
203  mon_data['io_test1-ints'] = []
204  for _ in range(nreads):
205  mon_data['io_test1-ints'].append(comp_delta(io_test1_read(fname=fname)))
206 
207  # io_test1 - floats
208  fname = '/tmp/out_test1_flts.root'
209  w = io_test1_write(fname=fname,
210  nevts=100000, sz=1000,
211  dtype='f')
212  mon_data['io_test1-flts'] = []
213  for _ in range(nreads):
214  mon_data['io_test1-flts'].append(comp_delta(io_test1_read(fname=fname)))
215 
216  # -----
217  # io_test2
218  # -----
219 
220  # io_test2 - ints
221  fname = '/tmp/out_test2_ints.root'
222  w = io_test2_write(fname=fname,
223  nevts=100000, sz=1000,
224  dtype='i')
225  mon_data['io_test2-ints'] = []
226  for _ in range(nreads):
227  mon_data['io_test2-ints'].append(comp_delta(io_test2_read(fname=fname)))
228 
229  # io_test2 - floats
230  fname = '/tmp/out_test2_floats.root'
231  w = io_test2_write(fname=fname,
232  nevts=100000, sz=1000,
233  dtype='f')
234  mon_data['io_test2-flts'] = []
235  for _ in range(nreads):
236  mon_data['io_test2-flts'].append(comp_delta(io_test2_read(fname=fname)))
237 
238 
239  print (mon_data)
python.IoTestsLib.pymon
def pymon()
Definition: IoTestsLib.py:33
python.IoTestsLib.io_test2_write
def io_test2_write(fname, nevts=1000, sz=1000, dtype='i')
Definition: IoTestsLib.py:128
CaloCellPos2Ntuple.int
int
Definition: CaloCellPos2Ntuple.py:24
python.IoTestsLib.io_test1_read
def io_test1_read(fname, verbose=False)
Definition: IoTestsLib.py:99
dumpHVPathFromNtuple.append
bool append
Definition: dumpHVPathFromNtuple.py:91
python.IoTestsLib.import_ROOT
def import_ROOT()
Definition: IoTestsLib.py:63
plotBeamSpotVxVal.range
range
Definition: plotBeamSpotVxVal.py:195
PyAthena::repr
std::string repr(PyObject *o)
returns the string representation of a python object equivalent of calling repr(o) in python
Definition: PyAthenaUtils.cxx:106
fill
void fill(H5::Group &out_file, size_t iterations)
Definition: test-hdf5-writer.cxx:95
python.IoTestsLib.comp_delta
def comp_delta(d, verbose=False)
Definition: IoTestsLib.py:46
Trk::open
@ open
Definition: BinningType.h:40
python.IoTestsLib.io_test1_write
def io_test1_write(fname, nevts=1000, sz=1000, dtype='i')
Definition: IoTestsLib.py:74
python.IoTestsLib.io_test2_read
def io_test2_read(fname, verbose=False)
Definition: IoTestsLib.py:157