您的位置:首页 > 编程语言 > Python开发

python中线程的使用以及生产者和消费者的实现

2014-07-22 13:22 721 查看
Python用GIL( global Interpretor Lock)和队列模型来处理资源的抢占问题,Python解释器不是线程安全的,需要持有这个锁,才可以安全访问python对象,因此,python不能很好的利用多CPU资源。

具体可以参考:
http://python.jobbole.com/81822/  Python的GIL是什么鬼

上一篇文章中讲了进程了,那么为什么还需要多线程呢,由于进程开销大,通信麻烦,所以需要多线程,多线程是在单独的进程中并发的执行任务。

线程状态:就绪 运行 休眠 中止

thread模块

使用start_new_thread(func,(args1,args2...))

start_new_thread此方法接收一个函数作为参数,并启动一个线程。 已经不推荐使用thread模块了。

threading模块

使用方法,继承threading.Thread 重写run

import threading,time
class ThreadDemo(threading.Thread):
def __init__(self,index,create_time):
threading.Thread.__init__(self)
self.index = index
self.create_time = create_time
def run(self):
time.sleep(1)
print (time.time()-self.create_time),"\t",self.index
print "Thread %d exit" % (self.index)

for x in range(5):
t = ThreadDemo(x,time.time())
t.start()


常用方法:

start run join setDaemon isDaemon 注意start 和run 的区别

join注意两点:

若不设置timeout,则join会一直阻塞至线程终止。
线程不能在自己的运行代码中join,否则会死锁

threading.local()

不同线程中保存为不同的值。同java中的threadlocal
import threading,time,random
class ThreadLocal(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.local = threading.local()
def run(self):
time.sleep(random.random())
self.local.numbers = []
for i in range(10):
self.local.numbers.append(random.choice(range(10)))
print threading.currentThread,self.local.numbers

for x in range(5):
t = ThreadLocal()
t.start()

线程同步

临界资源

临界资源,其实就是被多线程都会访问到的那段代码。关于临界资源的访问,多线程访问临界资源会导致非预期结果。例如,一个计数器的increase方法,调一次+1,若100个线程去调用,最后结果通常小于100,这事因为同时访问和修改导致的。
import threading
class Counter:
def __init__(self):
self.value = 0
def increase(self):
self.value += 1
return self.value
counter = Counter()
class ThreadDemo(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def run(self):
print counter.increase()

for x in range(100):
t = ThreadDemo()
t.start()
结果基本都小于100,需要使用一些措施来避免出现这种问题。

thread锁机制

使用thread.allocate_lock()来分配锁,然后acquire获取锁,执行完成后release释放,代码如下
import threading,thread
class Counter:
def __init__(self):
self.value = 0
self.lock = thread.allocate_lock()
def increase(self):
self.lock.acquire()
self.value += 1
self.lock.release()
return self.value
counter = Counter()
class ThreadDemo(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def run(self):
print counter.increase()

for x in range(100):
t = ThreadDemo()
t.start()


条件变量

锁机制可以解决同步问题,但是遇到复杂的多线程问题就无能为力了。比如常见的生产者-消费者问题,线程间需要互相感知,这里就要用到条件变量。
当然,使用条件变量同样也实现了锁的功能,提供acquire 和release方法,还提供notify notifyall wait
生产者与消费者的例子
from threading import Thread,Condition,currentThread
import time,random

class Producer(Thread):
    def __init__(self,condition,goods):
        Thread.__init__(self)
        self.cond=condition
        self.goods=goods
       
    def run(self):
        while 1:
            self.cond.acquire()
            self.goods.append(chr(random.randint(97, 122)))
            print self.goods
            cond.notifyAll()
            cond.release()
            time.sleep(random.choice(range(3)))
            
class Consumer(Thread):
    def __init__(self,condition,goods):
        Thread.__init__(self)
        self.cond=condition
        self.goods=goods

    def run(self):
        while 1:
            cond.acquire()
            while not goods:
                print "has no good,wait..."
                cond.wait()
            print "consume:",goods.pop(0)
            print self.goods
            cond.release()
            time.sleep(random.choice(range(2)))

goods=[]
cond=Condition()
p = Producer(cond,goods)
c = Consumer(cond,goods)
p.start()
c.start()
这里假定池子没有上限,生产者可以一直生产,消费者必须池子里有东西才能消费,为空时候等待wait. 这里与JAVA思路相同,区别就是不需要自己写一个同步的池子,反正锁都是全局锁。

同步队列

Queue.Queue(maxsize=10) 
当设置size小于1  则size无限制。
方法为put()
get()
有block参数,默认1,当为空时候取,或者满的时候存,则会等待。若为0则会报错

# queue_example.py
from Queue import Queue
import threading
import random
import time

# Producer thread
class Producer(threading.Thread):
    def __init__(self, threadname, queue):
        threading.Thread.__init__(self, name = threadname)
        self.sharedata = queue
    def run(self):
        for i in range(20):
            print self.getName(),'adding',i,'to queue'
            self.sharedata.put(i)
            time.sleep(random.randrange(2))
        print self.getName(),'Finished'

# Consumer thread
class Consumer(threading.Thread):
    def __init__(self, threadname, queue):
        threading.Thread.__init__(self, name = threadname)
        self.sharedata = queue
    def run(self):
        for i in range(20):
            print self.getName(),'got a value:',self.sharedata.get()
            time.sleep(random.randrange(4))
        print self.getName(),'Finished'

# Main thread
def main():
    queue = Queue(2)
    producer = Producer('Producer', queue)
    consumer = Consumer('Consumer', queue)

    print 'Starting threads ...'
    producer.start()
    consumer.start()

    producer.join()
    consumer.join()

    print 'All threads have terminated.'

if __name__ == '__main__':
    main()
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: