ATLAS Offline Software
MpUtils.py
Go to the documentation of this file.
1 # Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration
2 
3 # @file: PyUtils/MpUtils.py
4 # @purpose: a set of little tools for multiprocessing
5 # stolen from ClePy
6 # http://pypi.python.org/pypi/clepy/0.1
7 # MIT
8 
9 def chunkify(s, chunksize):
10 
11  """
12  Yield sequence s in chunks of size chunksize.
13 
14  >>> list(chunkify('abcdefg', 2))
15  ['ab', 'cd', 'ef', 'g']
16 
17  >>> list(chunkify('abcdefg', 99))
18  ['abcdefg']
19 
20  """
21 
22  for i in range(0, len(s), chunksize):
23  yield s[i:i+chunksize]
24 
25 from multiprocessing import Pipe, Process
26 
28  """Instances of this class process iterators in separate processes."""
29  def __init__(self, itertask, eoi='__eoi__'):
30  """Create a new subprocess iterator.
31 
32  itertask : some iterable task to execute in a subprocess
33  eoi : an end-of-iteration marker - returned from the subprocess
34  to signal that iteration is complete.
35  """
36  self.client, self.master = Pipe()
37  self.end_of_input = eoi
38  pargs = [itertask, self.master, eoi]
39  self.process = Process(target=self.work, args=pargs)
40  self.started = False
41 
42  def _start(self):
43  self.started = True
44  self.process.start()
45 
46  @staticmethod
47  def work(iterator, master, eoi):
48  """The actual callable that is executed in the subprocess."""
49  for chunk in iterator:
50  master.send(chunk)
51  master.send(eoi)
52 
53  def __iter__(self):
54  if not self.started:
55  self._start()
56  return self
57 
58  def next(self):
59  item = self.client.recv()
60  if item != self.end_of_input:
61  return item
62  else:
63  self.next = self._empty
64  raise StopIteration
65 
66  def _empty(self, *args, **params):
67  raise StopIteration
68 
69 def piter(iterable, eoi=None):
70  """Create a new subprocess iterator.
71 
72  iterable : some iterable task to execute in a subprocess
73  eoi : an end-of-iteration marker - returned from the subprocess
74  to signal that iteration is complete.
75  """
76  return SubProcessIterator(iterable, eoi=eoi)
77 
python.MpUtils.chunkify
def chunkify(s, chunksize)
Definition: MpUtils.py:9
mergePhysValFiles.start
start
Definition: DataQuality/DataQualityUtils/scripts/mergePhysValFiles.py:14
python.MpUtils.SubProcessIterator._empty
def _empty(self, *args, **params)
Definition: MpUtils.py:66
python.MpUtils.SubProcessIterator
Definition: MpUtils.py:27
python.MpUtils.SubProcessIterator.work
def work(iterator, master, eoi)
Definition: MpUtils.py:47
python.MpUtils.SubProcessIterator.__init__
def __init__(self, itertask, eoi='__eoi__')
Definition: MpUtils.py:29
plotBeamSpotVxVal.range
range
Definition: plotBeamSpotVxVal.py:195
python.MpUtils.SubProcessIterator.master
master
Definition: MpUtils.py:36
python.MpUtils.SubProcessIterator.next
next
Definition: MpUtils.py:63
python.MpUtils.SubProcessIterator.__iter__
def __iter__(self)
Definition: MpUtils.py:53
python.MpUtils.SubProcessIterator.started
started
Definition: MpUtils.py:40
python.MpUtils.SubProcessIterator._start
def _start(self)
Definition: MpUtils.py:42
python.MpUtils.SubProcessIterator.end_of_input
end_of_input
Definition: MpUtils.py:37
pickleTool.object
object
Definition: pickleTool.py:30
python.MpUtils.piter
def piter(iterable, eoi=None)
Definition: MpUtils.py:69
python.MpUtils.SubProcessIterator.process
process
Definition: MpUtils.py:39