快速了解Python并发编程的工程实现(下)
2019-08-21 22:26
1616 查看
关于我
一个有思想的程序猿,终身学习实践者,目前在一个创业团队任team lead,技术栈涉及Android、Python、Java和Go,这个也是我们团队的主要技术栈。
Github:https://github.com/hylinux1024
微信公众号:终身开发者(angrycode)
0x00 使用进程实现并发
上一篇文章介绍了线程的使用。然而
Python中由于
Global Interpreter Lock(全局解释锁
GIL)的存在,每个线程在在执行时需要获取到这个
GIL,在同一时刻中只有一个线程得到解释锁的执行,
Python中的线程并没有真正意义上的并发执行,多线程的执行效率也不一定比单线程的效率更高。
如果要充分利用现代多核
CPU的并发能力,就要使用
multipleprocessing模块了。
0x01 multipleprocessing
与使用线程的
threading模块类似,
multipleprocessing模块提供许多高级
API。最常见的是
Pool对象了,使用它的接口能很方便地写出并发执行的代码。
from multiprocessing import Pool def f(x): return x * x if __name__ == '__main__': with Pool(5) as p: # map方法的作用是将f()方法并发地映射到列表中的每个元素 print(p.map(f, [1, 2, 3])) # 执行结果 # [1, 4, 9]
关于
Pool下文中还会提到,这里我们先来看
Process。
Process
要创建一个进程可以使用
Process类,使用
start()方法启动进程。
from multiprocessing import Process import os def echo(text): # 父进程ID print("Process Parent ID : ", os.getppid()) # 进程ID print("Process PID : ", os.getpid()) print('echo : ', text) if __name__ == '__main__': p = Process(target=echo, args=('hello process',)) p.start() p.join() # 执行结果 # Process Parent ID : 27382 # Process PID : 27383 # echo : hello process
进程池
正如开篇提到的
multiprocessing模块提供了
Pool类可以很方便地实现一些简单多进程场景。
它主要有以下接口
apply(func[, args[, kwds]])
执行func(args,kwds)
方法,在方法结束返回前会阻塞。apply_async(func[, args[, kwds[, callback[, error_callback]]]])
异步执行func(args,kwds)
,会立即返回一个result
对象,如果指定了callback
参数,结果会通过回调方法返回,还可以指定执行出错的回调方法error_callback()
map(func, iterable[, chunksize])
类似内置函数map()
,可以并发执行func
,是同步方法map_async(func, iterable[, chunksize[, callback[, error_callback]]])
异步版本的map
close()
关闭进程池。当池中的所有工作进程都执行完毕时,进程会退出。terminate()
终止进程池join()
等待工作进程执行完,必需先调用close()
或者terminate()
from multiprocessing import Pool def f(x): return x * x if __name__ == '__main__': with Pool(5) as p: # map方法的作用是将f()方法并发地映射到列表中的每个元素 a = p.map(f, [1, 2, 3]) print(a) # 异步执行map b = p.map_async(f, [3, 5, 7, 11]) # b 是一个result对象,代表方法的执行结果 print(b) # 为了拿到结果,使用join方法等待池中工作进程退出 p.close() # 调用join方法前,需先执行close或terminate方法 p.join() # 获取执行结果 print(b.get()) # 执行结果 # [1, 4, 9] # <multiprocessing.pool.MapResult object at 0x10631b710> # [9, 25, 49, 121]
map_async()和
apply_async()执行后会返回一个
class multiprocessing.pool.AsyncResult对象,通过它的
get()可以获取到执行结果,
ready()可以判断
AsyncResult的结果是否准备好。
进程间数据的传输
multiprocessing模块提供了两种方式用于进程间的数据共享:队列(
Queue)和管道(
Pipe)
Queue是线程安全,也是进程安全的。使用
Queue可以实现进程间的数据共享,例如下面的
demo中子进程
put一个对象,在主进程中就能
get到这个对象。
任何可以序列化的对象都可以通过
Queue来传输。
from multiprocessing import Process, Queue def f(q): q.put([42, None, 'hello']) if __name__ == '__main__': # 使用Queue进行数据通信 q = Queue() p = Process(target=f, args=(q,)) p.start() # 主进程取得子进程中的数据 print(q.get()) # prints "[42, None, 'hello']" p.join() # 执行结果 # [42, None, 'hello']
Pipe()返回一对通过管道连接的
Connection对象。这两个对象可以理解为管道的两端,它们通过
send()和
recv()发送和接收数据。
from multiprocessing import Process, Pipe def write(conn): # 子进程中发送一个对象 conn.send([42, None, 'hello']) conn.close() def read(conn): # 在读的进程中通过recv接收对象 data = conn.recv() print(data) if __name__ == '__main__': # Pipe()方法返回一对连接对象 w_conn, r_conn = Pipe() wp = Process(target=write, args=(w_conn,)) rp = Process(target=read, args=(r_conn,)) wp.start() rp.start() # 执行结果 # [42, None, 'hello']
需要注意的是,两个进程不能同时对一个连接对象进行
send或
recv操作。
同步
我们知道线程间的同步是通过锁机制来实现的,进程也一样。
from multiprocessing import Process, Lock import time def print_with_lock(l, i): l.acquire() try: time.sleep(1) print('hello world', i) finally: l.release() def print_without_lock(i): time.sleep(1) print('hello world', i) if __name__ == '__main__': lock = Lock() # 先执行有锁的 for num in range(5): Process(target=print_with_lock, args=(lock, num)).start() # 再执行无锁的 # for num in range(5): # Process(target=print_without_lock, args=(num,)).start()
有锁的代码将每秒依次打印
hello world 0 hello world 1 hello world 2 hello world 3 hello world 4
如果执行无锁的代码,则在我的电脑上执行结果是这样的
hello worldhello world 0 1 hello world 2 hello world 3 hello world 4
除了
Lock,还包括
RLock、
Condition、
Semaphore和
Event等进程间的同步原语。其用法也与线程间的同步原语很类似。
API使用可以参考文末中引用的文档链接。
在工程中实现进程间的数据共享应当优先使用队列或管道。
0x02 总结
本文对
multiprocessing模块中常见的
API作了简单的介绍。讲述了
Process和
Pool的常见用法,同时介绍了进程间的数据方式:队列和管道。最后简单了解了进程间的同步原语。
通过与上篇的对比学习,本文的内容应该是更加容易掌握的。
0x03 引用
- https://python-parallel-programmning-cookbook.readthedocs.io
- https://docs.python.org/3/library/threading.html
- https://docs.python.org/3.7/library/multiprocessing.html
- https://docs.python.org/3/glossary.html#term-global-interpreter-lock
- https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures
相关文章推荐
- 一文了解Python并发编程的工程实现方法
- Python Qt GUI快速编程-第四章的digit的python3+pytq5+Qt Designer实现
- Python中的greenlet包实现并发编程的入门教程
- Python Qt GUI快速编程-第五章的例子pen的python3+pytq5+Qt Designer实现
- Python网络编程之socketserver实现多并发
- 简单介绍Python中利用生成器实现的并发编程
- 使用Python中的greenlet包实现并发编程的入门教程
- 【编程练习】快速排序,Java和Python实现
- Python网络编程:使用pexpect实现快速ssh连接
- python并发编程多进程 模拟抢票实现过程
- 简单介绍Python中利用生成器实现的并发编程
- Python Qt GUI快速编程-第四章的Currency例子的python3+pytq5+Qt Designer实现
- Python Qt GUI快速编程-第五章的例子Numbers的python3+pytq5+Qt Designer实现
- Python网络编程之socketserver实现多并发
- python并发编程之多进程(实现)
- Python3快速入门(九)——Python3并发编程
- Python Qt GUI快速编程-第五章的exercise的python3+pytq5+Qt Dersinger实现
- Python Qt GUI快速编程-第四章的计算器python3+pytq5+Qt Designer实现
- Python Qt GUI快速编程-第四章的Exercise例子Interest的python3+pytq5+Qt Designer实现
- python 元类型编程,实现匿名验证器的装饰器AuthenticationDecoratorMeta