ATLAS Offline Software
Loading...
Searching...
No Matches
Tools
PyUtils
python
MpUtils.py
Go to the documentation of this file.
1
# Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration
2
3
# @file: PyUtils/MpUtils.py
4
# @purpose: a set of little tools for multiprocessing
5
# stolen from ClePy
6
# http://pypi.python.org/pypi/clepy/0.1
7
# MIT
8
9
def
chunkify
(s, chunksize):
10
11
"""
12
Yield sequence s in chunks of size chunksize.
13
14
>>> list(chunkify('abcdefg', 2))
15
['ab', 'cd', 'ef', 'g']
16
17
>>> list(chunkify('abcdefg', 99))
18
['abcdefg']
19
20
"""
21
22
for
i
in
range(0, len(s), chunksize):
23
yield
s[i:i+chunksize]
24
25
from
multiprocessing
import
Pipe, Process
26
27
class
SubProcessIterator
(
object
):
28
"""Instances of this class process iterators in separate processes."""
29
def
__init__
(self, itertask, eoi='__eoi__'):
30
"""Create a new subprocess iterator.
31
32
itertask : some iterable task to execute in a subprocess
33
eoi : an end-of-iteration marker - returned from the subprocess
34
to signal that iteration is complete.
35
"""
36
self.
client
, self.
master
= Pipe()
37
self.
end_of_input
= eoi
38
pargs = [itertask, self.
master
, eoi]
39
self.
process
= Process(target=self.
work
, args=pargs)
40
self.
started
=
False
41
42
def
_start
(self):
43
self.
started
=
True
44
self.
process
.start()
45
46
@staticmethod
47
def
work
(iterator, master, eoi):
48
"""The actual callable that is executed in the subprocess."""
49
for
chunk
in
iterator:
50
master.send(chunk)
51
master.send(eoi)
52
53
def
__iter__
(self):
54
if
not
self.
started
:
55
self.
_start
()
56
return
self
57
58
def
next
(self):
59
item = self.
client
.recv()
60
if
item != self.
end_of_input
:
61
return
item
62
else
:
63
self.
next
= self.
_empty
64
raise
StopIteration
65
66
def
_empty
(self, *args, **params):
67
raise
StopIteration
68
69
def
piter
(iterable, eoi=None):
70
"""Create a new subprocess iterator.
71
72
iterable : some iterable task to execute in a subprocess
73
eoi : an end-of-iteration marker - returned from the subprocess
74
to signal that iteration is complete.
75
"""
76
return
SubProcessIterator
(iterable, eoi=eoi)
77
python.MpUtils.SubProcessIterator
Definition
MpUtils.py:27
python.MpUtils.SubProcessIterator._start
_start(self)
Definition
MpUtils.py:42
python.MpUtils.SubProcessIterator.__init__
__init__(self, itertask, eoi='__eoi__')
Definition
MpUtils.py:29
python.MpUtils.SubProcessIterator._empty
_empty(self, *args, **params)
Definition
MpUtils.py:66
python.MpUtils.SubProcessIterator.started
bool started
Definition
MpUtils.py:40
python.MpUtils.SubProcessIterator.__iter__
__iter__(self)
Definition
MpUtils.py:53
python.MpUtils.SubProcessIterator.process
process
Definition
MpUtils.py:39
python.MpUtils.SubProcessIterator.end_of_input
end_of_input
Definition
MpUtils.py:37
python.MpUtils.SubProcessIterator.client
client
Definition
MpUtils.py:36
python.MpUtils.SubProcessIterator.next
next
Definition
MpUtils.py:63
python.MpUtils.SubProcessIterator.work
work(iterator, master, eoi)
Definition
MpUtils.py:47
python.MpUtils.SubProcessIterator.master
master
Definition
MpUtils.py:36
python.MpUtils.piter
piter(iterable, eoi=None)
Definition
MpUtils.py:69
python.MpUtils.chunkify
chunkify(s, chunksize)
Definition
MpUtils.py:9
object
Generated on
for ATLAS Offline Software by
1.14.0