
Notes on Python's multiprocessing module

2016-03-10 11:04
1. Introduction
The multiprocessing module spawns subprocesses and supports both local and remote concurrency. Unlike the threading module, multiprocessing is not constrained by the GIL (Global Interpreter Lock), because it uses processes instead of threads. Thanks to this, the multiprocessing module lets the programmer take full advantage of multiple processors on a given machine.
In [106]: from multiprocessing import Pool

In [107]: def f(x):
.....:     return x*x
.....:

In [108]: if __name__ == '__main__':
.....:     p=Pool(5)
.....:     print(p.map(f,[1,2,3]))
.....:
[1, 4, 9]


In the multiprocessing module, a child process is created by instantiating a Process object and then calling its start() method:

In [116]: from multiprocessing import Process

In [117]: def f(name):
.....:     print 'hello',name
.....:

In [118]: if __name__ == '__main__':
.....:     p=Process(target=f,args=('john',))
.....:     p.start()
.....:     p.join()
.....:
hello john


To show the individual process IDs involved, you can use the following code:
#!/usr/bin/python

from multiprocessing import Process
import os

def info(title):
    print title
    print 'module name:', __name__
    if hasattr(os, 'getppid'):  # only available on Unix
        print 'parent process:', os.getppid()
    print 'process id:', os.getpid()

def f(name):
    info('function f')
    print 'hello', name

if __name__ == '__main__':
    info('main line')
    p = Process(target=f, args=('john',))
    p.start()
    p.join()


main line
module name: __main__
parent process: 17148
process id: 18168
function f
module name: __main__
parent process: 18168
process id: 18169
hello john


2. Exchanging objects between processes
The multiprocessing module supports two kinds of channels for inter-process communication: Queues and Pipes.
Using a Queue

In [123]: from multiprocessing import Process,Queue

In [124]: def f(q):
.....:     q.put([42,None,'hello'])
.....:

In [125]: if __name__ == '__main__':
.....:     q=Queue()
.....:     p=Process(target=f,args=(q,))
.....:     p.start()
.....:     print q.get()
.....:     p.join()
.....:
[42, None, 'hello']


Queues are both thread-safe and process-safe.

Using a Pipe
In [136]: from multiprocessing import Process,Pipe

In [137]: def f(conn):
.....:     conn.send([42,None,'hello'])
.....:     conn.close()
.....:

In [138]: if __name__ == '__main__':
.....:     parent_conn,child_conn=Pipe()
.....:     p=Process(target=f,args=(child_conn,))
.....:     p.start()
.....:     print parent_conn.recv()
.....:     p.join()
.....:
[42, None, 'hello']


Pipe() returns a pair of connection objects representing the two ends of the pipe. Each connection object has send() and recv() methods. Note that the data in a pipe may become corrupted if two different processes read from or write to the same end of the pipe at the same time; there is no risk of corruption when processes use different ends of the pipe simultaneously.

3. Synchronization between processes
In [143]: from multiprocessing import Process,Lock

In [144]: def f(l,i):
.....:     l.acquire()
.....:     print 'hello world',i
.....:     l.release()
.....:

In [145]: if __name__ == '__main__':
.....:     lock=Lock()
.....:     for num in range(10):
.....:         Process(target=f,args=(lock,num)).start()
.....:
hello world 0
hello world 1
hello world 2
hello world 3
hello world 4
hello world 5
hello world 6
hello world 7
hello world 8
hello world 9


4. Sharing state between processes
When doing concurrent programming it is usually best to avoid shared state as far as possible. If you really do need to share data between processes, the multiprocessing module provides a few ways to do so.

Shared memory
In [11]: from multiprocessing import Process,Value,Array

In [12]: def f(n,a):
....:     n.value=3.1415927
....:     for i in range(len(a)):
....:         a[i] = -a[i]
....:

In [13]: if __name__ == '__main__':
....:     num=Value('d',0.0)
....:     arr=Array('i',range(10))
....:     p=Process(target=f,args=(num,arr))
....:     p.start()
....:     p.join()
....:     print num.value
....:     print arr[:]
....:
3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]


Server process (Manager)
In [27]: from multiprocessing import Process,Manager

In [28]: def f(d,l):
....:     d[1] = '1'
....:     d['2'] = 2
....:     d[0.25] = None
....:     l.reverse()
....:

In [29]: if __name__ == '__main__':
....:     manager=Manager()
....:     d=manager.dict()
....:     l=manager.list(range(10))
....:     p=Process(target=f,args=(d,l))
....:     p.start()
....:     p.join()
....:     print d
....:     print l
....:
{0.25: None, 1: '1', '2': 2}
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]


5. Using a pool of workers
The Pool class creates a pool of worker processes to which jobs can be submitted:
from multiprocessing import Pool, TimeoutError
import time
import os

def f(x):
    return x*x

if __name__ == '__main__':
    pool = Pool(processes=4)              # start 4 worker processes

    # print "[0, 1, 4,..., 81]"
    print pool.map(f, range(10))

    # print same numbers in arbitrary order
    for i in pool.imap_unordered(f, range(10)):
        print i

    # evaluate "f(20)" asynchronously
    res = pool.apply_async(f, (20,))      # runs in *only* one process
    print res.get(timeout=1)              # prints "400"

    # evaluate "os.getpid()" asynchronously
    res = pool.apply_async(os.getpid, ()) # runs in *only* one process
    print res.get(timeout=1)              # prints the PID of that process

    # launching multiple evaluations asynchronously *may* use more processes
    multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
    print [res.get(timeout=1) for res in multiple_results]

    # make a single worker sleep for 10 secs
    res = pool.apply_async(time.sleep, (10,))
    try:
        print res.get(timeout=1)
    except TimeoutError:
        print "We lacked patience and got a multiprocessing.TimeoutError"


[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
0
1
4
9
25
36
64
49
81
16
400
27150
[27149, 27152, 27151, 27150]
We lacked patience and got a multiprocessing.TimeoutError


Note that the methods of a Pool object should only ever be used by the process that created it.

Functionality in the multiprocessing module also requires that the __main__ module be importable by child processes.

Reference:
https://docs.python.org/2/library/multiprocessing.html

This article originally appeared on the "Linux SA John" blog: http://john88wang.blog.51cto.com/2165294/1749486