理解Python并发编程-PoolExecutor篇
2017-06-20 00:00
232 查看
摘要:之前我们使用多线程(threading)和多进程(multiprocessing)完成常规的需求,在启动的时候start、jon等步骤不能省,复杂的需要还要用1-2个队列。随着需求越来越复杂,如果没有良好的设计和抽象这部分的功能层次,代码量越多调试的难度就越大。
之前我们使用多线程(threading)和多进程(multiprocessing)完成常规的需求,在启动的时候start、jon等步骤不能省,复杂的需要还要用1-2个队列。随着需求越来越复杂,如果没有良好的设计和抽象这部分的功能层次,代码量越多调试的难度就越大。有没有什么好的方法把这些步骤抽象一下呢,让我们不关注这些细节,轻装上阵呢?
答案是:有的。
从Python3.2开始一个叫做concurrent.futures被纳入了标准库,而在Python2它属于第三方的futures库,需要手动安装:
感受下是不是很轻便呢?看一下花费的时间:
除了用map,另外一个常用的方法是submit。如果你要提交的任务的函数是一样的,就可以简化成map。但是假如提交的任务函数是不一样的,或者执行的过程之可能出现异常(使用map执行过程中发现问题会直接抛出错误)就要用到submit:
执一下:
可以看到,第一次捕捉到了异常,但是第二次执行的时候错误直接抛出来了。
上面说到的map,有些同学马上会说,这不是进程(线程)池的效果吗?看起来确实是的:
好像代码量更小哟。好吧,看一下花费的时间:
WhatTF竟然花费了1.7倍的时间。为什么?
BTW,有兴趣的同学可以对比下ThreadPool和ThreadPoolExecutor,由于GIL的缘故,对比的差距一定会更多。
我们结合源码和上面的数据流分析一下:
executor.map会创建多个_WorkItem对象,每个对象都传入了新创建的一个Future对象。
把每个_WorkItem对象然后放进一个叫做「WorkItems」的dict中,键是不同的「WorkIds」。
创建一个管理「WorkIds」队列的线程「Localworkerthread」,它能做2件事:
从「WorkIds」队列中获取WorkId,通过「WorkItems」找到对应的_WorkItem。如果这个Item被取消了,就从「WorkItems」里面把它删掉,否则重新打包成一个_CallItem放入「CallQ」这个队列。executor的那些进程会从队列中取_CallItem执行,并把结果封装成_ResultItems放入「ResultQ」队列中。
从「ResultQ」队列中获取_ResultItems,然后从「WorkItems」更新对应的Future对象并删掉入口。
看起来就是一个「生产者/消费者」模型罢了,错了。我们要注意,整个过程并不是多个进程与任务+结果-2个队列直接通信的,而是通过一个中间的「Localworkerthread」,它就是让效率提升的重要原因之一!!!
设想,当某一段程序提交了一个请求,期望得到一个答复。但服务程序对这个请求可能很慢,在传统的单线程环境下,调用函数是同步的,也就是说它必须等到服务程序返回结果后,才能进行其他处理。而在Future模式下,调用方式改为异步,而原先等待返回的时间段,在主调用函数中,则可用于处理其他事物。
一个Future对象代表了一些尚未就绪(完成)的结果,在「将来」的某个时间就绪了之后就可以获取到这个结果。比如上面的例子,我们期望并发的执行一些参数不同的fib函数,获取全部的结果。传统模式就是在等待queue.get返回结果,这个是同步模式,而在Future模式下,调用方式改为异步,而原先等待返回的时间段,由于「Localworkerthread」的存在,这个时候可以完成其他工作
在tornado中也有对应的实现。2013年的时候,我曾经写过一篇博客使用tornado让你的请求异步非阻塞,最后也提到了用concurrent.futures实现异步非阻塞的完成耗时任务。
用云栖社区APP,舒服~
原文链接
之前我们使用多线程(threading)和多进程(multiprocessing)完成常规的需求,在启动的时候start、jon等步骤不能省,复杂的需要还要用1-2个队列。随着需求越来越复杂,如果没有良好的设计和抽象这部分的功能层次,代码量越多调试的难度就越大。有没有什么好的方法把这些步骤抽象一下呢,让我们不关注这些细节,轻装上阵呢?
答案是:有的。
从Python3.2开始一个叫做concurrent.futures被纳入了标准库,而在Python2它属于第三方的futures库,需要手动安装:
pipinstallfutures ``` 这个模块中有2个类:ThreadPoolExecutor和ProcessPoolExecutor,也就是对threading和multiprocessing的进行了高级别的抽象, 暴露出统一的接口,帮助开发者非常方便的实现异步调用: ```python importtime fromconcurrent.futuresimportProcessPoolExecutor,as_completed NUMBERS=range(25,38) deffib(n): ifn<=2: return1 returnfib(n-1)+fib(n-2) start=time.time() withProcessPoolExecutor(max_workers=3)asexecutor: fornum,resultinzip(NUMBERS,executor.map(fib,NUMBERS)): print'fib({})={}'.format(num,result) print'COST:{}'.format(time.time()-start)
感受下是不是很轻便呢?看一下花费的时间:
pythonfib_executor.py fib(25)=75025 fib(26)=121393 fib(27)=196418 fib(28)=317811 fib(29)=514229 fib(30)=832040 fib(31)=1346269 fib(32)=2178309 fib(33)=3524578 fib(34)=5702887 fib(35)=9227465 fib(36)=14930352 fib(37)=24157817 COST:10.8920350075
除了用map,另外一个常用的方法是submit。如果你要提交的任务的函数是一样的,就可以简化成map。但是假如提交的任务函数是不一样的,或者执行的过程之可能出现异常(使用map执行过程中发现问题会直接抛出错误)就要用到submit:
fromconcurrent.futuresimportThreadPoolExecutor,as_completed NUMBERS=range(30,35) deffib(n): ifn==34: raiseException("Don'tdothis") ifn<=2: return1 returnfib(n-1)+fib(n-2) withThreadPoolExecutor(max_workers=3)asexecutor: future_to_num={executor.submit(fib,num):numfornuminNUMBERS} forfutureinas_completed(future_to_num): num=future_to_num[future] try: result=future.result() exceptExceptionase: print'raiseanexception:{}'.format(e) else: print'fib({})={}'.format(num,result) withThreadPoolExecutor(max_workers=3)asexecutor: fornum,resultinzip(NUMBERS,executor.map(fib,NUMBERS)): print'fib({})={}'.format(num,result)
执一下:
pythonfib_executor_with_raise.py fib(30)=832040 fib(31)=1346269 raiseanexception:Don'tdothis fib(32)=2178309 fib(33)=3524578 Traceback(mostrecentcalllast): File"fib_executor_with_raise.py",line28,in<module> fornum,resultinzip(NUMBERS,executor.map(fib,NUMBERS)): File"/Library/Python/2.7/site-packages/concurrent/futures/_base.py",line580,inmap yieldfuture.result() File"/Library/Python/2.7/site-packages/concurrent/futures/_base.py",line400,inresult returnself.__get_result() File"/Library/Python/2.7/site-packages/concurrent/futures/_base.py",line359,in__get_result reraise(self._exception,self._traceback) File"/Library/Python/2.7/site-packages/concurrent/futures/_compat.py",line107,inreraise exec('raiseexc_type,exc_value,traceback',{},locals_) File"/Library/Python/2.7/site-packages/concurrent/futures/thread.py",line61,inrun result=self.fn(*self.args,**self.kwargs) File"fib_executor_with_raise.py",line9,infib raiseException("Don'tdothis") Exception:Don'tdothis
可以看到,第一次捕捉到了异常,但是第二次执行的时候错误直接抛出来了。
上面说到的map,有些同学马上会说,这不是进程(线程)池的效果吗?看起来确实是的:
importtime frommultiprocessing.poolimportPool NUMBERS=range(25,38) deffib(n): ifn<=2: return1 returnfib(n-1)+fib(n-2) start=time.time() pool=Pool(3) results=pool.map(fib,NUMBERS) fornum,resultinzip(NUMBERS,pool.map(fib,NUMBERS)): print'fib({})={}'.format(num,result) print'COST:{}'.format(time.time()-start)
好像代码量更小哟。好吧,看一下花费的时间:
pythonfib_pool.py |
BTW,有兴趣的同学可以对比下ThreadPool和ThreadPoolExecutor,由于GIL的缘故,对比的差距一定会更多。
原理
我们就拿ProcessPoolExecutor介绍下它的原理,引用官方代码注释中的流程图:|=======================In-process=====================|==Out-of-process==|
+----------++----------++--------++-----------++---------+
||=>|WorkIds|=>||=>|CallQ|=>||
||+----------+||+-----------+||
|||...||||...|||
|||6||||5,call()|||
|||7||||...|||
|Process||...||Local|+-----------+|Process|
|Pool|+----------+|Worker||#1..n|
|Executor||Thread|||
||+-----------+||+-----------+||
||<=>|WorkItems|<=>||<=|ResultQ|<=||
||+------------+||+-----------+||
|||6:call()||||...|||
|||future||||4,result|||
|||...||||3,except|||
+----------++------------++--------++-----------++---------+
我们结合源码和上面的数据流分析一下:
executor.map会创建多个_WorkItem对象,每个对象都传入了新创建的一个Future对象。
把每个_WorkItem对象然后放进一个叫做「WorkItems」的dict中,键是不同的「WorkIds」。
创建一个管理「WorkIds」队列的线程「Localworkerthread」,它能做2件事:
从「WorkIds」队列中获取WorkId,通过「WorkItems」找到对应的_WorkItem。如果这个Item被取消了,就从「WorkItems」里面把它删掉,否则重新打包成一个_CallItem放入「CallQ」这个队列。executor的那些进程会从队列中取_CallItem执行,并把结果封装成_ResultItems放入「ResultQ」队列中。
从「ResultQ」队列中获取_ResultItems,然后从「WorkItems」更新对应的Future对象并删掉入口。
看起来就是一个「生产者/消费者」模型罢了,错了。我们要注意,整个过程并不是多个进程与任务+结果-2个队列直接通信的,而是通过一个中间的「Localworkerthread」,它就是让效率提升的重要原因之一!!!
设想,当某一段程序提交了一个请求,期望得到一个答复。但服务程序对这个请求可能很慢,在传统的单线程环境下,调用函数是同步的,也就是说它必须等到服务程序返回结果后,才能进行其他处理。而在Future模式下,调用方式改为异步,而原先等待返回的时间段,在主调用函数中,则可用于处理其他事物。
Future
Future是常见的一种并发设计模式,在多个其他语言中都可以见到这种解决方案。一个Future对象代表了一些尚未就绪(完成)的结果,在「将来」的某个时间就绪了之后就可以获取到这个结果。比如上面的例子,我们期望并发的执行一些参数不同的fib函数,获取全部的结果。传统模式就是在等待queue.get返回结果,这个是同步模式,而在Future模式下,调用方式改为异步,而原先等待返回的时间段,由于「Localworkerthread」的存在,这个时候可以完成其他工作
在tornado中也有对应的实现。2013年的时候,我曾经写过一篇博客
用云栖社区APP,舒服~
相关文章推荐
- 理解Python并发编程-PoolExecutor篇
- 理解Python并发编程-PoolExecutor篇
- 理解Python并发编程-PoolExecutor篇
- 理解Python并发编程-PoolExecutor篇
- 理解Python并发编程一篇就够了|线程篇
- 理解Python并发编程 - 线程篇
- 理解Python并发编程一篇就够了 | 进程篇
- 理解python并发编程-进程篇
- Python并发编程eventlet
- 使用Python进行并发编程
- 并发编程之join关键字的理解
- Python学习-并发编程之多进程
- Python并发编程之创建多线程的几种方法(二)
- python之并发编程
- python并发编程之多线程2------------死锁与递归锁,信号量等
- python的并发和异步编程实例
- Python并发编程
- python并发编程之多进程理论部分
- 用python理解web并发模型
- python并发编程之IO模型