python中的多进程和多线程
作者:liuyazhuang
来源:CSDN
原文:https://blog.csdn.net/l1028386804/article/details/83042246?utm_source=copy
转载出处:https://blog.csdn.net/l1028386804/article/details/83042246
一、多进程
Python实现对进程的方式主要有两种,一种方法是使用os模块中的fork方法,另一种方法是使用multiprocessing模块。区别在于:前者仅适用于Unix/Linux操作系统,对Windows不支持,后者则是跨平台的实现方式。
1、使用os模块中的fork方式实现多进程
Python的os模块封装了常见的系统调用,其中就有fork方法。fork方法来自于Unix/Linux操作系统中提供的一个fork系统调用,这个方法很特殊。普通的方法都是调用一次,返回一次,而fork是调用一次,返回两次,原因在与操作系统将当前进程(父进程)复制出一份进程(子进程),这两个进程几乎完全相同,于是fork方法分别在父进程和子进程中返回。子进程中永远返回0,父进程中返回的是子进程的ID。
示例:使用fork方法创建进程,其中os模块的getpid方法用于获取当前进程的ID, getppid方法用于获取父进程ID,代码如下:
[code]# -*- coding:UTF-8 -*- import os if __name__ == "__main__": print 'current Process (%s) start ...' %(os.getpid()) pid = os.fork() if pid < 0: print 'error in fork' elif pid == 0: print 'I am child process(%s) and my parent process is (%s)', (os.getpid(), os.getppid()) else: print 'I(%s) created a child process (%s).', (os.getpid(), pid)
2、使用multiprocessing模块创建多进程
multiprocessing模块提供了一个Process类来描述一个进程对象。创建子进程时,只需要传入一个执行函数和函数的参数,即可完成一个Process实例的创建,用start()方法启动进程,用join()方法实现进程间的通信。
示例如下:
[code]# -*- coding:UTF-8 -*- import os from multiprocessing import Process #子进程要执行的代码 def run_proc(name): print 'child process %s (%s) Running...' %(name, os.getpid()) if __name__ == '__main__': print 'Parent process %s.' % os.getpid() for i in range(5): p = Process(target=run_proc, args=(str(i),)) print 'Process will start' p.start() p.join() print 'Process end.'
3、使用multiprocessing模块提供了一个Pool对象来代表进程池对象
Pool可以提供指定数量的进程供用户调用,默认大小是CPU核数。当有新的请求提交到Pool时,如果池还没有满,那么就创建一个新的进程用来执行该请求;但如果池中的进程已经达到规定的最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来处理它。
示例如下:
[code]# -*- coding:UTF-8 -*- from multiprocessing import Pool import os, time, random def run_task(name): print 'Task %s (pid = %s) is running...' % (name, os.getpid()) time.sleep(random.random() * 3) print 'Task %s end' % name if __name__ == '__main__': print 'Current process %s.' % os.getpid() p = Pool(processes=3) for i in range(5): p.apply_async(run_task, args=(i,)) print 'Waiting for all subprocesses done...' p.close() p.join() print 'All subprocesses done.'
执行结果如下:
[code]Current process 9648. Waiting for all subprocesses done... Task 0 (pid = 17336) is running... Task 1 (pid = 15792) is running... Task 2 (pid = 13052) is running... Task 0 end Task 3 (pid = 17336) is running... Task 1 end Task 4 (pid = 15792) is running... Task 2 end Task 3 end Task 4 end All subprocesses done.
上述程序先创建了容量为3的进程池,依次向进程池中添加了5个任务。从运行结果中可以看到虽然添加了5个任务,但是一开始只运行了3个,而且每次最多运行3个进程。当一个任务结束了,新的任务依次添加进来,任务执行使用的进程依然是原来的进程,这一点通过进程的pid可以看出来。
注意:Pool对象调用join()放大会等待所有子进程执行完毕,调用join()方法之前必须先调用close()方法,调用close()方法之后就不能继续添加新的Process了。
4、进程间通信
Python提供了多种进程间通信的方式,例如:Queue、Pipe、Value+Array等。Queue和Pipe的区别在于Pipe常用来在两个进程间通信,Queue用来在多个进程间实现通信。
(1) Queue通信
Queue是安全的队列,可以使用Queue实现多进程之前的数据传递,有两个办法:Put和Get可以进行Queue操作:
Put方法用以插入数据到队列中,它还有两个可选参数:blocked和timeout。如果blocked为True(默认),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blocked为False,但Queue已满,会立即抛出Queue.Full异常。
Get方法可以从队列中读取并且删除一个元素,Get同样有两个可选参数:blocked和timeout。如果blocked为True(默认),并且timeout为正值,那么在等待时间内没有取到任务元素,会抛出Queue.Empty异常。如果blocked为False,分两种情况:如果Queue有一个值可用,则立即返回该值;否则,如果队列为空,则立即抛出Queue.Empty异常。
下面的例子为:在父进程中创建三个子进程,两个子进程往Queue写数据,一个子进程从Queue中读取数据。
[code]# -*- coding:UTF-8 -*- from multiprocessing import Process, Queue import os, time, random #写数据进程执行的代码 def proc_write(queue, urls): print 'Process(%s) is writing...', os.getpid() for url in urls: queue.put(url) print 'Put %s to queue...', url time.sleep(random.random()) #读数据进程执行的代码 def proc_read(queue): print 'Process(%s) is reading...', os.getpid() while True: url = queue.get(True) print 'Get %s from queue.', url if __name__ == '__main__': #父进程创建Queue,并传递给子进程: queue = Queue() proc_write1 = Process(target=proc_write, args=(queue, ['url_1', 'url_2', 'url_3'])) proc_write2 = Process(target=proc_write, args=(queue, ['url_4', 'url_5', 'url_6'])) proc_reader = Process(target=proc_read, args=(queue,)) #启动子进程proc_write,写入 proc_write1.start() proc_write2.start() #启动子进程proc_reader,读取 proc_reader.start() #等待proc_write结束 proc_write1.join() proc_write2.join() #proc_reader进程里是死循环,无法等待其结束,只能强行终止: proc_reader.terminate()
(2)Pipe通信
Pipe常用来在两个进程间进行通信,两个进程分别位于管道的两端。
Piple方法返回(conn1, conn2)代表一个管道的两端。Pipe方法由duplex参数,如果duplex参数为True(默认),那么这个管道是全双工模式,也就是说conn1和conn2均可收发。若dublex为False,conn1只负责接收消息,conn2只负责发送消息。send和recv方法分别是发送和接收消息的方法。例如:在全双工模式下,可以调用conn1.send发送消息,conn1.recv接收消息。如果没有消息可接收,recv方法会一直阻塞。如果管道已经被关闭,那么recv方法会抛出EOFError。
下面通过一个例子进行说明:创建两个进程,一个子进程通过Pipe发送数据,一个子进程通过Pipe接收数据。示例如下:
[code]# -*- coding:UTF-8 -*- from multiprocessing import Process, Queue import os, time, random #写数据进程执行的代码 def proc_write(queue, urls): print 'Process(%s) is writing...', os.getpid() for url in urls: queue.put(url) print 'Put %s to queue...', url time.sleep(random.random()) #读数据进程执行的代码 def proc_read(queue): print 'Process(%s) is reading...', os.getpid() while True: url = queue.get(True) print 'Get %s from queue.', url if __name__ == '__main__': #父进程创建Queue,并传递给子进程: queue = Queue() proc_write1 = Process(target=proc_write, args=(queue, ['url_1', 'url_2', 'url_3'])) proc_write2 = Process(target=proc_write, args=(queue, ['url_4', 'url_5', 'url_6'])) proc_reader = Process(target=proc_read, args=(queue,)) #启动子进程proc_write,写入 proc_write1.start() proc_write2.start() #启动子进程proc_reader,读取 proc_reader.start() #等待proc_write结束 proc_write1.join() proc_write2.join() #proc_reader进程里是死循环,无法等待其结束,只能强行终止: proc_reader.terminate()
二、多线程
Python的标准库提供了两个模块:thread和threading,thread是低级模块,threading是高级模块,对thread进行了封装。绝大多数情况下,我们只需要使用threading这个高级模块。
1、使用threading模块创建多线程
threading模块一般通过两种方式创建线程:
把函数传入并创建Thread实例,然后调用start方法开始执行;
直接从threading.Thread继承并创建线程类,然后重写__init__方法和run方法
(1)通过把函数传入创建Thread实例的方式创建多线程
代码如下:
[code]# -*- coding:UTF-8 -*- import random import time, threading #新线程执行的代码 def thread_run(urls): print 'Current %s is running...' % threading.current_thread().name for url in urls: print '%s ------->>> %s' % (threading.current_thread().name, url) time.sleep(random.random()) print '%s ended.' % threading.current_thread().name print '%s running...' % threading.current_thread().name t1 = threading.Thread(target=thread_run, name='Thread_1', args=(['url_1', 'url_2', 'url_3'],)) t2 = threading.Thread(target=thread_run, name='Thread_2', args=(['url_4', 'url_5', 'url_6'],)) t1.start() t2.start() t1.join() t2.join() print '%s ended.' % threading.current_thread().name (2)从threading.Thread继承创建线程类 代码如下: # -*- coding:UTF-8 -*- import random import threading import time class myThread(threading.Thread): def __init__(self, name, urls): threading.Thread.__init__(self, name=name) self.urls = urls def run(self): print 'Current %s running...' % threading.current_thread().name for url in self.urls: print '%s ------>>> %s' % (threading.current_thread().name, url) time.sleep(random.random()) print '%s ended.' % threading.current_thread().name print '%s is running...' % threading.current_thread().name t1 = myThread(name='Thread_1', urls=['urls_1', 'url_2', 'url_3']) t2 = myThread(name='Thread_2', urls=['urls_4', 'url_5', 'url_6']) t1.start() t2.start() t1.join() t2.join() print '%s ended.' % threading.current_thread().name
2、线程同步
使用Thread对象的Lock和RLock可以实现简单的线程同步,这两个对象都有acquire方法和release方法,对于那些每次只允许一个线程操作的数据,可以将其操作放到acquire和release之间。
对于Lock而言,如果一个线程连续两次进行acquire操作,那么由于第一次acquire之后没有release,第二次acquire将挂起线程。这会导致Lock对象永远不会release,使得线程死锁。RLock对象允许一个线程多次对其进行acquire操作,因为在其内部通过一个counter维护着线程acquire的次数。而且每一次的acquire操作必须有一个release操作与之对应,在所有的release操作完成后,别的线程才能申请该RLock对象。
代码如下:
[code]# -*- coding:UTF-8 -*- import threading mylock = threading.RLock() num = 0 class MyThread(threading.Thread): def __init__(self, name): threading.Thread.__init__(self, name=name) def run(self): global num while True: mylock.acquire() print '%s locked, Numver: %d' % (threading.current_thread().name, num) if num >= 4: mylock.release() print '%s released, Number: %d' % (threading.current_thread().name, num) break num += 1 print '%s released, Number: %d' % (threading.current_thread().name, num) mylock.release() if __name__ == '__main__': thread1 = MyThread('thread_1') thread2 = MyThread('thread_2') thread1.start() thread2.start()
3、全局解释器锁(GIL)
在Python的原始解释器CPython中存在着GIL(Global Interpreter Lock,全局解释器锁),因此在解释执行Python代码时,会产生互斥锁来限制线程对共享资源的访问,直到解释器遇到I/O操作或者操作次数达到一定数目时才会释放GIL。由于全局解释器锁的存在,在进行多线程操作的时候,不能调用多个CPU内核,只能利用一个内核,所以在进行CPU密集型操作的时候,不推荐使用多线程,更加倾向于对进程。那么多线程适合什么样的应用场景呢?对应IO密集型操作,多线程可以明显提高效率,例如:Python爬虫项目,绝大多数时间爬虫实在等待socket返回数据,网络IO的操作延时比CPU大得多。
---------------------
作者:liuyazhuang
来源:CSDN
原文:https://blog.csdn.net/l1028386804/article/details/83042246?utm_source=copy
- Python多进程、多线程、协程学习小结
- python创建多进程/多线程
- Python多线程与多进程
- Python多线程与多进程
- python多进程、多线程详解
- Python 多进程/多线程 学习笔记
- 为什么在Python里推荐使用多进程而不是多线程?
- python并行(2):python多进程与多线程
- 【Python】Python中的多线程与多进程
- python 多进程与多线程
- Python 多线程多进程10.4.1 应用于自动化测试项目
- Python多线程与多进程
- Python多线程与多进程
- Python多线程与多进程学习----概念
- python学习之路-11 多线程、多进程、协程
- python---多线程与多进程
- 【python】多进程+多线程 制作智联招聘爬虫 写入CSV+mongodb
- 【转】【Python】Python多进程与多线程
- python学习笔记(十三): 多线程多进程
- python并行处理任务时 该用多进程?还是该用多线程?