Queue python线程间数据通讯的法宝---------stedy:非常重要
2012-04-25 22:02
337 查看
原文链接
今天学习了下Queue,本来是按照python核心编程第二版上的学的,但是那个上面介绍的及其的简单,我这样一个对进程本来就不熟悉的新手来说简直就是一个挑战,再三研究之后重要搞懂了。搞懂了就是爽啊,看起来,用起来都简单了呵呵。
Queue是标准模块所以可以直接import,用起来就像是个真正的队列,先进先出的原则,这样进程间就可以用这个数据了,丫丫的,这个可好了。还是那个经典的生产者和消费者关系开始吧,书上讲的不通俗,记得记得学MFC时候那个孙鑫就介绍的比较容易接受。好了,切入主题。
有一个生产线,一个消费线,那么我们要做的就是消费的不能比生产的要多。脚本如下:
stedy: 第一个例子的运行结果为
我们提取Queue的部分来看,首先是创建一个队列这是一个全局变量,当然是main内的’全局‘,这个变量给谁用呢?当然是给各个线程的函数用了,这里的数据就是能够共享的数据。
存入数据的函数是put简单吧,直接把数据给放入进去,取出的就是get也简单啊,不管你在函数的什么地方用都可以看到这两个函数那么我们开始担心的问题其实用这连个函数就可以搞定了,就是如果消费的数量多于生产的数量,put(value,block=0)如果我们将第二个参数设置不为0的数据,那么函数就等待到队列中有空间为止,那样线程不就挂起了吗,所以生产商就有时间生产了啊,同样如果get(block=0)函数如果参数不为0同样的等待队列有数据时候才退出。呵呵,这模样,队列这个东西很好用啊
官方文档如下:
Class Queue implements queue objects and has the methods described below. This class can be derived from in order to implement other queue organizations (e.g. stack) but the inheritable interface
is not described here. See the source code for details. The public methods are:
Return the approximate size of the queue. Because of multithreading semantics, this number is not reliable.
Return
Return
Put item into the queue. If optional args block is true andtimeout is None (the default), block if necessary until a free slot is available. Iftimeout is a positive number, it blocks at mosttimeout
seconds and raises theFull exception if no free slot was available within that time. Otherwise (block is false), put an item on the queue if a free slot is immediately available,
else raise theFull exception (timeout is ignored in that case).
New in version 2.3: the timeout parameter.
Equivalent to
Remove and return an item from the queue. If optional args block is true andtimeout is None (the default), block if necessary until an item is available. Iftimeout is a positive number, it blocks at mosttimeout
seconds and raises theEmpty exception if no item was available within that time. Otherwise (block is false), return an item if one is immediately available, else raise theEmpty
exception (timeout is ignored in that case).
New in version 2.3: the timeout parameter.
Equivalent to
Two methods are offered to support tracking whether enqueued tasks have been fully processed by daemon consumer threads.
Indicate that a formerly enqueued task is complete. Used by queue consumer threads. For eachget() used to fetch a task, a subsequent call totask_done()
tells the queue that the processing on the task is complete.
If a join() is currently blocking, it will resume when all items have been processed (meaning that atask_done() call was received
for every item that had beenput() into the queue).
Raises a ValueError if called more times than there were items placed in the queue.New in version 2.5.
Blocks until all items in the queue have been gotten and processed.
The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer thread callstask_done() to indicate that the item was retrieved and all
work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.New in version 2.5.
Example of how to wait for enqueued tasks to be completed:
今天学习了下Queue,本来是按照python核心编程第二版上的学的,但是那个上面介绍的及其的简单,我这样一个对进程本来就不熟悉的新手来说简直就是一个挑战,再三研究之后重要搞懂了。搞懂了就是爽啊,看起来,用起来都简单了呵呵。
Queue是标准模块所以可以直接import,用起来就像是个真正的队列,先进先出的原则,这样进程间就可以用这个数据了,丫丫的,这个可好了。还是那个经典的生产者和消费者关系开始吧,书上讲的不通俗,记得记得学MFC时候那个孙鑫就介绍的比较容易接受。好了,切入主题。
有一个生产线,一个消费线,那么我们要做的就是消费的不能比生产的要多。脚本如下:
#!/usr/bin/env python ''' Created on Apr 25, 2012 @author: stedy ''' #coding=cp936 from random import randint from time import sleep,ctime from Queue import Queue import threading class MyThread(threading.Thread): def __init__(self,func,args,name=""): threading.Thread.__init__(self) self.name = name self.func = func self.args = args def getResult(self): return self.res def run(self): print 'stating',self.name,'at',ctime() self.func(*self.args) def writeQ(queue): i = randint(1,100) print 'producing object for Q...',queue.put(i,1) def readQ(queue): if queue.empty() == False: val = queue.get(1) print 'value from Q',val else: print "no value,wait a moment" sleep(1) def writer(queue,loops): for i in range(loops): writeQ(queue) sleep(randint(1,3)) def reader(queue,loops): for i in range(loops): readQ(queue) sleep(randint(1,3)) funcs = [reader,writer] nfuncs = range(len(funcs)) def main(): nloops = 99 q = Queue(100) threads = [] for i in nfuncs: t = MyThread(funcs[i],(q,nloops),funcs[i].__name__) threads.append(t) for i in nfuncs: threads[i].start() for i in nfuncs: threads[i].join() print 'all Done' if __name__ == '__main__': main()
stedy: 第一个例子的运行结果为
stating reader at Wed Apr 25 21:55:03 2012 no value,wait a moment stating writer at Wed Apr 25 21:55:03 2012 producing object for Q... None value from Q 65 producing object for Q... None value from Q 94 producing object for Q... None producing object for Q... None value from Q 80 producing object for Q... None value from Q 6 producing object for Q... None value from Q 17 producing object for Q... None producing object for Q... None value from Q 11 producing object for Q... None value from Q 33 producing object for Q... None value from Q 46 producing object for Q... None producing object for Q... None producing object for Q... None value from Q 14 value from Q 13 producing object for Q... None value from Q 47 producing object for Q... None value from Q 50 producing object for Q... None value from Q 54 producing object for Q... None producing object for Q... None value from Q 17 producing object for Q... None producing object for Q... None value from Q 44 value from Qproducing object for Q... None 26 producing object for Q... None value from Q 77 producing object for Q... None value from Q 38 producing object for Q... None value from Q 14 value from Q 29 producing object for Q... None value from Q 58 producing object for Q... None value from Q 71 producing object for Q... None value from Q 47 producing object for Q... None value from Q 70 producing object for Q... None value from Q 69 producing object for Q...value from Q 20 None producing object for Q... None value from Q 22 producing object for Q... None value from Q 96 producing object for Q... None producing object for Q...value from Q 52None value from Q 63 producing object for Q... None value from Q 79 producing object for Q... None value from Q 99 value from Q 6 producing object for Q... None value from Q 36 producing object for Q... None value from Q 64 producing object for Q... None value from Q 26 producing object for Q... None value from Q 37 producing object for Q... None value from Q 13 producing object for Q... None value from Q 41 producing object for Q... None value from Q 20 producing object for Q... None value from Q 33 value from Q 26 producing object for Q... None value from Q 21 producing object for Q... None value from Q 50 value from Q 33 producing object for Q... None value from Q 45 producing object for Q... None value from Q 31 producing object for Q... None producing object for Q... None value from Q 39 producing object for Q... None value from Q 90 producing object for Q... None producing object for Q... None value from Q 78 producing object for Q... None producing object for Q... None value from Q 84 producing object for Q... None producing object for Q... None value from Q 44 producing object for Q... None value from Q 78 value from Q 17 producing object for Q... None value from Q 7 value from Q 7 producing object for Q... None value from Q 41 value from Q 71 producing object for Q... None value from Q 57 producing object for Q... None value from Q 65 producing object for Q... None value from Q 12 value from Q 1 producing object for Q... None producing object for Q... None value from Q 97 value from Q 65 producing object for Q... None value from Q 48 producing object for Q... None producing object for Q... None value from Q 1 producing object for Q... None value from Q 58 value from Q 62 producing object for Q... None value from Q 91 producing object for Q... None value from Q 87 producing object for Q... None value from Q 92 producing object for Q... None value from Q 23 value from Q 15 producing object for Q... None producing object for Q... None value from Q 77 producing object for Q... None producing object for Q... None value from Q 3 value from Q 33 producing object for Q... None producing object for Q... None value from Q 87 producing object for Q... None value from Q 94 producing object for Q... None producing object for Q... None value from Q 89 producing object for Q... None value from Q 48 producing object for Q... None producing object for Q... None value from Q 65 producing object for Q... None value from Q 55 producing object for Q... None producing object for Q... None value from Q 28 producing object for Q... None producing object for Q... None value from Q 61 producing object for Q... None value from Q 47 producing object for Q... None value from Q 16 producing object for Q... None value from Q 77 producing object for Q... None value from Q 86 producing object for Q... None value from Q 39 value from Q 20 producing object for Q... None value from Q 63 value from Q 13 producing object for Q... None producing object for Q... None value from Q 57 producing object for Q... None value from Q 12 value from Q 9 value from Q 42 value from Q 70 value from Q 65 all Done
我们提取Queue的部分来看,首先是创建一个队列这是一个全局变量,当然是main内的’全局‘,这个变量给谁用呢?当然是给各个线程的函数用了,这里的数据就是能够共享的数据。
存入数据的函数是put简单吧,直接把数据给放入进去,取出的就是get也简单啊,不管你在函数的什么地方用都可以看到这两个函数那么我们开始担心的问题其实用这连个函数就可以搞定了,就是如果消费的数量多于生产的数量,put(value,block=0)如果我们将第二个参数设置不为0的数据,那么函数就等待到队列中有空间为止,那样线程不就挂起了吗,所以生产商就有时间生产了啊,同样如果get(block=0)函数如果参数不为0同样的等待队列有数据时候才退出。呵呵,这模样,队列这个东西很好用啊
官方文档如下:
Class Queue implements queue objects and has the methods described below. This class can be derived from in order to implement other queue organizations (e.g. stack) but the inheritable interface
is not described here. See the source code for details. The public methods are:
qsize( | ) |
empty( | ) |
Trueif the queue is empty,
Falseotherwise. Because of multithreading semantics, this is not reliable.
full( | ) |
Trueif the queue is full,
Falseotherwise. Because of multithreading semantics, this is not reliable.
put( | item[, block[, timeout]]) |
seconds and raises theFull exception if no free slot was available within that time. Otherwise (block is false), put an item on the queue if a free slot is immediately available,
else raise theFull exception (timeout is ignored in that case).
New in version 2.3: the timeout parameter.
put_nowait( | item) |
put(item, False).
get( | [block[, timeout]]) |
seconds and raises theEmpty exception if no item was available within that time. Otherwise (block is false), return an item if one is immediately available, else raise theEmpty
exception (timeout is ignored in that case).
New in version 2.3: the timeout parameter.
get_nowait( | ) |
get(False).
Two methods are offered to support tracking whether enqueued tasks have been fully processed by daemon consumer threads.
task_done( | ) |
tells the queue that the processing on the task is complete.
If a join() is currently blocking, it will resume when all items have been processed (meaning that atask_done() call was received
for every item that had beenput() into the queue).
Raises a ValueError if called more times than there were items placed in the queue.New in version 2.5.
join( | ) |
The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer thread callstask_done() to indicate that the item was retrieved and all
work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.New in version 2.5.
Example of how to wait for enqueued tasks to be completed:
def worker(): while True: item = q.get() do_work(item) q.task_done() q = Queue() for i in range(num_worker_threads): t = Thread(target=worker) t.setDaemon(True) t.start() for item in source(): q.put(item) q.join() # block until all tasks are done
相关文章推荐
- 使用 Python 进行线程编程------ stedy:非常重要
- 分享非常有用的Java程序(关键代码)(八)---Java InputStream读取网络响应Response数据的方法!(重要)
- python基础-进线程下的queue、及其生产者消费者模型(2种方式)
- python队列、线程间通信Queue,多进程模块multiprocessing
- 基本数据类型的使用【非常重要】
- delphi数据类型 非常重要!!!
- Python中的线程使用 和 queue
- 线程共享的环境包括:进程代码段、进程的公有数据(利用这些共享的数据,线程很容易的实现相互之间的通讯)、进程打开的文件描述符、信号的处理器、进程的当前目录和进程用户ID与进程组ID。 进程拥有这
- 字符串处理是许多程序中非常重要的一部分,它们可以用于文本显示,数据表示,查找键和很多目的.在Unix下,用户可以使用正则表达式的强健功能实现这些 目的,从Java1.4起,Java核心API就引入了java.util.regex程序包,它是一种有价值的基础
- 非常重要的尝试--使用queue的多线程
- Python线程优先级队列(Queue)
- 主线程和子线程的通讯之主线程用bundle发送数据到子线程
- PYTHON线程知识再研习F---队列同步Queue
- 这篇文章主要介绍了python中enumerate的用法,对Python初学者而言是非常重要的概念
- 数据恢复/电子取证 非常有用的python库——Construct
- python中的进程、线程(threading、multiprocessing、Queue、subprocess)
- Hibernate懒加载导致json数据对象传输异常的问题---(非常重要)
- IT++数学、信号、通讯类库,Blitz++数学,Armadillo 线性代数,Dlib网络,线程,图形,数学,图像,数据挖掘/机器学习,XML等等
- python使用threading.Thread和Queue通过urllib2.urlopen抓取数据
- Python并行编程(九):线程通讯queue