
Understanding Python's PoolExecutor

The demo code and background here draw on 董伟明's 《理解Python并发编程一篇就够了|PoolExecutor篇》 (also published on his WeChat account Python之美), the Python Cookbook, and the post Python并发编程之线程池/进程池.

ThreadPoolExecutor and ProcessPoolExecutor are high-level abstractions over threading and multiprocessing respectively, exposing a simple, unified interface. The examples below use ProcessPoolExecutor.
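
Because the two classes share one interface, swapping between threads and processes is a one-line change. A minimal sketch (the square function is my own example, not from the original post):

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def square(x):
    return x * x

if __name__ == '__main__':
    # The same map() call works for either executor; only the class changes.
    for Executor in (ThreadPoolExecutor, ProcessPoolExecutor):
        with Executor(max_workers=2) as executor:
            print(list(executor.map(square, range(5))))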

Each executor provides two main methods: map() and submit().

map() is mainly a convenience for running the same function over a set of inputs, as in the following example:

# -*- coding: utf-8 -*-

import random
from concurrent.futures import ProcessPoolExecutor

def fib(n, test_arg):
    if n > 30:
        raise Exception('can not > 30, now %s' % n)
    if n <= 2:
        return 1
    return fib(n-1, test_arg) + fib(n-2, test_arg)

def use_map():
    nums = [random.randint(0, 33) for _ in range(0, 10)]
    with ProcessPoolExecutor() as executor:
        try:
            results = executor.map(fib, nums, nums)
            for num, result in zip(nums, results):
                print('fib(%s) result is %s.' % (num, result))
        except Exception as e:
            print(e)


Running this produces output like the following. As soon as a num above 30 raises the exception, the program catches it and stops; map() yields no further results:

...
fib(19) result is 4181.
fib(11) result is 89.
fib(2) result is 1.
fib(5) result is 5.
fib(24) result is 46368.
fib(2) result is 1.
can not > 30, now 33
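
If you want map() to keep going past failures, one common workaround is to catch the exception inside the worker function and return it as an ordinary value. A sketch (safe_fib is my addition, not part of the original post; it reuses fib and random from the example above):

def safe_fib(n, test_arg):
    try:
        return fib(n, test_arg)
    except Exception as e:
        return e  # hand the exception back as an ordinary result

def use_map_safe():
    nums = [random.randint(0, 33) for _ in range(0, 10)]
    with ProcessPoolExecutor() as executor:
        # map() never sees a raised exception, so iteration runs to the end.
        for num, result in zip(nums, executor.map(safe_fib, nums, nums)):
            print('fib(%s) result is %s.' % (num, result))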


The same task using submit():

# -*- coding: utf-8 -*-

from concurrent.futures import ProcessPoolExecutor, as_completed
import random

def fib(n, test_arg):
    if n > 30:
        raise Exception('can not > 30, now %s' % n)
    if n <= 2:
        return 1
    return fib(n-1, test_arg) + fib(n-2, test_arg)

def use_submit():
    nums = [random.randint(0, 33) for _ in range(0, 10)]
    with ProcessPoolExecutor() as executor:
        futures = {executor.submit(fib, n, n): n for n in nums}
        for f in as_completed(futures):
            try:
                print('fib(%s) result is %s.' % (futures[f], f.result()))
            except Exception as e:
                print(e)


Running this produces output like the following. Notice that after an exception is raised and caught, later results keep printing; the loop does not stop the way map() did. Besides as_completed(), the module also provides wait() and related helpers; a short wait() sketch follows the output.

fib(3) result is 2.
fib(15) result is 610.
can not > 30, now 31
fib(23) result is 28657.
fib(1) result is 1.
can not > 30, now 32
fib(14) result is 377.
fib(12) result is 144.
fib(26) result is 121393.
fib(29) result is 514229.
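
A minimal wait() sketch (my own usage example, not from the original post; it reuses fib and random from the submit() example). wait() blocks until the futures finish and returns (done, not_done) sets:

from concurrent.futures import ProcessPoolExecutor, wait

def use_wait():
    nums = [random.randint(0, 33) for _ in range(0, 10)]
    with ProcessPoolExecutor() as executor:
        futures = {executor.submit(fib, n, n): n for n in nums}
        # Block until every future completes (the default ALL_COMPLETED).
        done, not_done = wait(futures)
        for f in done:
            try:
                print('fib(%s) result is %s.' % (futures[f], f.result()))
            except Exception as e:
                print(e)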


If the try/except block instead wraps the as_completed() loop itself, output stops at the first exception rather than continuing. The reason is ordinary Python control flow: f.result() raises inside the for body, control jumps out of the loop into the except clause, and a loop exited that way never resumes. The small illustration after the code block shows the same effect without any futures.

def use_submit():
    nums = [random.randint(0, 33) for _ in range(0, 10)]
    with ProcessPoolExecutor() as executor:
        futures = {executor.submit(fib, n, n): n for n in nums}
        try:
            for f in as_completed(futures):
                print('fib(%s) result is %s.' % (futures[f], f.result()))
        except Exception as e:
            print(e)
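
This is plain exception semantics, nothing specific to futures; the following toy snippet (my illustration) behaves the same way:

try:
    for x in [1, 2, 3, 4]:
        if x == 3:
            raise ValueError(x)
        print(x)
except ValueError as e:
    print('caught %s, loop is over' % e)
# Output: 1, 2, caught 3, loop is over -- 4 is never printed.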


Other notes:

1. map() yields results in the order the arguments were passed, while as_completed() yields futures in completion order. The examples above don't show this clearly (see Python并发编程之线程池/进程池 for a fuller demonstration), but in both cases the observed timing depends on the max_workers parameter and on how long each call runs.

import time

def test_sleep(n):
    time.sleep(n)
    return True

def use_submit():
    nums = [3, 2, 1, 3]
    with ProcessPoolExecutor(max_workers=3) as executor:
        futures = {executor.submit(test_sleep, n): n for n in nums}
        for f in as_completed(futures):
            try:
                print('%s result is %s.' % (futures[f], f.result()))
            except Exception as e:
                print(e)

def use_map():
    nums = [3, 2, 1, 3]
    with ProcessPoolExecutor(max_workers=3) as executor:
        try:
            results = executor.map(test_sleep, nums)
            for num, result in zip(nums, results):
                print('%s result is %s.' % (num, result))
        except Exception as e:
            print(e)


use_submit() produces the output below in about 3+1=4s total, printing each result as soon as it completes. Because max_workers=3, as soon as the 1s task finishes, the freed worker starts the fourth task, which takes another 3s:

1 result is True.
2 result is True.
3 result is True.
3 result is True.


use_map() also takes about 3+1=4s, but the results are printed in input order: with max_workers=3, the first three results appear together after 3s, and the fourth appears after 4s:

3 result is True.
2 result is True.
1 result is True.
3 result is True.


2. Reading part of the map() source:

def map(self, fn, *iterables, timeout=None, chunksize=1):
    """Returns an iterator equivalent to map(fn, iter).

    Args:
        fn: A callable that will take as many arguments as there are
            passed iterables.
        timeout: The maximum number of seconds to wait. If None, then there
            is no limit on the wait time.
        chunksize: The size of the chunks the iterable will be broken into
            before being passed to a child process. This argument is only
            used by ProcessPoolExecutor; it is ignored by
            ThreadPoolExecutor.

    Returns:
        An iterator equivalent to: map(func, *iterables) but the calls may
        be evaluated out-of-order.

    Raises:
        TimeoutError: If the entire result iterator could not be generated
            before the given timeout.
        Exception: If fn(*args) raises for any values.
    """
    if timeout is not None:
        end_time = timeout + time.time()

    fs = [self.submit(fn, *args) for args in zip(*iterables)]

    # Yield must be hidden in closure so that the futures are submitted
    # before the first iterator value is required.
    def result_iterator():
        try:
            for future in fs:
                if timeout is None:
                    yield future.result()
                else:
                    yield future.result(end_time - time.time())
        finally:
            for future in fs:
                future.cancel()
    return result_iterator()


fs holds the Future instances returned by submit(), in the same order as the input arguments, and map() returns result_iterator(). The "one burst per max_workers batch" effect seen in the output falls out of this ordering: result_iterator() blocks on the head future's result() first, so results that finished earlier stay buffered and are released together once the head future completes. The sketch below demonstrates this.
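
A minimal demonstration of that head-of-line blocking (my sketch, using ThreadPoolExecutor so it runs anywhere without pickling concerns):

import time
from concurrent.futures import ThreadPoolExecutor

def slow(n):
    time.sleep(n)
    return n

start = time.time()
with ThreadPoolExecutor(max_workers=3) as executor:
    for r in executor.map(slow, [3, 1, 1]):
        print('%s yielded at %.1fs' % (r, time.time() - start))
# 3 yields at ~3.0s, then both 1s follow immediately: they finished at 1.0s
# but stayed buffered while the iterator blocked on the head future.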

3. The as_completed() source, which is a bit harder to follow.

4. The implementation of ProcessPoolExecutor():

[Figure: ProcessPoolExecutor data-flow diagram, from the original post]

Let's analyze this by combining the source with the data flow shown above:

executor.map creates multiple _WorkItem objects (in fact it just calls submit() repeatedly), passing each one a newly created Future object.

Each _WorkItem is put into a dict called "Work Items", keyed by a distinct "Work Id".

A thread called the "Local worker thread" manages the "Work Ids" queue; it does two things:

It takes a Work Id from the "Work Ids" queue and looks up the corresponding _WorkItem in "Work Items". If the item has been cancelled, it is removed from "Work Items"; otherwise it is repackaged as a _CallItem and put on the "Call Q" queue. The executor's worker processes take _CallItems from that queue to execute, and wrap the results as _ResultItems on the "Result Q" queue.

It takes _ResultItems from the "Result Q" queue, updates the corresponding Future in "Work Items", and deletes the entry.
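
To make the moving parts concrete, here is a toy, single-threaded model of that data flow (my sketch; the names mirror the diagram, and the real "Call Q" and "Result Q" are multiprocessing.Queue instances shared with worker processes):

import queue

work_items = {}           # "Work Items": work_id -> (fn, args)
work_ids = queue.Queue()  # "Work Ids"
call_q = queue.Queue()    # "Call Q"
result_q = queue.Queue()  # "Result Q"

def toy_submit(fn, *args):
    work_id = len(work_items)
    work_items[work_id] = (fn, args)
    work_ids.put(work_id)

def toy_dispatch_and_run():
    # Local worker thread side: Work Ids -> Call Q
    while not work_ids.empty():
        wid = work_ids.get()
        fn, args = work_items[wid]
        call_q.put((wid, fn, args))
    # Worker process side: Call Q -> Result Q
    while not call_q.empty():
        wid, fn, args = call_q.get()
        result_q.put((wid, fn(*args)))

toy_submit(pow, 2, 10)
toy_dispatch_and_run()
print(result_q.get())  # (0, 1024)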

A quick look at submit():

def submit(self, fn, *args, **kwargs):
    with self._shutdown_lock:
        if self._broken:
            raise BrokenProcessPool('A child process terminated '
                'abruptly, the process pool is not usable anymore')
        if self._shutdown_thread:
            raise RuntimeError('cannot schedule new futures after shutdown')

        f = _base.Future()
        w = _WorkItem(f, fn, args, kwargs)

        self._pending_work_items[self._queue_count] = w
        self._work_ids.put(self._queue_count)
        self._queue_count += 1
        # Wake up queue management thread
        self._result_queue.put(None)

        self._start_queue_management_thread()
        return f


submit() creates a Future() instance f and a _WorkItem() instance w.

_pending_work_items is the "Work Items" dict described above: the key is _queue_count, which starts at 0 and is incremented on every submit; the value is w. _queue_count is also put on the _work_ids queue.

Putting None on _result_queue ("Wake up queue management thread") wakes the Local Worker Thread shown in the diagram above.
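
A small probe of that bookkeeping (my sketch; _queue_count is a private attribute of the CPython 3.5-era implementation and may differ in other versions):

from concurrent.futures import ProcessPoolExecutor

def identity(x):
    return x

if __name__ == '__main__':
    executor = ProcessPoolExecutor(max_workers=1)
    f1 = executor.submit(identity, 'a')
    f2 = executor.submit(identity, 'b')
    print(executor._queue_count)      # 2: one entry per submit()
    print(f1.result(), f2.result())   # a b
    executor.shutdown()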


def _start_queue_management_thread(self):
    # When the executor gets lost, the weakref callback will wake up
    # the queue management thread.
    def weakref_cb(_, q=self._result_queue):
        q.put(None)
    if self._queue_management_thread is None:
        # Start the processes so that their sentinels are known.
        self._adjust_process_count()
        self._queue_management_thread = threading.Thread(
                target=_queue_management_worker,
                args=(weakref.ref(self, weakref_cb),
                      self._processes,
                      self._pending_work_items,
                      self._work_ids,
                      self._call_queue,
                      self._result_queue))
        self._queue_management_thread.daemon = True
        self._queue_management_thread.start()
        _threads_queues[self._queue_management_thread] = self._result_queue

def _adjust_process_count(self):
    for _ in range(len(self._processes), self._max_workers):
        p = multiprocessing.Process(
                target=_process_worker,
                args=(self._call_queue,
                      self._result_queue))
        p.start()
        self._processes[p.pid] = p


_adjust_process_count() starts max_workers processes, each running _process_worker().

_start_queue_management_thread() then starts the _queue_management_thread thread, i.e. the Local Worker Thread.

Within that thread, _add_call_item_to_queue() moves _CallItems onto call_queue and drops references along the way; this method is the hardest part to follow.

def _process_worker(call_queue, result_queue):
    """Evaluates calls from call_queue and places the results in result_queue.

    This worker is run in a separate process.

    Args:
        call_queue: A multiprocessing.Queue of _CallItems that will be read and
            evaluated by the worker.
        result_queue: A multiprocessing.Queue of _ResultItems that will written
            to by the worker.
        shutdown: A multiprocessing.Event that will be set as a signal to the
            worker that it should exit when call_queue is empty.
    """
    while True:
        call_item = call_queue.get(block=True)
        if call_item is None:
            # Wake up queue management thread
            result_queue.put(os.getpid())
            return
        try:
            r = call_item.fn(*call_item.args, **call_item.kwargs)
        except BaseException as e:
            exc = _ExceptionWithTraceback(e, e.__traceback__)
            result_queue.put(_ResultItem(call_item.work_id, exception=exc))
        else:
            result_queue.put(_ResultItem(call_item.work_id,
                                         result=r))


This is the loop each worker process runs: it takes a _CallItem from call_queue, calls its fn, and puts the result (or the exception, wrapped with its traceback) on result_queue.