Python多进程、多线程、协程学习小结
2016-11-30 22:02
681 查看
1、Python多进程
在Unix/Linux中系统内核提供了fork系统调用来创建进程,根据不同的返回值来判断当前进程是子进程还是父进程,C语言代码示例如下:#include <unistd.h> #include <stdio.h> int main (){ //pid表示fork函数返回的值 int pid; int count=0; //创建子进程,如果创建成功,就返回两个值,一个值为0,一个值为创建的子进程的p_id(>0) //如果创建子进程失败,就返回负数 pid=fork(); if (fpid < 0) printf("进程创建失败\n"); else if (fpid == 0) { printf("i am the child process, my process id is %d/n",getpid()); printf("我是子进程\n"); count++; } else { printf("i am the parent process, my process id is %d/n",getpid()); printf("我是父进程/n"); count++; } printf("统计结果是: %d/n",count); return 0; }
Python中提供了multiprocessing跨平台的模块来实现多进程功能,示例代码如下:
from multiprocessing import Process import os # 以函数的形式定义子进程需要执行的具体的代码 def run_proc(name): # os模块的getpid()方法可以获取当前进程的进程id print('Run child process %s (%s)...' % (name, os.getpid())) if __name__=='__main__': print('Parent process %s.' % os.getpid()) # 创建一个进程实例 p = Process(target=run_proc, args=('test',)) # 调用strat()方法,开始执行子进程 p.start() # 调用进程的join()方法,来阻塞除当前进程以外的所有进程 # 当该进程执行完毕以后,再执行其他进程(这里指的是主进程) p.join() # 子进程执行完毕,父进程继续往下执行 print('Child process end.')
当我们需要创建多个子进程的时候,可以使用进程池的方式来管理多个子进程的stat以及join,示例代码如下:
pool = multiprocessing.Pool(processes = 3) for i in xrange(4): message = "hello world %d" % i # 维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去 pool.apply_async(func, (message, )) pool.close() # 调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束 pool.join() print "all over"
需要注意的是,这里使用apply_async函数是非阻塞的,而apply函数是阻塞的。因此主进程循环执行过程中不等待apply_async的返回结果,即使子进程没有返回,整个程序也会退出。这里我们对于子进程的返回结果并不感兴趣,使用pool.close()以及pool.join()来防止主进程退出。
Unix/Linux环境下C程序进程间通信可以通过消息队列、管道、套接字、共享内存等实现。Python中的multiprocessing模块为我们提供了很好的封装,简单的几行代码就可以实现进程见的通信。
1、使用队列Queue实现进程间通信
# 向队列中写入数据 def work_1(q): try: n=1 while n<20: print("work_1,%d"%n) q.put(n) time.sleep(1) n+=1 except BaseException: print("work_1 error") finally: print("work_1 end!!!") # 取出队列中的数据 def work_2(q): try: n=1 while n<20: print("word_2,%d"%q.get()) time.sleep(1) n+=1 except BaseException: print("work_2 error") finally: print("work_2 end") if __name__ == "__main__": q= multiprocessing.Queue() p1=multiprocessing.Process(target=work_1,args=(q,)) p2=multiprocessing.Process(target=work_2,args=(q,)) p1.start() p2.start() p1.join() p2.join() print("all over")
2、使用事件Event实现进程间通信
def wait_for_event(e): print("等待event事件") e.wait() print("等待event事件:e.is_set()->"+str(e.is_set())) print("") def wait_for_event_timeout(e,t): print("等待event事件---带有超时事件") e.wait(t) print("等待event事件---带有超时事件--wait_for_event_timeout:e.is_set()->"+str(e.is_set())) print("") if __name__ == "__main__": e=multiprocessing.Event() p1=multiprocessing.Process(target=wait_for_event,args=(e,)) p2=multiprocessing.Process(target=wait_for_event_timeout,args=(e,2,)) p1.start() p2.start() time.sleep(3) e.set() print("设置event")
程序执行event.wait()方法就会进入阻塞等待状态(如果设置Flag为True,那么不会阻塞),直到另一个进程调用Event的set(),该Event会通知所有等待状态的进程恢复执行。
3、使用共享内存Value/Array实现进程间通信
def f(n,a,raw): n.value = 3.14 for i in range(5): a[i] = -a[i] raw.append(9999) print(raw) if __name__ == '__main__': num = Value('d',0.0) arr = Array('i',range(10)) raw_list = range(10) print(num.value) print(arr[:]) print(raw_list) # 调用子进程之后,重新打印array和value,值将会发生改变。 而raw_list 普通列表在外层打印则没有发生改变。 p = Process(target=f,args=(num,arr,raw_list)) p.start() p.join() print(num.value) print(arr[:]) print(raw_list)
进程间通信方式还有很多,这里就不再一一列举。
当然,除了多进程的通信之外,进程间同时也需要使用进程锁来维护一致性状态。多进程在对共享数据进行操作的时候,需要进程锁来防止数据被污染。Python中的multiprocessing模块也提供了进程锁,示例代码如下所示:
def worker_1(lock,file_name): lock.acquire() try: f=open(file_name,"a+") f.write("hahahah\n") f.close() finally: lock.release() print("work_1") def worker_2(lock,file_name): lock.acquire() try: f=open(file_name,"a+") f.write("uuuuuu\n") f.close() finally: lock.release() print("work_2") if __name__ == "__main__": lock = multiprocessing.Lock() f="./test.txt" p1=multiprocessing.Process(target=worker_1,args=(lock,f,)) p2=multiprocessing.Process(target=worker_2,args=(lock,f,))
Lock对象状态可以是locked和unlocked,调用acquire()设置状态为locked,调用release()设置状态为unlocked。
2、Python多线程
多个任务由多进程完成,也可以由一个进程中多个线程完成。Python的标准库提供了两个线程模块:thread以及threading,thread是低级模块,threading是高级模块,对thread进行了封装。一般情况下,我们只需要使用threading模块就行了,示例代码如下所示:def thread_student(): print("Hello,%s(in %s)"%(student,threading.current_thread().name)) def thread_process(name): thread_student() if __name__ == "__main__": t1 = threading.Thread(target=thread_process,args=("thread_a",)) t2 = threading.Thread(target=thread_process,args=("thread_b",)) t1.start() t2.start() t1.join() t2.join() print("all over")
多线程中,全局变量由所有线程共享,这些变量可以被任何一个线程修改,线程间也可以使用锁来维护一致性状态。调用threading.Lock()创建Lock,线程中获取/释放锁使用acquire()以及release()方法,使用起来和进程锁没有太大的区别。
对于一些复杂的环境,需要对条件进行判断,C程序中经常使用条件变量,Python的threading模块提供了Condition对象,除了具有acquire和release方法之外,还提供了wait和notify方法。线程首先acquire一个条件变量锁。如果条件不足,则该线程wait,如果满足就执行线程,甚至可以notify其他线程。其他处于wait状态的线程接到通知后会重新判断条件。示例代码如下:
queue = [] con = threading.Condition() class Producer(threading.Thread): def run(self): while True: if con.acquire(): if len(queue) > 100: con.wait() else: elem = random.randrange(100) queue.append(elem) print "Producer a elem {}, Now size is {}".format(elem, len(queue)) time.sleep(random.random()) con.notify() con.release() class Consumer(threading.Thread): def run(self): while True: if con.acquire(): if len(queue) < 0: con.wait() else: elem = queue.pop() print "Consumer a elem {}. Now size is {}".format(elem, len(queue)) time.sleep(random.random()) con.notify() con.release() def main(): for i in range(3): Producer().start() for i in range(2): Consumer().start()
线程间通信还包括Event、Queue等和进程间通信类似的同步方式,这里就不再一一举例。
经常听到”Python下多线程是鸡肋,推荐使用多进程“,这主要是由于Python解析器CPython中的GIL引起的。GIL全称
Global Interpreter Lock,官方给出的解释:
In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.)
在Python多线程程序中,每个线程的执行方式如下:
获取GIL
执行代码直到Sleep或是python虚拟机将其挂起
释放GIL
某个线程要想执行,必须要要先拿到GIL,而在一个Python进程中,GIL只有一个。Python2.x中,GIL释放逻辑是当前线程遇见IO操作或者ticks技术打到100。每次释放GIL,都会进行锁竞争以及线程切换。因此在多核CPU上,Python多线程并不能够充分利用硬件优势。
但也不是说Python多线程没有任何用处
- 对于CPU密集型代码(各种循环处理,计数等等),ticks很快就会达到阈值,然后触发GIL释放与再竞争(多个线程来回切换需要消耗资源),所以Python下多线程对CPU密集型代码并不友好
- 对于IO密集型代码(文件处理、网络收发请求等等),多线程能够有效提升效率(单线程下有IO操作会进行IO等待,而多线程在线程A等待时,会自动切换到线程B,不会浪费CPU资源),所以Python下多线程对IO密集型代码比较友好
3、Python协程
Python中协程(Coroutine)就是在同一进程/线程中,利用生成器(generator)来”同时“执行多个函数(routine)。Python中任何包含yield关键字的函数都会自动成为生成器(generator)对象,里面的代码一般是一个有限或无限循环结构,每当第一次调用该函数时,会执行到yield代码为止并返回本次迭代结果,yield指令起到的是return关键字的作用。然后函数的堆栈会自动冻结(freeze)在这一行。当函数调用者的下一次利用next()或generator.send()或for-in来再次调用该函数时,就会从yield代码的下一行开始,继续执行,再返回下一次迭代结果。通过这种方式,迭代器可以实现无限序列和惰性求值。
对于计算100以内斐波那契额数列的例子,普通的递归方式代码如下所示:
a = b = 1 while a < 100: a, b = b, a + b print a,
使用yield以及生成器来计算斐波那契额数列,该函数形成一个无限循环的生成器,有函数调用者显示地控制迭代次数,示例代码如下所示:
def fibonacci(): a = b = 1 # yield则像是generator函数的返回结果 yield a yield b while True: a, b = b, a+b # yield唯一所做的另一件事就是保存一个generator函数的状态, # generator就是一个特殊类型的迭代器(iterator) yield b num = 0 fib = fibonacci() while num < 100: # 和迭代器相似,我们可以通过使用next()来从generator中获取下一个值,也可以通过隐式地调用next()来忽略一些值 num = next(fib) print num, # 1 1 2 3 5 8 13 21 34 55 89 144
generator以及yield最初的引入目的就是让产生值序列的代码更加简单。
利用yield自动冻结函数堆栈的特性,可以让两个函数协同执行,经典的Producer-Consumer问题使用协程方式示例代码如下:
def get_data(): """返回0到9之间的3个随机数,模拟异步操作""" return random.sample(range(10), 3) def consume(): """显示每次传入的整数列表的动态平均值""" running_sum = 0 data_items_seen = 0 while True: print('Waiting to consume') data = yield data_items_seen += len(data) running_sum += sum(data) print('Consumed, the running average is {}'.format(running_sum / float(data_items_seen))) def produce(consumer): """产生序列集合,传递给消费函数(consumer)""" while True: data = get_data() print('Produced {}'.format(data)) consumer.send(data) yield if __name__ == '__main__': consumer = consume() consumer.send(None) producer = produce(consumer) for _ in range(10): print('Producing...') next(producer)
这里send(None)相当于next(),consume虽然被调用后没有被执行,因为有yield表达式,因此使用next()让函数执行到第一个yield处。然后调用produce函数,使用next(producer)执行consumer.send(data)切换到consume函数执行,同时传递相关的data,而consume函数在yield处执行时,data就是produce传递过来的数据。
把上面的程序写得通俗易懂些,示例代码如下:
#!/usr/bin/env python import sys, time reload(sys) sys.setdefaultencoding('utf-8') """ 传统的生产者-消费者模型是一个线程写消息,一个线程取消息,通过锁机制控制队列和等待,但一不小心就可能死锁。 如果改用协程,生产者生产消息后,直接通过yield跳转到消费者开始执行,待消费者执行完毕后,切换回生产者继续生产,效率极高。 """ # 注意到consumer函数是一个generator(生成器): # 任何包含yield关键字的函数都会自动成为生成器(generator)对象 def consumer(): r = '' while True: # 3、consumer通过yield拿到消息,处理,又通过yield把结果传回; # yield指令具有return关键字的作用。然后函数的堆栈会自动冻结(freeze)在这一行。 # 当函数调用者的下一次利用next()或generator.send()或for-in来再次调用该函数时, # 就会从yield代码的下一行开始,继续执行,再返回下一次迭代结果。通过这种方式,迭代器可以实现无限序列和惰性求值。 n = yield r if not n: return print('[CONSUMER] ←← Consuming %s...' % n) time.sleep(1) r = '200 OK' def produce(c): # 1、首先调用c.next()启动生成器 c.next() n = 0 while n < 5: n = n + 1 print('[PRODUCER] →→ Producing %s...' % n) # 2、然后,一旦生产了东西,通过c.send(n)切换到consumer执行; cr = c.send(n) # 4、produce拿到consumer处理的结果,继续生产下一条消息; print('[PRODUCER] Consumer return: %s' % cr) # 5、produce决定不生产了,通过c.close()关闭consumer,整个过程结束。 c.close() if __name__=='__main__': # 6、整个流程无锁,由一个线程执行,produce和consumer协作完成任务,所以称为“协程”,而非线程的抢占式多任务。 c = consumer() produce(c)
程序执行结果如下:
# [PRODUCER] →→ Producing 1... # [CONSUMER] ←← Consuming 1... # [PRODUCER] Consumer return: 200 OK # [PRODUCER] →→ Producing 2... # [CONSUMER] ←← Consuming 2... # [PRODUCER] Consumer return: 200 OK # [PRODUCER] →→ Producing 3... # [CONSUMER] ←← Consuming 3... # [PRODUCER] Consumer return: 200 OK # [PRODUCER] →→ Producing 4... # [CONSUMER] ←← Consuming 4... # [PRODUCER] Consumer return: 200 OK # [PRODUCER] →→ Producing 5... # [CONSUMER] ←← Consuming 5... # [PRODUCER] Consumer return: 200 OK
注意到consumer函数是一个generator(生成器),把一个consumer传入produce后:
首先调用c.next()启动生成器
然后,一旦生产了东西,通过c.send(n)切换到consumer执行
consumer通过yield拿到消息,处理,又通过yield把结果传回
produce拿到consumer处理的结果,继续生产下一条消息
produce决定不生产了,通过c.close()关闭consumer,整个过程结束。
整个流程无锁,由一个线程执行,producer和consumer协作完成任务。
单线程、多线程以及多进程效率测试
这里分别定义CPU密集型函数count、IO密集型文件读写函数read和write、网络请求函数http_request示例代码如下所示:def count(x, y): # 使程序完成150万计算 c = 0 while c < 500000: c += 1 x += x y += y def write(): f = open("test.txt", "w") for x in range(500000): f.write("testwrite\n") f.close() def read(): f = open("test.txt", "r") lines = f.readlines() f.close() _head = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 \ (KHTML, like Gecko) Chrome/48.0.2564.116 Safari/537.36' } url = "http://www.qq.com" def http_request(): try: webPage = requests.get(url, headers=_head) html = webPage.text return {"context": html} except Exception as e: return {"error": e}
单线程执行IO密集操作、CPU密集操作所需时间、网络请求密集型
示例代码如下所示:# CPU密集操作 t = time.time() for x in range(10): count(1, 1) print("Line cpu", time.time() - t) # IO密集操作 t = time.time() for x in range(10): write() read() print("Line IO", time.time() - t) # 网络请求密集型操作 t = time.time() for x in range(10): http_request() print("Line Http Request", time.time() - t)
多线程并发执行IO密集操作、CPU密集操作所需时间、网络请求密集型操作
示例代码如下所示:# CPU密集操作 counts = [] t = time.time() for x in range(10): thread = Thread(target=count, args=(1,1)) counts.append(thread) thread.start() e = counts.__len__() while True: for th in counts: if not th.is_alive(): e -= 1 if e <= 0: break print("Multi Thread CPU", time.time() - t) # IO密集操作 def io(): write() read() ios = [] t = time.time() for x in range(10): thread = Thread(target=io) ios.append(thread) thread.start() e = ios.__len__() while True: for th in ios: if not th.is_alive(): e -= 1 if e <= 0: break print("Multi Thread IO", time.time() - t) # 网络请求密集型操作 t = time.time() ios = [] for x in range(10): thread = Thread(target=http_request) ios.append(thread) thread.start() e = ios.__len__() while True: for th in ios: if not th.is_alive(): e -= 1 if e <= 0: break print("Multi Thread Http Request", time.time() - t)
多进程并行执行IO密集操作、CPU密集操作所需时间、网络请求密集型操作
示例代码如下所示# CPU密集操作 counts = [] t = time.time() for x in range(10): process = Process(target=count, args=(1,1)) counts.append(process) process.start() e = counts.__len__() while True: for th in counts: if not th.is_alive(): e -= 1 if e <= 0: break print("Multi Process CPU", time.time() - t) # IO密集操作 def io(): write() read() ios = [] t = time.time() for x in range(10): process = Process(target=io) ios.append(process) process.start() e = ios.__len__() while True: for th in ios: if not th.is_alive(): e -= 1 if e <= 0: break print("Multi Process IO", time.time() - t) # 网络请求密集型操作 httprs = [] t = time.time() for x in range(10): process = Process(target=http_request) ios.append(process) process.start() e = httprs.__len__() while True: for th in httprs: if not th.is_alive(): e -= 1 if e <= 0: break print("Multi Process Http Request", time.time() - t)
测试结果
CPU密集型操作 | IO密集型操作 | 网络请求密集型操作 | |
---|---|---|---|
单线程 | 79.34481406211853 | 1.2682409286499023 | 0.7370738983154297 |
多线程 | 61.41498112678528 | 大于120 | 33.69541811943054 |
多进程 | 17.931535005569458 | 0.4827752113342285 | 0.028677940368652344 |
多线程在IO密集型的操作下明显比单线程更差,在CPU密集型的操作下明显地比单线程线性执行性能更差,网络请求也很差。主要可能是线程间切换占据了大部分时间,也许IO操作和网络请求操作的任务再繁重一些就能体现出优势
多进程无论是在CPU密集型还是IO密集型以及网络请求密集型(经常发生线程阻塞的操作)中,都能体现出性能的优势。但是需要占用更多的CPU资源。
这里之前单测时多线程对于CPU计算密集型是肯定要比单线程差的
单线程示例代码:
import os, time def my_counter(): i = 0 for _ in range(100000000): i = i + 1 return True if __name__ == '__main__': thread_array = {} start_time = time.time() for i in range(3): my_counter() end_time = time.time() print "Total time: ", str(end_time - start_time)
执行结果:
Total time: 21.1985809803
多线程示例代码:
from threading import Thread import os, time thread_cnt = 3 def my_counter(): i = 0 for _ in range(100000000): i = i + 1 return True if __name__ == '__main__': thread_array = {} start_time = time.time() for tid in range(thread_cnt): t = Thread(target=my_counter) t.start() thread_array[tid] = t for i in range(thread_cnt): thread_array[i].join() end_time = time.time() print "Total time: ", str(end_time - start_time)
执行结果:
Total time: 40.788864851
参考
https://docs.python.org/2/library/multiprocessing.htmlhttp://cenalulu.github.io/python/gil-in-python/
http://gold.xitu.io/entry/58218787da2f60005d11f2b5
https://my.oschina.net/leejun2005/blog/501448
http://python.jobbole.com/86822/
相关文章推荐
- python学习之路-11 多线程、多进程、协程
- Python 多进程/多线程 学习笔记
- python并发编程之多进程、多线程、异步和协程详解
- 我的python学习之路-22-多线程基础以及多进程
- 深入浅析python中的多进程、多线程、协程
- python 多线程、多进程、协程性能对比(以爬虫为例)
- Python之多进程、多线程、协程和分布式进程
- python多进程、多线程、协程向mysql插入10000条数据
- Python 多线程 多进程 协程 yield
- Python语言学习讲解十三:python之多进程和多线程
- 也说性能测试,顺便说python的多进程+多线程、协程
- Python 多进程 多线程 协程 I/O多路复用
- python多进程与多线程学习总结
- python3-----多进程、多线程、多协程
- Python多线程与多进程学习----概念
- python2 多线程和多进程、协程入门讲解
- python学习笔记(十三): 多线程多进程
- python并发编程之多进程、多线程、异步和协程详解
- python并发编程之多进程、多线程、异步和协程详解
- 深入浅析python中的多进程、多线程、协程