-
Notifications
You must be signed in to change notification settings - Fork 10
/
lib_multiproc.py
112 lines (94 loc) · 3.32 KB
/
lib_multiproc.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright (C) 2019 - Francesco de Gasperin
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
# Parallelizator
# USAGE:
# from lib_multiproc import multiprocManager
# def funct(funct_param, outQueue=None):
# pass
# outQueue.put([funct_output])
#
# # start processes for multi-thread
# mpm = multiprocManager(ncpu, funct)
# mpm.put([funct_params])
# mpm.wait()
# for r in mpm.get():
#   print("funct_output:", r)
import sys
import logging
import multiprocessing
class multiprocManager(object):
    """
    Simple process pool: spawns `procs` worker processes that consume
    parameter lists from an input queue, run `funct` on each, and push
    the results onto a shared output queue.

    Intended call order: put(...) any number of times, wait(), then
    iterate get() to collect exactly one result per put().
    """

    class multiThread(multiprocessing.Process):
        """
        Worker process: repeatedly loads a parameter list from the input
        queue, calls funct(*parms, outQueue=outQueue), and exits when it
        receives a None poison pill.
        """

        def __init__(self, inQueue, outQueue, funct):
            multiprocessing.Process.__init__(self)
            self.inQueue = inQueue
            self.outQueue = outQueue
            self.funct = funct

        def run(self):
            while True:
                parms = self.inQueue.get()
                # poison pill: a None job tells this worker to exit
                if parms is None:
                    self.inQueue.task_done()
                    break
                self.funct(*parms, outQueue=self.outQueue)
                self.inQueue.task_done()

    def __init__(self, procs=1, funct=None):
        """
        Manager for multiprocessing
        procs: number of processors
        funct: function to parallelize / note that the last parameter of this function must be the outQueue
        and it will be linked to the output queue
        """
        self.procs = procs
        self._threads = []
        self.inQueue = multiprocessing.JoinableQueue()
        self.outQueue = multiprocessing.Queue()
        self.runs = 0
        # set by wait(); lets get() know the workers have been poisoned
        # and can safely be reaped once their output is drained
        self._finished = False
        # lazy %-args: the message is only formatted when DEBUG is enabled
        logging.debug('Spawning %i threads...', self.procs)
        for proc in range(self.procs):
            t = self.multiThread(self.inQueue, self.outQueue, funct)
            self._threads.append(t)
            t.start()

    def put(self, args):
        """
        Parameters to give to the next jobs sent into queue
        """
        self.inQueue.put(args)
        self.runs += 1

    def get(self):
        """
        Return all the results as an iterator
        (call after wait(); yields exactly one item per put())
        """
        # NOTE: do not use queue.empty() check which is unreliable
        # https://docs.python.org/3/library/multiprocessing.html
        for run in range(self.runs):
            yield self.outQueue.get()
        # BUGFIX: reap the worker processes once every result has been
        # drained; previously they were never join()ed and lingered as
        # zombies until interpreter exit. Joining here (not in wait())
        # is deliberate: a worker cannot exit until its outQueue feeder
        # thread has flushed all buffered results into the pipe, which
        # only completes once the consumer has read them.
        if self._finished:
            for t in self._threads:
                t.join()

    def wait(self):
        """
        Send poison pills to jobs and wait for them to finish
        (the worker processes themselves are join()ed at the end of
        get(), after their output has been consumed)
        """
        for t in self._threads:
            self.inQueue.put(None)
        # wait for all jobs (including the pills) to be processed
        self.inQueue.join()
        self._finished = True