Understanding Python's PoolExecutor
2017-04-06 16:23
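The demo code and the concepts referenced here come from 《理解Python并发编程一篇就够了|PoolExecutor篇》 ("Understanding Python concurrent programming in one article: PoolExecutor") by 董伟明 (Dong Weiming), also published on his WeChat account Python之美; from Python Cookbook; and from "Python并发编程之线程池/进程池" ("Python concurrency: thread pools and process pools").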
ThreadPoolExecutor and ProcessPoolExecutor are high-level abstractions over threading and multiprocessing, respectively, and expose a simple, uniform interface. The examples below use ProcessPoolExecutor.
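Because both classes implement the same Executor interface, code written against one usually works unchanged with the other. A minimal sketch; the square() and run_with() helpers are hypothetical and not part of the original article:

```python
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor


def square(x):
    # Defined at module level so ProcessPoolExecutor can pickle it.
    return x * x


def run_with(executor_cls, values):
    # Only the executor class changes; map() and the with-block stay the same.
    with executor_cls(max_workers=2) as executor:
        return list(executor.map(square, values))


if __name__ == '__main__':
    print(run_with(ThreadPoolExecutor, [1, 2, 3]))   # [1, 4, 9]
    print(run_with(ProcessPoolExecutor, [1, 2, 3]))  # [1, 4, 9]
```

For CPU-bound work such as fib() below, processes sidestep the GIL at the cost of pickling arguments and results; for I/O-bound work, threads are usually the lighter choice.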
The executors mainly provide two methods: map() and submit().
map() simplifies running the same function over a series of inputs, as in the following example:
```python
# -*- coding: utf-8 -*-
import random
from concurrent.futures import ProcessPoolExecutor


def fib(n, test_arg):
    # test_arg is unused by the computation; it only shows that map()
    # can take several iterables.
    if n > 30:
        raise Exception('can not > 30, now %s' % n)
    if n <= 2:
        return 1
    return fib(n-1, test_arg) + fib(n-2, test_arg)


def use_map():
    nums = [random.randint(0, 33) for _ in range(0, 10)]
    with ProcessPoolExecutor() as executor:
        try:
            results = executor.map(fib, nums, nums)
            for num, result in zip(nums, results):
                print('fib(%s) result is %s.' % (num, result))
        except Exception as e:
            print(e)


if __name__ == '__main__':
    use_map()
```
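Running it prints the output below. When a num greater than 30 (33 in this run) raises the exception, it is caught and the program stops printing; the remaining results are never shown.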
```
...
fib(19) result is 4181.
fib(11) result is 89.
fib(2) result is 1.
fib(5) result is 5.
fib(24) result is 46368.
fib(2) result is 1.
can not > 30, now 33
```
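The try/except around the loop is only part of the story. Per the source quoted later, map() returns a generator, and once one result raises, that generator is finished, so even catching the error per item cannot resume it. A small sketch that uses ThreadPoolExecutor for brevity; check() is a hypothetical stand-in for fib():

```python
from concurrent.futures import ThreadPoolExecutor


def check(n):
    # Hypothetical stand-in for fib(): fails for n > 30.
    if n > 30:
        raise ValueError('can not > 30, now %s' % n)
    return n


def collect_map_results(nums):
    collected = []
    with ThreadPoolExecutor(max_workers=2) as executor:
        results = executor.map(check, nums)
        while True:
            try:
                collected.append(next(results))
            except StopIteration:
                break
            except ValueError as e:
                # Caught per item, but the generator behind map() is now
                # exhausted: the next call to next() raises StopIteration.
                collected.append('error: %s' % e)
    return collected


print(collect_map_results([1, 2, 33, 4]))
# [1, 2, 'error: can not > 30, now 33']
```

The result for 4 never appears, even though its task was submitted.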
Using the submit() method instead:
```python
# -*- coding: utf-8 -*-
from concurrent.futures import ProcessPoolExecutor, as_completed
import random


def fib(n, test_arg):
    if n > 30:
        raise Exception('can not > 30, now %s' % n)
    if n <= 2:
        return 1
    return fib(n-1, test_arg) + fib(n-2, test_arg)


def use_submit():
    nums = [random.randint(0, 33) for _ in range(0, 10)]
    with ProcessPoolExecutor() as executor:
        # Map each Future back to its input so results can be labeled.
        futures = {executor.submit(fib, n, n): n for n in nums}
        for f in as_completed(futures):
            try:
                print('fib(%s) result is %s.' % (futures[f], f.result()))
            except Exception as e:
                print(e)


if __name__ == '__main__':
    use_submit()
```
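Running it prints the following. After an exception is raised and caught, output continues with the remaining results; it does not stop the way map() did. Besides as_completed(), the module also offers helpers such as wait().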
```
fib(3) result is 2.
fib(15) result is 610.
can not > 30, now 31
fib(23) result is 28657.
fib(1) result is 1.
can not > 30, now 32
fib(14) result is 377.
fib(12) result is 144.
fib(26) result is 121393.
fib(29) result is 514229.
```
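wait() takes a collection of futures and returns a (done, not_done) pair; its return_when argument can be ALL_COMPLETED (the default), FIRST_COMPLETED or FIRST_EXCEPTION. A sketch with threads and a hypothetical task() function that stops waiting at the first failure:

```python
import time
from concurrent.futures import FIRST_EXCEPTION, ThreadPoolExecutor, wait


def task(name, delay, fail=False):
    # Hypothetical task: sleeps for `delay` seconds, optionally raises.
    time.sleep(delay)
    if fail:
        raise ValueError('%s failed' % name)
    return name


def first_failure():
    with ThreadPoolExecutor(max_workers=2) as executor:
        futures = {
            executor.submit(task, 'slow', 1.0): 'slow',
            executor.submit(task, 'bad', 0.0, fail=True): 'bad',
        }
        # Returns as soon as any future raises (or when all have finished).
        done, not_done = wait(futures, return_when=FIRST_EXCEPTION)
        failed = [futures[f] for f in done if f.exception() is not None]
        pending = [futures[f] for f in not_done]
    # Leaving the with-block still waits for 'slow' to finish.
    return failed, pending


print(first_failure())  # (['bad'], ['slow'])
```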
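However, if the try/except wraps the as_completed() loop itself, output stops at the first failure instead of continuing. The reason is that the exception raised by f.result() propagates out of the for loop, so the loop ends and the remaining futures are never visited (the with-block still waits for them to finish).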
```python
def use_submit():
    nums = [random.randint(0, 33) for _ in range(0, 10)]
    with ProcessPoolExecutor() as executor:
        futures = {executor.submit(fib, n, n): n for n in nums}
        try:
            for f in as_completed(futures):
                print('fib(%s) result is %s.' % (futures[f], f.result()))
        except Exception as e:
            print(e)
```
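One way to keep error handling out of the per-result print, without ending the loop early, is Future.exception(), which returns the raised exception (or None) instead of re-raising it. A sketch with threads; check() is a hypothetical stand-in for fib():

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def check(n):
    # Hypothetical stand-in for fib(): fails for n > 30.
    if n > 30:
        raise ValueError('can not > 30, now %s' % n)
    return n


def report(nums):
    ok, failed = {}, []
    with ThreadPoolExecutor(max_workers=3) as executor:
        futures = {executor.submit(check, n): n for n in nums}
        for f in as_completed(futures):
            # exception() does not raise, so one failure cannot end the loop.
            exc = f.exception()
            if exc is None:
                ok[futures[f]] = f.result()
            else:
                failed.append(str(exc))
    return ok, sorted(failed)


print(report([3, 31, 7, 32]))
```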
Other notes:
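1. map() returns results in the order of the input arguments, while as_completed() yields futures in the order they finish. The example above doesn't make this obvious (see "Python并发编程之线程池/进程池" for more); in both cases, when each result appears depends on the max_workers parameter and on how long each call takes.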
```python
import time
from concurrent.futures import ProcessPoolExecutor, as_completed


def test_sleep(n):
    time.sleep(n)
    return True


def use_submit():
    nums = [3, 2, 1, 3]
    with ProcessPoolExecutor(max_workers=3) as executor:
        futures = {executor.submit(test_sleep, n): n for n in nums}
        for f in as_completed(futures):
            try:
                print('%s result is %s.' % (futures[f], f.result()))
            except Exception as e:
                print(e)


def use_map():
    nums = [3, 2, 1, 3]
    with ProcessPoolExecutor(max_workers=3) as executor:
        try:
            results = executor.map(test_sleep, nums)
            for num, result in zip(nums, results):
                print('%s result is %s.' % (num, result))
        except Exception as e:
            print(e)


if __name__ == '__main__':
    use_submit()
    use_map()
```
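use_submit() prints the following. It takes 3 + 1 = 4 s in total and prints each result as soon as it finishes: with max_workers=3, once the 1 s task finishes, the fourth task (3 s) starts in the freed worker.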
```
1 result is True.
2 result is True.
3 result is True.
3 result is True.
```
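use_map() prints the following. It also takes 3 + 1 = 4 s, but results are printed in the order of the input arguments. With max_workers=3, the first three results are printed together after 3 s, and the fourth after 4 s.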
```
3 result is True.
2 result is True.
1 result is True.
3 result is True.
```
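To see the ordering difference without waiting several seconds, here is a scaled-down version of the same experiment. It is a sketch that swaps in ThreadPoolExecutor, assuming the max_workers scheduling behaves the same way for threads here, and uses hypothetical task names 'a' to 'd':

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

# Scaled-down copy of nums = [3, 2, 1, 3]; the letters label the tasks.
TASKS = [('a', 0.6), ('b', 0.4), ('c', 0.2), ('d', 0.6)]


def nap(task):
    name, seconds = task
    time.sleep(seconds)
    return name


def finish_order(kind):
    with ThreadPoolExecutor(max_workers=3) as executor:
        if kind == 'map':
            # Results in submission order.
            return list(executor.map(nap, TASKS))
        futures = [executor.submit(nap, t) for t in TASKS]
        # Results in completion order: c (0.2 s), b (0.4 s), a (0.6 s),
        # then d, which only started once c freed a worker.
        return [f.result() for f in as_completed(futures)]


print(finish_order('map'))           # ['a', 'b', 'c', 'd']
print(finish_order('as_completed'))  # ['c', 'b', 'a', 'd']
```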
2. Reading the map() source:
```python
def map(self, fn, *iterables, timeout=None, chunksize=1):
    """Returns an iterator equivalent to map(fn, iter).

    Args:
        fn: A callable that will take as many arguments as there are
            passed iterables.
        timeout: The maximum number of seconds to wait. If None, then there
            is no limit on the wait time.
        chunksize: The size of the chunks the iterable will be broken into
            before being passed to a child process. This argument is only
            used by ProcessPoolExecutor; it is ignored by
            ThreadPoolExecutor.

    Returns:
        An iterator equivalent to: map(func, *iterables) but the calls may
        be evaluated out-of-order.

    Raises:
        TimeoutError: If the entire result iterator could not be generated
            before the given timeout.
        Exception: If fn(*args) raises for any values.
    """
    if timeout is not None:
        end_time = timeout + time.time()

    fs = [self.submit(fn, *args) for args in zip(*iterables)]

    # Yield must be hidden in closure so that the futures are submitted
    # before the first iterator value is required.
    def result_iterator():
        try:
            for future in fs:
                if timeout is None:
                    yield future.result()
                else:
                    yield future.result(end_time - time.time())
        finally:
            for future in fs:
                future.cancel()
    return result_iterator()
```
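fs holds the future instances returned by submit(), in the order of the input arguments, and map() returns result_iterator(). The apparent grouping by max_workers in the previous example follows from this ordering: result_iterator() calls result() on each future in submission order, so it first blocks on the 3 s task; by then the 2 s and 1 s tasks have already finished, so their results are yielded immediately, while the fourth task, which only started at 1 s, finishes at 4 s.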
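The essential behavior fits in a few lines. This is a hypothetical, stripped-down re-implementation (no timeout, no chunksize), not the library's code; note that because result_iterator is a nested generator, simple_map() itself is an ordinary function, so every submit() happens when it is called rather than on the first next():

```python
from concurrent.futures import ThreadPoolExecutor


def simple_map(executor, fn, *iterables):
    # Hypothetical, stripped-down Executor.map(): no timeout, no chunksize.
    # All calls are submitted up front, before any result is requested.
    fs = [executor.submit(fn, *args) for args in zip(*iterables)]

    def result_iterator():
        try:
            for future in fs:
                # Blocks on each future in submission order, so a slow first
                # task holds back results that finished earlier.
                yield future.result()
        finally:
            # Runs when the consumer stops early or a result raises.
            for future in fs:
                future.cancel()

    return result_iterator()


with ThreadPoolExecutor(max_workers=2) as executor:
    powers = list(simple_map(executor, pow, [2, 3, 4], [3, 2, 1]))

print(powers)  # [8, 9, 4]
```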
3. The as_completed() source is somewhat harder to follow.
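Its observable behavior (yield each future as soon as it finishes) can be approximated with wait() and FIRST_COMPLETED. This is a hypothetical sketch of the semantics only; the real as_completed() is more involved (it also supports a timeout, for example):

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait


def simple_as_completed(fs):
    # Hypothetical sketch, not CPython's implementation: repeatedly wait for
    # at least one pending future to finish, then yield the finished ones.
    pending = set(fs)
    while pending:
        done, pending = wait(pending, return_when=FIRST_COMPLETED)
        for f in done:
            yield f


with ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(pow, n, 2) for n in range(5)]
    squares = sorted(f.result() for f in simple_as_completed(futures))

print(squares)  # [0, 1, 4, 9, 16]
```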
4. How ProcessPoolExecutor is implemented:
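Let's walk through it using the source and the data-flow diagram in the comments at the top of concurrent/futures/process.py:
- executor.map creates several _WorkItem objects (in fact it just calls submit() several times), each holding a newly created Future object.
- Each _WorkItem is put into a dict called "Work Items", keyed by its "Work Id"; the id is also put on the "Work Ids" queue.
- A thread that manages the "Work Ids" queue, the "Local worker thread", is created. It does two things:
  - It takes a Work Id from the "Work Ids" queue and looks up the corresponding _WorkItem in "Work Items". If the item has been cancelled, it is deleted from "Work Items"; otherwise it is repackaged as a _CallItem and put on the "Call Q" queue. The executor's worker processes take _CallItems from that queue, run them, and wrap the results in _ResultItems placed on the "Result Q" queue.
  - It takes _ResultItems from the "Result Q" queue, updates the corresponding Future object in "Work Items", and removes the entry.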
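The steps above can be modeled in a single process. This is a hypothetical toy, not CPython's code: worker threads stand in for worker processes, plain tuples stand in for _WorkItem/_CallItem/_ResultItem, and there is no shutdown logic:

```python
import queue
import threading
from concurrent.futures import Future


class ToyPool:
    """Hypothetical toy model of the data flow, NOT CPython's code."""

    def __init__(self, workers=2):
        self.work_items = {}           # "Work Items": work_id -> (future, fn, args)
        self.work_ids = queue.Queue()  # "Work Ids"
        self.call_q = queue.Queue()    # "Call Q"
        self.result_q = queue.Queue()  # "Result Q"
        self.next_id = 0
        for _ in range(workers):
            threading.Thread(target=self._worker, daemon=True).start()
        threading.Thread(target=self._manager, daemon=True).start()

    def submit(self, fn, *args):
        f = Future()
        self.work_items[self.next_id] = (f, fn, args)
        self.work_ids.put(self.next_id)
        self.next_id += 1
        self.result_q.put(None)  # wake up the manager thread
        return f

    def _worker(self):
        # Plays the worker processes: run call items, report result items.
        while True:
            work_id, fn, args = self.call_q.get()
            try:
                result = fn(*args)
            except Exception as e:
                self.result_q.put((work_id, None, e))
            else:
                self.result_q.put((work_id, result, None))

    def _manager(self):
        # Plays the "Local worker thread" and its two jobs.
        while True:
            # Job 1: Work Ids -> Call Q, dropping cancelled items.
            while True:
                try:
                    work_id = self.work_ids.get_nowait()
                except queue.Empty:
                    break
                f, fn, args = self.work_items[work_id]
                if f.set_running_or_notify_cancel():
                    self.call_q.put((work_id, fn, args))
                else:
                    del self.work_items[work_id]
            # Job 2: Result Q -> the matching Future, then drop the entry.
            item = self.result_q.get()
            if item is None:
                continue  # just a wake-up from submit()
            work_id, result, exc = item
            f, _, _ = self.work_items.pop(work_id)
            if exc is None:
                f.set_result(result)
            else:
                f.set_exception(exc)


pool = ToyPool(workers=2)
futures = [pool.submit(pow, n, 2) for n in range(4)]
print([f.result(timeout=5) for f in futures])  # [0, 1, 4, 9]
```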
A quick look at submit():
```python
def submit(self, fn, *args, **kwargs):
    with self._shutdown_lock:
        if self._broken:
            raise BrokenProcessPool('A child process terminated '
                'abruptly, the process pool is not usable anymore')
        if self._shutdown_thread:
            raise RuntimeError('cannot schedule new futures after shutdown')

        f = _base.Future()
        w = _WorkItem(f, fn, args, kwargs)

        self._pending_work_items[self._queue_count] = w
        self._work_ids.put(self._queue_count)
        self._queue_count += 1
        # Wake up queue management thread
        self._result_queue.put(None)

        self._start_queue_management_thread()
        return f
```
It creates a Future() instance f and a _WorkItem() instance w.
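_pending_work_items is the "Work Items" dict described above: the key is _queue_count (initialized to 0 and incremented on every submit) and the value is w. _queue_count is also put on the _work_ids queue.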
The "Wake up queue management thread" step puts None on _result_queue, which wakes the Local worker thread from the diagram.
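From the caller's side, submit() hands back f right away while the work is still pending. A minimal sketch of what a caller can do with that Future, using ThreadPoolExecutor and pow as a stand-in task (on_done is a hypothetical callback):

```python
from concurrent.futures import ThreadPoolExecutor

seen = []


def on_done(fut):
    # Receives the finished Future; runs in the worker thread, or immediately
    # in the caller's thread if the Future is already done.
    seen.append(fut.result())


with ThreadPoolExecutor(max_workers=1) as executor:
    f = executor.submit(pow, 2, 10)  # returns at once with a Future
    f.add_done_callback(on_done)
    value = f.result()               # blocks until the worker sets the result

# Leaving the with-block joins the worker, so the callback has run by now.
print(value, f.done(), seen)  # 1024 True [1024]
```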
```python
def _start_queue_management_thread(self):
    # When the executor gets lost, the weakref callback will wake up
    # the queue management thread.
    def weakref_cb(_, q=self._result_queue):
        q.put(None)
    if self._queue_management_thread is None:
        # Start the processes so that their sentinels are known.
        self._adjust_process_count()
        self._queue_management_thread = threading.Thread(
                target=_queue_management_worker,
                args=(weakref.ref(self, weakref_cb),
                      self._processes,
                      self._pending_work_items,
                      self._work_ids,
                      self._call_queue,
                      self._result_queue))
        self._queue_management_thread.daemon = True
        self._queue_management_thread.start()
        _threads_queues[self._queue_management_thread] = self._result_queue

def _adjust_process_count(self):
    for _ in range(len(self._processes), self._max_workers):
        p = multiprocessing.Process(
                target=_process_worker,
                args=(self._call_queue,
                      self._result_queue))
        p.start()
        self._processes[p.pid] = p
```
_adjust_process_count() starts processes until there are max_workers of them, each running _process_worker().
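It then starts the queue management thread (stored in self._queue_management_thread, with _queue_management_worker as its target), i.e. the Local worker thread.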
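Inside that thread, _queue_management_worker() calls _add_call_item_to_queue() to move _CallItems onto call_queue, and it also drops references and does other bookkeeping; this method is harder to follow.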
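One idea behind _add_call_item_to_queue() is that the call queue is bounded (in CPython its size is max_workers plus a small constant), so only a few work items are handed to the processes at a time and the rest can still be cancelled. A simplified, hypothetical sketch of that transfer step, using queue.Queue and plain tuples instead of _WorkItem/_CallItem:

```python
import queue
from concurrent.futures import Future


def add_call_items(pending_work_items, work_ids, call_queue):
    # Hypothetical, simplified sketch of the transfer step: move work ids
    # into a *bounded* call queue, dropping items that were cancelled.
    while True:
        if call_queue.full():
            # Stop early: items left in work_ids can still be cancelled.
            return
        try:
            work_id = work_ids.get(block=False)
        except queue.Empty:
            return
        future, fn, args = pending_work_items[work_id]
        if future.set_running_or_notify_cancel():
            call_queue.put((work_id, fn, args), block=True)
        else:
            # Cancelled before it reached a worker: forget it.
            del pending_work_items[work_id]


items = {i: (Future(), pow, (i, 2)) for i in range(3)}
items[1][0].cancel()             # cancel work item 1 before it is handed out
ids = queue.Queue()
for i in range(3):
    ids.put(i)
call_q = queue.Queue(maxsize=1)  # tiny bound to make the effect visible

add_call_items(items, ids, call_q)
first = call_q.get()[0]   # 0: the queue was full after one item
add_call_items(items, ids, call_q)
second = call_q.get()[0]  # 2: item 1 was cancelled and dropped
print(first, second, sorted(items))  # 0 2 [0, 2]
```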
```python
def _process_worker(call_queue, result_queue):
    """Evaluates calls from call_queue and places the results in result_queue.

    This worker is run in a separate process.

    Args:
        call_queue: A multiprocessing.Queue of _CallItems that will be read and
            evaluated by the worker.
        result_queue: A multiprocessing.Queue of _ResultItems that will written
            to by the worker.
        shutdown: A multiprocessing.Event that will be set as a signal to the
            worker that it should exit when call_queue is empty.
    """
    while True:
        call_item = call_queue.get(block=True)
        if call_item is None:
            # Wake up queue management thread
            result_queue.put(os.getpid())
            return
        try:
            r = call_item.fn(*call_item.args, **call_item.kwargs)
        except BaseException as e:
            exc = _ExceptionWithTraceback(e, e.__traceback__)
            result_queue.put(_ResultItem(call_item.work_id, exception=exc))
        else:
            result_queue.put(_ResultItem(call_item.work_id, result=r))
```
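This is the task-executing worker process: it takes a _CallItem from call_queue, calls its fn, and puts the result (or the exception, wrapped in _ExceptionWithTraceback) into result_queue as a _ResultItem. A None item is the shutdown signal: the worker puts its pid on result_queue and exits.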
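The exception is wrapped because everything placed on result_queue must be pickled to cross the process boundary, and traceback objects cannot be pickled; the traceback therefore travels as formatted text. A sketch of that idea; run_call() is a hypothetical helper, not part of the library:

```python
import pickle
import traceback


def run_call(fn, *args):
    # Hypothetical helper mirroring the try/except in _process_worker().
    try:
        return ('ok', fn(*args), None)
    except BaseException as e:
        # Keep a text copy of the traceback, which can cross a process boundary.
        tb_text = ''.join(traceback.format_exception(type(e), e, e.__traceback__))
        return ('error', e, tb_text)


status, exc, tb_text = run_call(int, 'not a number')

# The exception and the formatted text pickle fine...
payload = pickle.dumps((exc, tb_text))
restored_exc, restored_tb = pickle.loads(payload)

# ...but the raw traceback object does not.
try:
    pickle.dumps(exc.__traceback__)
    traceback_picklable = True
except TypeError:
    traceback_picklable = False

print(status, type(restored_exc).__name__, traceback_picklable)
# error ValueError False
```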