ATLAS Offline Software
Loading...
Searching...
No Matches
MpUtils.py
Go to the documentation of this file.
1# Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration
2
3# @file: PyUtils/MpUtils.py
4# @purpose: a set of little tools for multiprocessing
5# stolen from ClePy
6# http://pypi.python.org/pypi/clepy/0.1
7# MIT
8
def chunkify(s, chunksize):
    """
    Lazily split the sequence s into consecutive slices.

    Every slice holds chunksize items, except possibly the last one, which
    carries whatever is left over.  s must support len() and slicing.

    >>> list(chunkify('abcdefg', 3))
    ['abc', 'def', 'g']

    >>> list(chunkify([1, 2, 3, 4], 2))
    [[1, 2], [3, 4]]

    """
    # Start index of every slice; the stride equals the slice length.
    offsets = range(0, len(s), chunksize)
    for begin in offsets:
        end = begin + chunksize
        # Slicing clamps at the end of s, so the final slice may be shorter.
        yield s[begin:end]
24
25from multiprocessing import Pipe, Process
26
class SubProcessIterator(object):
    """Instances of this class process iterators in separate processes.

    The wrapped iterable is consumed inside a child process; every item it
    produces is sent back through a pipe and handed out by this iterator.
    The child process is started lazily, on the first call to iter() or
    next().

    Items are compared against the end-of-iteration marker with ``!=``, so
    the wrapped iterable must never produce a value equal to that marker.
    """

    def __init__(self, itertask, eoi='__eoi__'):
        """Create a new subprocess iterator.

        itertask : some iterable task to execute in a subprocess
        eoi : an end-of-iteration marker - returned from the subprocess
              to signal that iteration is complete.
        """
        self.client, self.master = Pipe()
        self.end_of_input = eoi
        pargs = [itertask, self.master, eoi]
        self.process = Process(target=self.work, args=pargs)
        self.started = False
        # Set once the end-of-iteration marker has been received.
        self._exhausted = False

    def _start(self):
        """Launch the child process that runs the task."""
        self.started = True
        self.process.start()

    @staticmethod
    def work(iterator, master, eoi):
        """The actual callable that is executed in the subprocess.

        Sends every item of `iterator` through `master` and always finishes
        with `eoi`, even when the iterator raises, so that the receiving
        side is never left blocked waiting for more data.
        """
        try:
            for chunk in iterator:
                master.send(chunk)
        finally:
            master.send(eoi)

    def __iter__(self):
        """Return self, starting the child process if necessary."""
        if not self.started:
            self._start()
        return self

    def __next__(self):
        """Return the next item produced by the subprocess.

        Raises StopIteration once the end-of-iteration marker has been
        received (and on every later call), and RuntimeError if the child
        process terminated with a non-zero exit code.
        """
        if self._exhausted:
            raise StopIteration
        if not self.started:
            # next() called without a prior iter(): without a running
            # child, recv() below would block forever.
            self._start()
        item = self.client.recv()
        if item != self.end_of_input:
            return item
        self._finish()
        raise StopIteration

    # Python 2 spelling of the iterator protocol.
    next = __next__

    def _finish(self):
        """Reap the child process and release both pipe ends."""
        self._exhausted = True
        # The marker is the last thing the child sends, so it is about to
        # exit and join() returns promptly.
        self.process.join()
        self.client.close()
        self.master.close()
        if self.process.exitcode:
            raise RuntimeError(
                'subprocess iterator task failed with exit code %s'
                % self.process.exitcode)

    def _empty(self, *args, **params):
        """Stand-in for next() on an exhausted iterator."""
        raise StopIteration
68
def piter(iterable, eoi=None):
    """Wrap an iterable so that it is consumed in a subprocess.

    iterable : the iterable task whose items are produced in a subprocess
    eoi : the end-of-iteration marker the subprocess sends back once the
          task is finished.  It defaults to None here, so a task that
          yields None itself ends the iteration early; pass a distinct
          marker in that case.

    Returns the SubProcessIterator built around `iterable`.
    """
    subprocess_iterator = SubProcessIterator(iterable, eoi=eoi)
    return subprocess_iterator
77
__init__(self, itertask, eoi='__eoi__')
Definition MpUtils.py:29
_empty(self, *args, **params)
Definition MpUtils.py:66
work(iterator, master, eoi)
Definition MpUtils.py:47
piter(iterable, eoi=None)
Definition MpUtils.py:69
chunkify(s, chunksize)
Definition MpUtils.py:9