您的位置:首页 > 编程语言 > Python开发

Python学习笔记(二十七)多进程 (进程和线程开始)

2017-08-19 15:50 826 查看
摘抄:https://www.liaoxuefeng.com/wiki/0014316089557264a6b348958f449949df42a6d3a2e542c000/001431927781401bb47ccf187b24c3b955157bb12c5882d000

要让Python程序实现多进程(multiprocessing),我们先了解操作系统的相关知识。

Unix/Linux操作系统提供了一个
fork()
系统调用,它非常特殊。普通的函数调用,调用一次,返回一次,但是
fork()
调用一次,返回两次,因为操作系统自动把当前进程(称为父进程)复制了一份(称为子进程),然后,分别在父进程和子进程内返回。

子进程永远返回
0
,而父进程返回子进程的ID。这样做的理由是,一个父进程可以fork出很多子进程,所以,父进程要记下每个子进程的ID,而子进程只需要调用
getppid()
就可以拿到父进程的ID。


Python的
os
模块封装了常见的系统调用,其中就包括
fork
,可以在Python程序中轻松创建子进程

import os

print('Process (%s) start...' % os.getpid())
# Only works on Unix/Linux/Mac:
pid = os.fork()
if pid == 0:
print('I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid()))
else:
print('I (%s) just created a child process (%s).' % (os.getpid(), pid))


Process (23193) start...
I (23193) just created a child process (23194).
I am child process (23194) and my parent is 23194.


multiprocessing

multiprocessing
模块就是跨平台版本的多进程模块。

multiprocessing
模块提供了一个
Process
来代表一个进程对象,下面的例子演示了启动一个子进程并等待其结束

from multiprocessing import Process
import os

# 子进程要执行的代码
def run_proc(name):
print('Run child process %s (%s)...' % (name, os.getpid()))

if __name__=='__main__':
print('Parent process %s.' % os.getpid())
p = Process(target=run_proc, args=('test',))
print('Child process will start.')
p.start()
p.join()
print('Child process end.')


执行结果如下:

Parent process 5184.
Child process will start.
Run child process test1 (5280)...
Child process end


创建子进程时,只需要传入一个执行函数函数的参数,创建一个
Process
实例
,用
start()
方法
启动,这样创建进程比
fork()
还要简单。

join()
方法可以等待子进程结束后再继续往下运行通常用于进程间的同步

Pool

如果要启动大量的子进程,可以用进程池的方式批量创建子进程

from multiprocessing import Pool
import os, time, random

def long_time_task(name):
print('Run task %s (%s)...' % (name, os.getpid()))
start = time.time()
time.sleep(random.random() * 3)
end = time.time()
print('Task %s runs %0.2f seconds.' % (name, (end - start)))

if __name__=='__main__':
print('Parent process %s.' % os.getpid())
p = Pool(4)
for i in range(5):
p.apply_async(long_time_task, args=(i,))
print('Waiting for all subprocesses done...')
p.close()
p.join()
print('All subprocesses done.')


结果:

Parent process 7036.
Waiting for all subprocesses done...
Run task 0 (5392)...
Run task 1 (6836)...
Run task 2 (5280)...
Run task 3 (6876)...
Task 0 runs 0.21 seconds.
Run task 4 (5392)...
Task 2 runs 0.22 seconds.
Task 4 runs 1.25 seconds.
Task 3 runs 2.71 seconds.
Task 1 runs 2.86 seconds.
All subprocesses done.


代码解读:

Pool
对象调用
join()
方法会等待所有子进程执行完毕,调用
join()
之前必须先调用
close()
,调用
close()
之后就不能继续添加新的
Process


请注意输出的结果,task
0
1
2
3
是立刻执行的,而task
4
要等待前面某个task完成后才执行,这是因为
Pool
的默认大小在我的电脑上是4,因此,最多同时执行4个进程。这是
Pool
有意设计的限制,并不是操作系统的限制。如果改成:

p = Pool(5)


就可以同时跑5个进程。

由于
Pool
的默认大小是CPU的核数,如果拥有8核CPU,你要提交至少9个子进程才能看到上面的等待效果。

子进程

很多时候,子进程并不是自身,而是一个外部进程。我们创建了子进程后,还需要控制子进程的输入和输出。

subprocess
模块
可以让我们非常方便地启动一个子进程,然后控制其输入和输出。

下面的例子演示了如何在Python代码中运行命令
nslookup www.python.org
,这和命令行直接运行的效果是一样的:

import subprocess

print('$ nslookup www.python.org')
r = subprocess.call(['nslookup', 'www.python.org'])
print('Exit code:', r)


运行结果:

$ nslookup www.python.org
Server:        192.168.19.4
Address:    192.168.19.4#53

Non-authoritative answer:
www.python.org    canonical name = python.map.fastly.net.
Name:    python.map.fastly.net
Address: 199.27.79.223

Exit code: 0


如果子进程还需要输入,则可以通过
communicate()
方法输入:

import subprocess

print('$ nslookup')
p = subprocess.Popen(['nslookup'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, err = p.communicate(b'set q=mx\npython.org\nexit\n')
print(output.decode('utf-8'))
print('Exit code:', p.returncode)


上面的代码相当于在命令行执行命令
nslookup
,然后手动输入:

set q=mx
python.org
exit


运行结果如下:(在Windows里运行不了,在Linux上运行了)

$ nslookup
Server:        10.236.158.106
Address:    10.236.158.106#53

Non-authoritative answer:
python.org    mail exchanger = 50 mail.python.org.

Authoritative answers can be found from:

Exit code: 0


进程间通信

Process
之间肯定是需要通信的,操作系统提供了很多机制来实现进程间的通信。Python的
multiprocessing
模块
包装了底层的机制,提供了
Queue
Pipes
等多种方式来交换数据。

我们以
Queue
为例,在父进程中创建两个子进程,一个
Queue
里写数据,一个从
Queue
里读数据


from multiprocessing import Process, Queue
import os, time, random

# 写数据进程执行的代码:
def write(q):
print('Process to write: %s' % os.getpid())
for value in ['A', 'B', 'C']:
print('Put %s to queue...' % value)
q.put(value)
time.sleep(random.random())

# 读数据进程执行的代码:
def read(q):
print('Process to read: %s' % os.getpid())
while True:
value = q.get(True)
print('Get %s from queue.' % value)

if __name__=='__main__':
# 父进程创建Queue,并传给各个子进程:
q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
# 启动子进程pw,写入:
pw.start()
# 启动子进程pr,读取:
pr.start()
# 等待pw结束:
pw.join()
# pr进程里是死循环,无法等待其结束,只能强行终止:
pr.terminate()


在Unix/Linux下,
multiprocessing
模块封装了
fork()
调用,使我们不需要关注
fork()
的细节。由于Windows没有
fork
调用,因此,
multiprocessing
需要“模拟”出
fork
的效果,父进程所有Python对象都必须通过pickle序列化再传到子进程去,所有,如果
multiprocessing
在Windows下调用失败了,要先考虑是不是pickle失败了。


小结

在Unix/Linux下,可以使用
fork()
调用实现多进程。

实现跨平台的多进程,可以使用
multiprocessing
模块


进程间通信是通过
Queue
Pipes
等实现的。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: