您的位置:首页 > 编程语言 > Python开发

[Python标准库]Queue——线程安全的 FIFO 实现

2015-12-25 00:00 591 查看
[Python标准库]Queue——线程安全的 FIFO 实现

作用:提供一个线程安全的 FIFO 实现。

Python 版本:至少 1.4

Queue 模块提供一个适用于多线程编程的先进先出(first-in,first-out,FIFO)数据结构,可以用来在生产者消费者线程之间安全地传递消息或其他数据。它会为调用者处理锁定,使用多个线程可以安全地处理同一个 Queue 实例。Queue 的大小(其中包含的元素个数)可能要受限,以限制内存使用或处理。

基本 FIFO 队列

Queue 类实现了一个基本的先进先出容器。使用 put() 将元素增加到序列一段,使用 get() 从另一端删除。
import Queue

q = Queue.Queue()

for i in range(5):
q.put(i)

while not q.empty():
print q.get(),
print

这个例子使用了一个线程,来展示按元素的插入顺序从队列删除元素。

LIFO 队列

与 Queue 的标准 FIFO 实现相反,LifoQueue 使用了后进先出(last-in,first-out,LIFO)顺序(通常与栈数据结构关联)。
import Queue

q = Queue.LifoQueue()

for i in range(5):
q.put(i)

while not q.empty():
print q.get(),
print

get 将删除最近使用 put 插入到队列的元素。

优先队列

有些情况下,队列中元素的处理顺序需要根据这些元素的特性来决定,而不只是在队列中创建或插入的顺序。例如,财务部门的打印作业可能要优先于一个开发人员打印的代码清单。PriorityQueue 使用队列内容的有序顺序来决定获取哪一个元素。
import Queue
import threading

class Job(object):
def __init__(self, priority, description):
self.priority = priority
self.description = description
print 'New job:', description
return
def __cmp__(self, other):
return cmp(self.priority, other.priority)

q = Queue.PriorityQueue()

q.put( Job(3, 'Mid-level job') )
q.put( Job(10, 'Low-level job') )
q.put( Job(1, 'Important job') )

def process_job(q):
while True:
next_job = q.get()
print 'Processing job:', next_job.description
q.task_done()

workers = [ threading.Thread(target=process_job, args=(q,)),
threading.Thread(target=process_job, args=(q,)),
]

for w in workers:
w.setDaemon(True)
w.start()

q.join()

这个例子有多个线程在处理作业,要根据调用 get() 时队列中元素的优先级来处理。运行消费者线程时,增加到队列中的元素的处理顺序取决于线程上下文切换。

构建一个多线程博客客户程序

接下来的例子将构建一个博客客户程序,程序的源代码展示了如何用多个线程使用 Queue 类。这个程序要读入一个或多个 RSS 提要,对专辑排队来显示最新的 5 集以供下载,并使用线程并行地处理多个下载。这里没有提供足够的错误处理,所有不能在实际生产环境中使用,不过这个骨架实现可以作为一个很好的例子来说明如果使用 Queue 模块。

首先要建立一些操作参数。正常情况下,这些参数来自用户输入(首选项、数据库,等等)。不过在这个例子中,线程数和要获取的 URL 列表都使用了硬编码值。
from Queue import Queue
from threading import Thread
import time
import urllib
import urlparse
import feedparser

# Set up some global variables
num_fetch_threads = 2
enclosure_queue = Queue()

# A real app wouldn't use hard-coded data...
feed_urls = [ 'http://advocacy.python.org/podcasts/littlebit.rss',
]

函数 downloadEnclosures() 在工作线程中运行,使用 urllib 来处理下载。
def downloadEnclosures(i, q):
"""This is the worker thread function.
It processes items in the queue one after
another. These daemon threads go into an
infinite loop, and only exit when
the main thread ends.
"""

while True:
print '%s: Looking for the next enclosure' % i
url = q.get()
parsed_url = urlparse.urlparse(url)
print '%s: Downloading:' % i, parsed_url.path
response = urllib.urlopen(url)
data = response.read()
# Save the downloaded file to the current directory
outfile_name = url.rpartition('/')[-1]
with open(outfile_name, 'wb') as outfile:
outfile.write(data)
q.task_done()

一旦定义了线程的目标函数,接下来可以启动工作线程。downloadEnclosures() 处理语句 url = q.get() 时,会阻塞并等待,直到队列返回某个结果。这说明,即使队列中没有任何内容,也可以安全地启动线程。
# Set up some threads to fetch the enclosures
for i in range(num_fetch_threads):
worker = Thread(target=downloadEnclosures,
args=(i, enclosure_queue,))
worker.setDaemon(True)
worker.start()

下一步使用 Mark Pilgrim 的 feedparser 模块(www.feedparser.org)获取提要内容,并将这些专辑的 URL 入队。一旦第一个 URL 增加到队列,就会有某个工作线程提取出这个 URL,开始下载。这个循环会继续增加元素,直到提要全部利用,工作线程会以此将 URL 出队来下载这些提要。
# Download the feed(s) and put the enclosure URLs into
# the queue.
for url in feed_urls:
response = feedparser.parse(url, agent='fetch_podcasts.py')
for entry in response['entries'][-5:]:
print entry.get('enclosure', [])
for enclosure in entry.get('enclosure', []):
parsed_url = urlparse.urlparse(enclosure['url'])
print 'Queuing:', parsed_url.path
enclosure_queue.put(enclosure['url'])

还有一件事要做,要使用 join() 再次等待队列腾空。
# Now wait for the queue to be empty, indicating that we have
# processed all the downloads.
print '*** Main thread waiting'
enclosure_queue.join()
print '*** Done'

具体的输出取决于所使用的 RSS 提要的内容。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: