Python thread pool similar to the multiprocessing Pool?

2015-07-13 14:39
Q:

Is there a Pool class for worker threads, similar to the multiprocessing module's Pool class?

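For example, I like how easy it is to parallelize a map function:
import multiprocessing

def long_running_func(p):
    # c_func_no_gil is the C wrapper described below; it is not defined here
    return c_func_no_gil(p)

p = multiprocessing.Pool(4)
xs = p.map(long_running_func, range(100))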


However, I would like to do it without the overhead of creating new processes.

I know about the GIL. However, in my use case, the function will be an IO-bound C function for which the Python wrapper releases the GIL before the actual function call.

Do I have to write my own threading pool?
A:

I just found out that there actually is a thread-based Pool interface in the multiprocessing module; however, it is somewhat hidden and not properly documented.

It can be imported via
from multiprocessing.pool import ThreadPool


It is implemented using a dummy Process class wrapping a Python thread. This thread-based Process class can be found in multiprocessing.dummy, which is mentioned briefly in the docs. This dummy module supposedly provides the whole multiprocessing interface based on threads.
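
As a rough sketch of how the thread-based pool mirrors the process-based one, the map call from the question could look something like this. Note that `slow_io` is a hypothetical stand-in for the GIL-releasing C function (it only sleeps), and the pool size of 4 is arbitrary.

```python
import time
from multiprocessing.pool import ThreadPool


def slow_io(p):
    # Hypothetical stand-in for an IO-bound call that releases the GIL.
    time.sleep(0.01)
    return p * 2


# ThreadPool offers the same map/close/join interface as multiprocessing.Pool,
# but its workers are threads inside the current process.
pool = ThreadPool(4)
try:
    xs = pool.map(slow_io, range(100))  # results come back in input order
finally:
    pool.close()  # no more tasks will be submitted
    pool.join()   # wait for the worker threads to exit
```

`multiprocessing.dummy.Pool` builds the same kind of thread pool, so `from multiprocessing.dummy import Pool` should work as an alternative spelling.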

A:

In Python 3 you can use concurrent.futures.ThreadPoolExecutor, i.e.:
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=10)
a = executor.submit(my_function)


See the docs for more info and examples.
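
A slightly fuller sketch of the same idea, showing both `submit` and `map`. The `my_function` body here is a hypothetical placeholder (it sleeps and returns its argument plus one), and `max_workers=10` is just the value used above.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def my_function(p):
    # Hypothetical IO-bound task; sleeps instead of doing real IO.
    time.sleep(0.01)
    return p + 1


# Leaving the with-block shuts the executor down and waits for pending work.
with ThreadPoolExecutor(max_workers=10) as executor:
    future = executor.submit(my_function, 41)  # schedule a single call
    single = future.result()                   # block until it finishes
    # map() yields results in the order of the inputs
    results = list(executor.map(my_function, range(5)))
```

`submit` returns a Future, so the result (or the exception raised by the task) is only retrieved when `result()` is called.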

A:

from queue import Queue  # Python 3; on Python 2 use: from Queue import Queue
from threading import Thread


class Worker(Thread):
    """Thread executing tasks from a given tasks queue"""

    def __init__(self, tasks):
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon = True
        self.start()

    def run(self):
        while True:
            func, args, kargs = self.tasks.get()
            try:
                func(*args, **kargs)
            except Exception as e:
                print(e)
            finally:
                # Always mark the task done so wait_completion() cannot hang
                self.tasks.task_done()


class ThreadPool:
    """Pool of threads consuming tasks from a queue"""

    def __init__(self, num_threads):
        # Bounded queue: add_task() blocks while num_threads tasks are waiting
        self.tasks = Queue(num_threads)
        for _ in range(num_threads):
            Worker(self.tasks)

    def add_task(self, func, *args, **kargs):
        """Add a task to the queue"""
        self.tasks.put((func, args, kargs))

    def wait_completion(self):
        """Wait for completion of all the tasks in the queue"""
        self.tasks.join()


if __name__ == '__main__':
    from random import randrange
    from time import sleep

    delays = [randrange(1, 10) for _ in range(100)]

    def wait_delay(d):
        print('sleeping for (%d)sec' % d)
        sleep(d)

    # 1) Init a Thread pool with the desired number of threads
    pool = ThreadPool(20)

    for i, d in enumerate(delays):
        # print the percentage of tasks placed in the queue
        print('%.2f%%' % (float(i) / len(delays) * 100.0))

        # 2) Add the task to the queue
        pool.add_task(wait_delay, d)

    # 3) Wait for completion
    pool.wait_completion()