您的位置:首页 > 编程语言 > Python开发

python中的多进程和多线程

2018-10-16 17:16 316 查看

作者:liuyazhuang 
来源:CSDN 
原文:https://blog.csdn.net/l1028386804/article/details/83042246?utm_source=copy 
转载出处:https://blog.csdn.net/l1028386804/article/details/83042246

一、多进程
Python实现对进程的方式主要有两种,一种方法是使用os模块中的fork方法,另一种方法是使用multiprocessing模块。区别在于:前者仅适用于Unix/Linux操作系统,对Windows不支持,后者则是跨平台的实现方式。

1、使用os模块中的fork方式实现多进程
Python的os模块封装了常见的系统调用,其中就有fork方法。fork方法来自于Unix/Linux操作系统中提供的一个fork系统调用,这个方法很特殊。普通的方法都是调用一次,返回一次,而fork是调用一次,返回两次,原因在与操作系统将当前进程(父进程)复制出一份进程(子进程),这两个进程几乎完全相同,于是fork方法分别在父进程和子进程中返回。子进程中永远返回0,父进程中返回的是子进程的ID。

示例:使用fork方法创建进程,其中os模块的getpid方法用于获取当前进程的ID, getppid方法用于获取父进程ID,代码如下:

[code]# -*- coding:UTF-8 -*-
import os
 
if __name__ == "__main__":
    print 'current Process (%s) start ...' %(os.getpid())
    pid = os.fork()
    if pid < 0:
        print 'error in fork'
    elif pid == 0:
        print 'I am child process(%s) and my parent process is (%s)', (os.getpid(), os.getppid())
    else:
        print 'I(%s) created a child process (%s).', (os.getpid(), pid)


2、使用multiprocessing模块创建多进程
multiprocessing模块提供了一个Process类来描述一个进程对象。创建子进程时,只需要传入一个执行函数和函数的参数,即可完成一个Process实例的创建,用start()方法启动进程,用join()方法实现进程间的通信。
示例如下:

[code]# -*- coding:UTF-8 -*-
import os
from multiprocessing import Process
 
#子进程要执行的代码
def run_proc(name):
    print 'child process %s (%s) Running...' %(name, os.getpid())
 
if __name__ == '__main__':
    print 'Parent process %s.' % os.getpid()
    for i in  range(5):
        p = Process(target=run_proc, args=(str(i),))
        print 'Process will start'
        p.start()
    p.join()
    print 'Process end.'


3、使用multiprocessing模块提供了一个Pool对象来代表进程池对象
Pool可以提供指定数量的进程供用户调用,默认大小是CPU核数。当有新的请求提交到Pool时,如果池还没有满,那么就创建一个新的进程用来执行该请求;但如果池中的进程已经达到规定的最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来处理它。
示例如下:

[code]# -*- coding:UTF-8 -*-
 
from multiprocessing import Pool
import os, time, random
 
def run_task(name):
    print 'Task %s (pid = %s) is running...' % (name, os.getpid())
    time.sleep(random.random() * 3)
    print 'Task %s end' % name
 
if __name__ == '__main__':
    print 'Current process %s.' % os.getpid()
    p = Pool(processes=3)
    for i in range(5):
        p.apply_async(run_task, args=(i,))
    print 'Waiting for all subprocesses done...'
    p.close()
    p.join()
    print 'All subprocesses done.'


执行结果如下:

[code]Current process 9648.
Waiting for all subprocesses done...
Task 0 (pid = 17336) is running...
Task 1 (pid = 15792) is running...
Task 2 (pid = 13052) is running...
Task 0 end
Task 3 (pid = 17336) is running...
Task 1 end
Task 4 (pid = 15792) is running...
Task 2 end
Task 3 end
Task 4 end
All subprocesses done.


上述程序先创建了容量为3的进程池,依次向进程池中添加了5个任务。从运行结果中可以看到虽然添加了5个任务,但是一开始只运行了3个,而且每次最多运行3个进程。当一个任务结束了,新的任务依次添加进来,任务执行使用的进程依然是原来的进程,这一点通过进程的pid可以看出来。
注意:Pool对象调用join()放大会等待所有子进程执行完毕,调用join()方法之前必须先调用close()方法,调用close()方法之后就不能继续添加新的Process了。

4、进程间通信
Python提供了多种进程间通信的方式,例如:Queue、Pipe、Value+Array等。Queue和Pipe的区别在于Pipe常用来在两个进程间通信,Queue用来在多个进程间实现通信。

(1) Queue通信
Queue是安全的队列,可以使用Queue实现多进程之前的数据传递,有两个办法:Put和Get可以进行Queue操作:

Put方法用以插入数据到队列中,它还有两个可选参数:blocked和timeout。如果blocked为True(默认),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blocked为False,但Queue已满,会立即抛出Queue.Full异常。
Get方法可以从队列中读取并且删除一个元素,Get同样有两个可选参数:blocked和timeout。如果blocked为True(默认),并且timeout为正值,那么在等待时间内没有取到任务元素,会抛出Queue.Empty异常。如果blocked为False,分两种情况:如果Queue有一个值可用,则立即返回该值;否则,如果队列为空,则立即抛出Queue.Empty异常。
下面的例子为:在父进程中创建三个子进程,两个子进程往Queue写数据,一个子进程从Queue中读取数据。

[code]# -*- coding:UTF-8 -*-
 
from multiprocessing import Process, Queue
import os, time, random
 
#写数据进程执行的代码
def proc_write(queue, urls):
    print 'Process(%s) is writing...', os.getpid()
    for url in urls:
        queue.put(url)
        print 'Put %s to queue...',  url
        time.sleep(random.random())
 
#读数据进程执行的代码
def proc_read(queue):
    print 'Process(%s) is reading...', os.getpid()
    while True:
        url = queue.get(True)
        print 'Get %s from queue.', url
 
if __name__ == '__main__':
    #父进程创建Queue,并传递给子进程:
    queue = Queue()
    proc_write1 = Process(target=proc_write, args=(queue, ['url_1', 'url_2', 'url_3']))
    proc_write2 = Process(target=proc_write, args=(queue, ['url_4', 'url_5', 'url_6']))
    proc_reader = Process(target=proc_read, args=(queue,))
    #启动子进程proc_write,写入
    proc_write1.start()
    proc_write2.start()
    #启动子进程proc_reader,读取
    proc_reader.start()
    #等待proc_write结束
    proc_write1.join()
    proc_write2.join()
    #proc_reader进程里是死循环,无法等待其结束,只能强行终止:
    proc_reader.terminate()


(2)Pipe通信
Pipe常用来在两个进程间进行通信,两个进程分别位于管道的两端。
Piple方法返回(conn1, conn2)代表一个管道的两端。Pipe方法由duplex参数,如果duplex参数为True(默认),那么这个管道是全双工模式,也就是说conn1和conn2均可收发。若dublex为False,conn1只负责接收消息,conn2只负责发送消息。send和recv方法分别是发送和接收消息的方法。例如:在全双工模式下,可以调用conn1.send发送消息,conn1.recv接收消息。如果没有消息可接收,recv方法会一直阻塞。如果管道已经被关闭,那么recv方法会抛出EOFError。
下面通过一个例子进行说明:创建两个进程,一个子进程通过Pipe发送数据,一个子进程通过Pipe接收数据。示例如下:

[code]# -*- coding:UTF-8 -*-
 
from multiprocessing import Process, Queue
import os, time, random
 
#写数据进程执行的代码
def proc_write(queue, urls):
    print 'Process(%s) is writing...', os.getpid()
    for url in urls:
        queue.put(url)
        print 'Put %s to queue...',  url
        time.sleep(random.random())
 
#读数据进程执行的代码
def proc_read(queue):
    print 'Process(%s) is reading...', os.getpid()
    while True:
        url = queue.get(True)
        print 'Get %s from queue.', url
 
if __name__ == '__main__':
    #父进程创建Queue,并传递给子进程:
    queue = Queue()
    proc_write1 = Process(target=proc_write, args=(queue, ['url_1', 'url_2', 'url_3']))
    proc_write2 = Process(target=proc_write, args=(queue, ['url_4', 'url_5', 'url_6']))
    proc_reader = Process(target=proc_read, args=(queue,))
    #启动子进程proc_write,写入
    proc_write1.start()
    proc_write2.start()
    #启动子进程proc_reader,读取
    proc_reader.start()
    #等待proc_write结束
    proc_write1.join()
    proc_write2.join()
    #proc_reader进程里是死循环,无法等待其结束,只能强行终止:
    proc_reader.terminate()


二、多线程
Python的标准库提供了两个模块:thread和threading,thread是低级模块,threading是高级模块,对thread进行了封装。绝大多数情况下,我们只需要使用threading这个高级模块。

1、使用threading模块创建多线程
threading模块一般通过两种方式创建线程:

把函数传入并创建Thread实例,然后调用start方法开始执行;
直接从threading.Thread继承并创建线程类,然后重写__init__方法和run方法
(1)通过把函数传入创建Thread实例的方式创建多线程
代码如下:

[code]# -*- coding:UTF-8 -*-
 
import random
import time, threading
 
#新线程执行的代码
def thread_run(urls):
    print 'Current %s is running...' % threading.current_thread().name
    for url in urls:
        print '%s ------->>> %s' % (threading.current_thread().name, url)
        time.sleep(random.random())
    print '%s ended.' % threading.current_thread().name
 
print '%s running...' % threading.current_thread().name
 
t1 = threading.Thread(target=thread_run, name='Thread_1', args=(['url_1', 'url_2', 'url_3'],))
t2 = threading.Thread(target=thread_run, name='Thread_2', args=(['url_4', 'url_5', 'url_6'],))
t1.start()
t2.start()
t1.join()
t2.join()
print '%s ended.' % threading.current_thread().name
(2)从threading.Thread继承创建线程类
代码如下:

# -*- coding:UTF-8 -*-
 
import random
import threading
import time
 
class myThread(threading.Thread):
    def __init__(self, name, urls):
        threading.Thread.__init__(self, name=name)
        self.urls = urls
 
    def run(self):
        print 'Current %s running...' % threading.current_thread().name
        for url in self.urls:
            print '%s ------>>> %s' % (threading.current_thread().name, url)
            time.sleep(random.random())
        print '%s ended.' % threading.current_thread().name
 
print '%s is running...' % threading.current_thread().name
t1 = myThread(name='Thread_1', urls=['urls_1', 'url_2', 'url_3'])
t2 = myThread(name='Thread_2', urls=['urls_4', 'url_5', 'url_6'])
t1.start()
t2.start()
t1.join()
t2.join()
print '%s ended.' % threading.current_thread().name


2、线程同步
使用Thread对象的Lock和RLock可以实现简单的线程同步,这两个对象都有acquire方法和release方法,对于那些每次只允许一个线程操作的数据,可以将其操作放到acquire和release之间。
对于Lock而言,如果一个线程连续两次进行acquire操作,那么由于第一次acquire之后没有release,第二次acquire将挂起线程。这会导致Lock对象永远不会release,使得线程死锁。RLock对象允许一个线程多次对其进行acquire操作,因为在其内部通过一个counter维护着线程acquire的次数。而且每一次的acquire操作必须有一个release操作与之对应,在所有的release操作完成后,别的线程才能申请该RLock对象。
代码如下:

[code]# -*- coding:UTF-8 -*-
import threading
 
mylock = threading.RLock()
num = 0
class MyThread(threading.Thread):
    def __init__(self, name):
        threading.Thread.__init__(self, name=name)
 
    def run(self):
        global num
        while True:
            mylock.acquire()
            print '%s locked, Numver: %d' % (threading.current_thread().name, num)
            if num >= 4:
                mylock.release()
                print '%s released, Number: %d' % (threading.current_thread().name, num)
                break
            num += 1
            print '%s released, Number: %d' % (threading.current_thread().name, num)
            mylock.release()
 
if __name__ == '__main__':
    thread1 = MyThread('thread_1')
    thread2 = MyThread('thread_2')
    thread1.start()
    thread2.start()


3、全局解释器锁(GIL)
在Python的原始解释器CPython中存在着GIL(Global Interpreter Lock,全局解释器锁),因此在解释执行Python代码时,会产生互斥锁来限制线程对共享资源的访问,直到解释器遇到I/O操作或者操作次数达到一定数目时才会释放GIL。由于全局解释器锁的存在,在进行多线程操作的时候,不能调用多个CPU内核,只能利用一个内核,所以在进行CPU密集型操作的时候,不推荐使用多线程,更加倾向于对进程。那么多线程适合什么样的应用场景呢?对应IO密集型操作,多线程可以明显提高效率,例如:Python爬虫项目,绝大多数时间爬虫实在等待socket返回数据,网络IO的操作延时比CPU大得多。
--------------------- 
作者:liuyazhuang 
来源:CSDN 
原文:https://blog.csdn.net/l1028386804/article/details/83042246?utm_source=copy 
 

1b024 阅读更多
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: