您的位置:首页 > 编程语言 > Python开发

Python 实现生产者消费者模式代码示例

2017-08-13 21:51 886 查看
以下代码隐去部分实现细节,仅为结构示例:

#!/usr/bin/python
# -*- coding: UTF-8 -*-

# author:liu
# time:2017.8.12

import MySQLdb,threading,os
import Queue
import subprocess
import re
import time

# Module-level mutex, created once at import time so that every thread in
# this module refers to the same lock object.
lock = threading.Lock()

# Worker (consumer) thread
class worker(threading.Thread):
    """Consumer thread that drains tasks from a shared queue.

    Each task taken from ``queue`` is handed to :meth:`_do_task`.  A task
    whose processing raises is put back on ``queue`` unchanged so it can be
    retried; a non-``None`` return value is forwarded to
    ``queue_collector``.  The thread ends when the queue is observed empty
    after a task, when no task arrives within ``timeout`` seconds, or after
    :meth:`stop` has been called.

    Args:
        queue: Queue holding the pending tasks (consumed by this thread).
        queue_collector: Queue that receives the results of finished tasks.
        timeout: Seconds to wait for a task before assuming there is no
            more work.  Defaults to 20.
    """

    def __init__(self, queue, queue_collector, timeout=20):
        threading.Thread.__init__(self)
        self.queue = queue
        self.queue_collector = queue_collector
        self.timeout = timeout
        self.thread_stop = False

    def _do_task(self, task):
        """Process one task and return its result (``None`` for no result).

        Placeholder for the real work; override it in a subclass.  Any
        exception raised here makes ``run`` re-queue the task.
        """
        time.sleep(3)
        return None

    def run(self):
        """Consume tasks until the queue is drained or the thread is stopped."""
        while not self.thread_stop:
            print("thread%d %s: waiting for tast" % (self.ident, self.name))
            try:
                # The queue does its own locking, so no external lock is
                # taken here: holding one across a blocking get() would
                # stall the other workers, and never releasing it on
                # Queue.Empty would deadlock them.
                task = self.queue.get(block=True, timeout=self.timeout)
            except Queue.Empty:
                print("Nothing to do!i will go home!")
                self.thread_stop = True
                break
            print(task)
            print("i am working")
            try:
                result = self._do_task(task)
                print("work finished!")
            except Exception:
                print('error')
                # Re-queue the task exactly as received; wrapping it in a
                # new list would change its shape for the next attempt.
                self.queue.put(task, block=True, timeout=None)
            else:
                if result is not None:
                    self.queue_collector.put(result)
            finally:
                # Mark the fetched item as handled only once the attempt is
                # over, so queue.join() reflects real progress.
                self.queue.task_done()
            cnt = self.queue.qsize()  # number of tasks still pending
            if cnt > 0:
                print("fuck!There are still %d tasks to do" % (cnt))
            else:
                break

    def stop(self):
        """Ask the thread to exit; checked at the top of each loop iteration."""
        self.thread_stop = True

if __name__ == '__main__':
    # Producer side: load the task rows from MySQL, queue them, run two
    # worker threads to completion, then dump the collected results to
    # result.txt.
    conn = MySQLdb.connect(host='localhost', user='xxx', passwd='xxx',
                           db='xxx', charset='utf8')
    try:
        cursor = conn.cursor()
        sql = 'xxxxxxxxxxxxxxxxx'
        try:
            cursor.execute(sql)
            data = cursor.fetchall()  # tuple of rows
        except MySQLdb.Error:
            # Without the rows there is nothing to queue: report and
            # propagate instead of failing later on an undefined `data`.
            print("Error: unable to fecth data")
            raise
        finally:
            cursor.close()

        # Both queues are sized to the number of fetched rows; each task is
        # a one-element list wrapping a row.
        q = Queue.Queue(len(data))
        q_res = Queue.Queue(len(data))
        for task in data:
            q.put([task], block=True, timeout=None)

        # Create the worker threads and wait for both to finish.
        worker1 = worker(q, q_res)
        worker2 = worker(q, q_res)
        for thread in (worker1, worker2):
            thread.start()
        for thread in (worker1, worker2):
            thread.join()

        # Write every collected result on its own line.
        with open('result.txt', 'w') as result_file:
            while not q_res.empty():
                tmp = q_res.get()
                print(tmp)
                result_file.write(tmp + '\n')
    finally:
        # Close the MySQL connection even when something above failed.
        conn.close()
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: