Python并发编程
2016-02-29 19:00
851 查看
本文只是介绍Python并发编程的各种实现, 关于并发问题更深入讨论可以参考进程与线程.
进程是正在运行程序的抽象, 每个进程中可以包含多个线程. 进程拥有独立的内存空间, 同进程下的线程共享内存空间.
在单核系统上, 进程或者线程通过交替执行来实现并发, 但并没有真正地并行.
在多核系统的每个核心上, 线程同样采用交替执行的方式. 但因为多核的存在, 不同核心上的线程可以真正的并行.
协程是在单线程中, 通过控制不同任务执行顺序进行协作的机制.协程实现多任务的协作,而非多任务的并发执行.
异步调用将耗时操作交由他人完成保证自己不被阻塞, 并行仍由多线程或内核的IO机制实现.
在创建线程对象时,将函数对象与target参数绑定作为新进程的主函数.调用进程对象的start方法,start方法将创建新进程并调用其target函数开始运行.
调用进程对象pro的join()方法,将使调用进程(主进程)等待进程pro运行完成后继续运行.
一个父进程可以fork出很多子进程,所以,父进程要记下每个子进程的ID,而子进程只需要调用getppid()就可以拿到父进程的ID.
Python的os模块封装了fork调用,可以在Python程序中轻松创建子进程.由于Windows没有fork调用,该方法无法在Windows上运行.
subprocess.call方法接受一个列表作为参数,并将其作为命令行发送给OS并返回外部进程的返回值.上述调用,等价于在终端中输入
Pipe()返回一对相互连接的
Queue是python中的队列,为了在进程通信中使用一般采用其派生类JoinableQueue.
通常情况下可以使用高级模块threading,其语法与Process类似:
输出:
每个进程至少拥有一个线程,Python主进程的默认线程称为MainThread,threading模块的
对一个对象的修改一般是由多条机器指令完成的,多线程模式下各线程的指令交替执行,可能出现一个线程对对象的修改没有完成,而另一个线程却在此时访问该对象造成错误.
为了防止这种错误, 可以使用锁机制:
获取锁之后必须释放,否则会造成死锁.
当线程试图调用vary()方法时, 会先检查该线程是否获得了锁. 若线程获得了锁则继续执行, 否则线程将会被挂起直到得到锁才会继续执行.
线程只有在获得GIL之后才可以执行. CPython会计算当前线程已执行的字节码数量(opcode计数),达到阈值后就强制释放GIL,同时触发一次线程切换.
这种模式在只有一个CPU核心的情况下毫无问题.任何一个线程被唤起时都能成功获得到GIL(因为只有释放了GIL才会引发线程调度).
因为GIL释放和获取间隔较短而唤醒线程间隔较长,在多核情况下,原来持有GIL的线程很可能再次获得GIL,运行在其它核心上的线程唤醒后却得不到GIL,白白浪费了CPU.
GIL使得CPython多线程程序效率下降严重但却确保了单线程程序的执行效率.如果确实需要多任务处理(如IO密集任务)可以考虑多进程与异步调用,如果需要多核的计算能力优势可以选择其它语言.
yield关键字是协程的核心功能, yield将一个值返回主调用函数, 然后下次调用时从yield下面的一条语句继续执行, 整个过程中函数作用域内的数据不变.
传统的生产者-消费者模型是一个线程写消息,一个线程取消息,通过锁机制控制队列和等待.如果改用协程,生产者生产消息后,直接通过yield跳转到消费者开始执行,待消费者执行完毕后,切换回生产者继续生产.
输出:
注意到consumer函数是一个generator,把一个consumer传入produce后:
首先调用c.send(None)启动生成器;
producer生产一个产品, 并调用c.send(n)把实参值赋给yield的左值n1并从下一行开始继续执行consumer.
consumer通过yield拿到消息,处理,然后通过yield把结果传回;
produce拿到consumer处理的结果,继续生产下一条消息;
produce决定停止生产,通过c.close()关闭consumer,整个过程结束.
整个流程无锁,由一个线程执行,producer和consumer协作完成任务,所以称为“协程”,而非线程的抢占式多任务.
asyncio内建一个事件循环(eventloop),事件循环监听事件, 并将事件发送给响应函数进行处理.
使用async关键字声明一个响应函数为异步函数, 当响应函数遇到耗时操作时, 通过await关键字通知事件循环.
事件循环暂停该事件的处理, 继续处理其它事件.当耗时操作完成后, 事件循环将继续执行刚才被挂起的异步函数.
耗时操作交由异步API的提供者在另外的线程中完成,主线程并未执行耗时操作, 使得主线程可以正常响应其它事件不至阻塞.
输出:
上述示例中包含两个耗时操作
本示例中事件循环和异步函数在同一个线程内执行即可避免阻塞.当然也可以在不同线程执行,异步机制将保证异步函数线程不被阻塞.
asyncio库可以在其它线程中完成耗时操作, 更可能的解决方案是采用操作系统内置的异步支持.
异步调用将耗时操作交由他人完成保证自己不被阻塞, 只是线程内部的一种策略.并发的实现仍由操作系统或者运行时环境考虑.
进程是正在运行程序的抽象, 每个进程中可以包含多个线程. 进程拥有独立的内存空间, 同进程下的线程共享内存空间.
在单核系统上, 进程或者线程通过交替执行来实现并发, 但并没有真正地并行.
在多核系统的每个核心上, 线程同样采用交替执行的方式. 但因为多核的存在, 不同核心上的线程可以真正的并行.
协程是在单线程中, 通过控制不同任务执行顺序进行协作的机制.协程实现多任务的协作,而非多任务的并发执行.
异步调用将耗时操作交由他人完成保证自己不被阻塞, 并行仍由多线程或内核的IO机制实现.
进程
multiprocessing模块提供了一个Process类来代表一个进程对象:from multiprocessing import Process import os def run(name): print('sub process %s : %d' % (name, os.getpid()) ) if __name__ == '__main__': pro = Process(target = run, args = ('name',)) pro.start() pro.join() print('main process: %d' % os.getpid() )
在创建线程对象时,将函数对象与target参数绑定作为新进程的主函数.调用进程对象的start方法,start方法将创建新进程并调用其target函数开始运行.
调用进程对象pro的join()方法,将使调用进程(主进程)等待进程pro运行完成后继续运行.
进程池
进程池Pool可以批量管理进程,Pool的构造函数接受一个int值作为最大进程数,默认为计算机逻辑核心数.from multiprocessing import Process from multiprocessing import Pool import os def run(): print('sub process: %d' % os.getpid() ) if __name__ == '__main__': pool = Pool(4); for i in range(0,4): pool.apply_async(run) #向进程池中添加函数对象作为子进程的主函数 pool.close() #close()之后进程池中不能添加新的进程 #只有close()之后才能调用join()方法 pool.join() #join()使得在进程池中所有进程都执行完毕后,主调进程继续执行 print('process: %d' % os.getpid() )
fork
fork命令是UNIX系列操作系统中的一个重要命令, 用它在当前进程下创建一个子进程作为当前进程的副本.若成功则父进程返回子进程pid, 子进程返回0.一个父进程可以fork出很多子进程,所以,父进程要记下每个子进程的ID,而子进程只需要调用getppid()就可以拿到父进程的ID.
Python的os模块封装了fork调用,可以在Python程序中轻松创建子进程.由于Windows没有fork调用,该方法无法在Windows上运行.
subprocess
subprocess模块用于调用外部进程:import subprocess r = subprocess.call(['ping','www.python.org'])
subprocess.call方法接受一个列表作为参数,并将其作为命令行发送给OS并返回外部进程的返回值.上述调用,等价于在终端中输入
ping www.python.org.
进程间通信
Python提供了多种进程间通信机制,最常用的是Queue和Pipe:from multiprocessing import Process,Pipe import os def fun(conn): conn.send(['msg:',os.getpid()]); conn.close(); if __name__ == '__main__': (sendConn,recvConn) = Pipe(); pro = Process(target=fun,args=(sendConn,)) pro.start(); pro.join(); print(recvConn.recv());
Pipe()返回一对相互连接的
read-write PipeConnection对象,每个对象都具有send()和recv()方法,一个对象通过send()发送字符串,与其关联的对象使用recv()方法可以获得该消息.
Queue是python中的队列,为了在进程通信中使用一般采用其派生类JoinableQueue.
import multiprocessing q = multiprocessing.Queue() def readerProc(): print(q.get()) if __name__ == '__main__': reader = multiprocessing.Process(target=readerProc) reader.start() q.put(100) reader.join()
线程
Python标准库提供了_thread模块作为线程的底层实现,并提供了高级模块
threading对
_thread进行了封装.
通常情况下可以使用高级模块threading,其语法与Process类似:
import threading def run(): print('current thread:%s' % threading.current_thread().name); t = threading.Thread(target=run,name='new_thread'); t.start(); t.join(); print('current thread:%s' % threading.current_thread().name);
输出:
current thread:new_thread current thread:MainThread
每个进程至少拥有一个线程,Python主进程的默认线程称为MainThread,threading模块的
current_thread()方法将返回当前线程对象.
Mutex
线程与进程的区别在于进程拥有自己的内存空间,而同一个进程下的线程需要共享内存空间.对一个对象的修改一般是由多条机器指令完成的,多线程模式下各线程的指令交替执行,可能出现一个线程对对象的修改没有完成,而另一个线程却在此时访问该对象造成错误.
为了防止这种错误, 可以使用锁机制:
import threading shared = 0; lock = threading.Lock(); def vary(): global shared # Declare "shared" is a global object instead of a local object shared = shared + 1; def run(): lock.acquire(); try: vary(); print('new_thread %d\n' % shared); except Exception, e: raise finally: lock.release(); t1 = threading.Thread(target=run,name='new_thread',); t1.start(); t1.join(); print('MainThread %d' % shared);
获取锁之后必须释放,否则会造成死锁.
当线程试图调用vary()方法时, 会先检查该线程是否获得了锁. 若线程获得了锁则继续执行, 否则线程将会被挂起直到得到锁才会继续执行.
ThreadLocal
ThreadLocal机制使得线程可以建立自己的局部变量不与其它线程共享,ThreadLocal对象是全局的(线程共享),其每一个属性都是一个词典以线程ID作为key进行取值.import threading def run(name): local_name.name = name; print('Hello, %s' % local_name.name); local_name = threading.local(); t1 = threading.Thread(target=run,args=('World',)); t2 = threading.Thread(target=run,args=('Home',)); t1.start(); t2.start();
GIL
GIL(Global Interpreter Lock)是CPython的历史遗留问题. GIL的存在使得CPython上的多线程程序基本无法并行,不能真正利用多核的计算能力.线程只有在获得GIL之后才可以执行. CPython会计算当前线程已执行的字节码数量(opcode计数),达到阈值后就强制释放GIL,同时触发一次线程切换.
这种模式在只有一个CPU核心的情况下毫无问题.任何一个线程被唤起时都能成功获得到GIL(因为只有释放了GIL才会引发线程调度).
因为GIL释放和获取间隔较短而唤醒线程间隔较长,在多核情况下,原来持有GIL的线程很可能再次获得GIL,运行在其它核心上的线程唤醒后却得不到GIL,白白浪费了CPU.
GIL使得CPython多线程程序效率下降严重但却确保了单线程程序的执行效率.如果确实需要多任务处理(如IO密集任务)可以考虑多进程与异步调用,如果需要多核的计算能力优势可以选择其它语言.
协程
协程(Coroutine),又称微线程,纤程.多线(进)程模型下, 各线程执行的先后顺序是不可知的, 协程可以控制它们之间执行的顺序实现可控的写作.yield关键字是协程的核心功能, yield将一个值返回主调用函数, 然后下次调用时从yield下面的一条语句继续执行, 整个过程中函数作用域内的数据不变.
传统的生产者-消费者模型是一个线程写消息,一个线程取消息,通过锁机制控制队列和等待.如果改用协程,生产者生产消息后,直接通过yield跳转到消费者开始执行,待消费者执行完毕后,切换回生产者继续生产.
def consumer(): r = '' while True: n = yield r if not n: return print('Consuming %s' % n) r = 'OK' def producer(c): c.send(None) n = 0 while n < 5: n = n + 1 print('Producing %s' % n) r = c.send(n) print('Consumer return: %s' % r) c.close() c = consumer() producer(c)
输出:
Producing 1 Consuming 1 Consumer return: OK Producing 2 Consuming 2 Consumer return: OK Producing 3 Consuming 3 Consumer return: OK Producing 4 Consuming 4 Consumer return: OK Producing 5 Consuming 5 Consumer return: OK
注意到consumer函数是一个generator,把一个consumer传入produce后:
首先调用c.send(None)启动生成器;
producer生产一个产品, 并调用c.send(n)把实参值赋给yield的左值n1并从下一行开始继续执行consumer.
consumer通过yield拿到消息,处理,然后通过yield把结果传回;
produce拿到consumer处理的结果,继续生产下一条消息;
produce决定停止生产,通过c.close()关闭consumer,整个过程结束.
整个流程无锁,由一个线程执行,producer和consumer协作完成任务,所以称为“协程”,而非线程的抢占式多任务.
异步调用
asyncio是Python3.4中引入的异步调用模块,async/
await是Python3.5中引入的新语法使得可读性更高.
asyncio内建一个事件循环(eventloop),事件循环监听事件, 并将事件发送给响应函数进行处理.
使用async关键字声明一个响应函数为异步函数, 当响应函数遇到耗时操作时, 通过await关键字通知事件循环.
事件循环暂停该事件的处理, 继续处理其它事件.当耗时操作完成后, 事件循环将继续执行刚才被挂起的异步函数.
耗时操作交由异步API的提供者在另外的线程中完成,主线程并未执行耗时操作, 使得主线程可以正常响应其它事件不至阻塞.
#!/usr/local/bin/python3.5 import threading import asyncio async def hello(): print('Hello world! (%s)' % threading.currentThread()) await asyncio.sleep(1) print('Hello again! (%s)' % threading.currentThread()) loop = asyncio.get_event_loop() tasks = [hello(), hello()] loop.run_until_complete(asyncio.wait(tasks)) # start event loop loop.close()
输出:
Hello world! (<_MainThread(MainThread, started 7984)>) Hello world! (<_MainThread(MainThread, started 7984)>) Hello again! (<_MainThread(MainThread, started 7984)>) Hello again! (<_MainThread(MainThread, started 7984)>) ({<Task finished coro=<hello() done, defined at <stdin>:1> result=None>, <Task finished coro=<hello() done, defined at <stdin>:1> result=None>}, set())
上述示例中包含两个耗时操作
[hello(), hello()], 但是耗时操作
sleep(1)异步执行主线程并未被阻塞.
本示例中事件循环和异步函数在同一个线程内执行即可避免阻塞.当然也可以在不同线程执行,异步机制将保证异步函数线程不被阻塞.
asyncio库可以在其它线程中完成耗时操作, 更可能的解决方案是采用操作系统内置的异步支持.
异步调用将耗时操作交由他人完成保证自己不被阻塞, 只是线程内部的一种策略.并发的实现仍由操作系统或者运行时环境考虑.
相关文章推荐
- python ipython notebook教程
- python入门
- python中的lambda表达
- python学习笔记七 初识socket(进阶篇)
- Python——正则表达式(2)
- python成长之路第三篇(2)_正则表达式
- python Json模块
- Python 优化集锦
- Python爬虫扒取2345天气预报网上的邯郸天气数据
- 【python 笔记】 python pdb 调试技巧
- Python基础篇【第7篇】: 面向对象(1)
- Python基础篇【第6篇】: Python模块subprocess
- Python基础篇【第6篇】: Python装饰器
- 17个新手常见Python运行时错误
- [python实现设计模式]-1. 单例模式
- Celery任务调度示例
- Python——正则表达式(1)
- python爬虫提取冰与火之歌五季的种子
- mac python install zlib not available
- python中__getattr__()和__getattribute__()方法的区别