您的位置:首页 > 编程语言 > Python开发

Python多任务(二)-----5种线程同步技术 Lock、RLock、Condition、Queue、Event详解

2019-02-15 11:32 323 查看

Python中线程同步技术

  • Python中的 threading 模块提供了多种用于线程同步的对象:

Lock 对象:

  • 互斥锁 Lock 对象 是比较低级的同步原语,当被锁定以后不属于特定的线程。
  • 一个锁有两种状态:locked 和 unlocked。 acquire()方法:上锁操作 如果锁处于 unlocked 状态,acquire()方法将其修改为 locked 状态并立即返回。
  • 如果锁处于 locked 状态,则堵塞当前线程并等待其他线程释放解锁,然后将其修改为 locked 状态并立即返回。
  • release()方法:解锁操作
      release() 方法用来将锁的状态由 locked 修改为 unlocked 状态并立即返回。
    • 如果锁的状态本身是 unlocked 状态,调用该方法会抛出异常
    # 导入threading模块
    import threading
    
    # 创建互斥锁
    mutex = threading.Lock()
    
    # 锁定操作(调用acquire方法)
    mutex.acquire()
    
    # 释放解锁(调用release方法)
    mutex.release()
  • 使用示例:
    import time
    import threading
    
    def add1():
    global g_num
    for i in range(1000000):
    # 上锁操作
    # 如果之前没有被上锁,那么此时上锁成功
    # 如果之前已被上锁,那么此时发生堵塞
    mutex.acquire()
    g_num +=1
    # 解锁操作
    mutex.release()
    print("---add1---%s", str(g_num))
    
    def add2(num):
    global g_num
    for i in range(num):
    mutex.acquire()
    g_num +=1
    mutex.release()
    print("---add2---%s", str(g_num))
    
    g_num = 0
    # 创建锁(即创建Lock类的对象)
    mutex = threading.Lock()
    
    def main():
    t1 = threading.Thread(target=add1)
    # 注意 使用了 args 参数
    t2 = threading.Thread(target=add2, args=(1000000,))
    t1.start()
    t2.start()
    time.sleep(2)
    print("---main---%s", str(g_num))
    
    if __name__ == "__main__":
    main()
  • 锁的使用,有两种方法:
      一种是通过Lock的acquire()和release()函数来控制加锁和解锁。
    • 另一种是用with操作来自动获取和释放锁。
    with mutex:
    #with表示自动打开自动释放锁
    for i in range(1000000): #锁定期间,其他人不可以干活
    num+=1
    
    #上面的和下面的是等价的
    
    if mutex.acquire(1):
    #锁住成功继续干活,没有锁住成功就一直等待,1代表独占
    for i in range(1000000): #锁定期间,其他线程不可以干活
    num+=1
    mutex.release() #释放锁

    RLock 对象:

    • 可重入锁(递归锁) RLock 对象也是一种常用的线程同步原语,可被一个线程acquire()多次,即上锁多次,且不会堵塞避免发生死锁现象。**
    • 注意:如果使用RLock,那么acquire和release必须成对出现,即调用了n次acquire,必须调用n次的release才能真正释放所占用的锁。
    • 与 Lock互斥锁的区别: 当处于 locked 状态时,某线程拥有该锁
    • 当处于 unlocked 状态时,该锁不属于任何线程
    • RLock 对象的acquire()/release()调用对可以嵌套,仅当最后一个或者最外层的 release()执行结束后,锁才会被设置为 unlocked 状态。
    import threading
    lock = threading.Lock() #Lock对象
    lock.acquire()
    lock.acquire()  #产生了死琐。
    lock.release()
    lock.release()
    
    # Lock 和 RLock 两者之间的区别
    
    import threading
    rLock = threading.RLock()  #RLock对象
    rLock.acquire()
    rLock.acquire() #在同一线程内,程序不会堵塞。
    rLock.release()
    rLock.release()

    Condition 对象:

    • 使用Condition 对象可以在某些事件触发后才处理数据,可以用于不同线程之间的通信或通知,以实现更高级别的同步。
    • Condition 对象处理具有 acquire()方法 和 release()方法 之外还有 wait()方法、notify方法 和 notify_all()等方法: wait(timeout=None)方法: 线程挂起,等待被唤醒(其他线程的notify方法)或者发生超时。调用该方法的线程必须先获得锁,否则引发RuntimeError异常。
    • 该方法会释放底层锁,然后阻塞,直到它被另一个线程中的相同条件变量的notify()或notify_all()方法唤醒,或者发生超时。一旦被唤醒或超时,它会重新获取锁并返回,返回值为True。
    • 如果给定timeout并发生超时,则返回False
  • notify(n=1)方法:
      默认情况下,唤醒等待此条件变量的一个线程(如果有)。调用该方法的线程必须先获得锁,否则引发RuntimeError。
    • 该方法最多唤醒n个等待中的线程,如果没有线程在等待,它就是要给无动作的操作
    • 注意:要被唤醒的线程实际上不会马上从wait()方法返回(唤醒),而是等到它重新获取锁。这是因为notify()并不会释放锁,需要线程本身来释放(通过wait()或者release())
  • notify_all()方法:
      此方法类似于notify(),但唤醒的是所有等待的线程
  • 使用示例:
    import threading
    from threading import Condition
    # condition
    
    class XiaoMing(threading.Thread):
    def __init__(self, cond):
    self.cond = cond
    super().__init__(name="xiaoming")
    
    def run(self):
    # 上锁
    self.cond.acquire()
    # 线程挂起,等待被唤醒
    # 等待-释放锁;被唤醒-获取锁
    self.cond.wait()
    print('{}:ennn. '.format(self.name))
    # 唤醒等待此条件变量的一个线程
    self.cond.notify()
    # 线程挂起,等待被唤醒
    # 等待-释放锁;被唤醒-获取锁
    self.cond.wait()
    
    print('{}:好嗒. '.forma
    4000
    t(self.name))
    # 释放解锁
    self.cond.release()
    
    class Teacher(threading.Thread):
    def __init__(self, cond):
    super().__init__(name="teacher")
    self.cond = cond
    
    def run(self):
    # 上锁
    self.cond.acquire()
    print('{}:hello ~ xiaoming. '.format(self.name))
    # 唤醒等待此条件变量的一个线程
    self.cond.notify()
    # 线程挂起,等待被唤醒
    # 等待-释放锁;被唤醒-获取锁
    self.cond.wait()
    
    print('{}:我们来念一首诗吧! . '.format(self.name))
    # 唤醒等待此条件变量的一个线程
    self.cond.notify()
    # 释放解锁
    self.cond.release()
    
    if __name__ == '__main__':
    condition = Condition()
    xiaoming = XiaoMing(condition)
    teacher = Teacher(condition)
    # 启动顺序很重要
    xiaoming.start()
    teacher.start()
      打印结果:
    teacher:hello ~ xiaoming.
    xiaoming:ennn.
    teacher:我们来念一首诗吧! .
    xiaoming:好嗒
  • 生产者与消费者 – Condition版
  • import threading
    import time
    
    num = 0
    
    class Producer(threading.Thread):
    def __init__(self, cont):
    self.cont = cont
    super().__init__(name="producer")
    
    def run(self):
    global num
    self.cont.acquire()
    while True:
    num += 1
    print("生产了一个,已经生产了%s个" % str(num))
    if num >= 5:
    print("生产已经达到五个不在生产。")
    # 唤醒消费者
    self.cont.notify()
    # 线程挂起,等待被唤醒
    self.cont.wait()
    # 释放解锁
    self.cont.release()
    
    class Customer(threading.Thread):
    def __init__(self, cont, name):
    self.cont = cont
    self.money = 3
    super().__init__(name=name)
    
    def run(self):
    global num
    while self.money > 0:
    self.cont.acquire()
    if num <= 0:
    print("没货了,%s通知生产者" % str(threading.current_thread().name))
    self.cont.notify()
    self.cont.wait()
    self.money -= 1
    num -= 1
    print(str(threading.current_thread().name) + "消费了一个,产品剩余" +
    str(num) + "个,剩余" + str(self.money) + "块钱")
    self.cont.release()
    time.sleep(1)
    print("%s没钱了" % str(threading.current_thread().name))
    
    def main():
    cont = threading.Condition()
    p = Producer(cont)
    c1 = Customer(cont, "customer1")
    c2 = Customer(cont, "customer2")
    p.start()
    c1.start()
    c2.start()
    c1.join()
    c2.join()
    
    if __name__ == "__main__":
    main()
    • 运行结果:
    生产了一个,已经生产了1个
    生产了一个,已经生产了2个
    生产了一个,已经生产了3个
    生产了一个,已经生产了4个
    生产了一个,已经生产了5个
    生产已经达到五个不在生产。
    customer1消费了一个,产品剩余4个,剩余2块钱
    customer2消费了一个,产品剩余3个,剩余2块钱
    customer1消费了一个,产品剩余2个,剩余1块钱
    customer2消费了一个,产品剩余1个,剩余1块钱
    customer1消费了一个,产品剩余0个,剩余0块钱
    没货了,customer2通知生产者
    生产了一个,已经生产了1个
    生产了一个,已经生产了2个
    生产了一个,已经生产了3个
    生产了一个,已经生产了4个
    生产了一个,已经生产了5个
    生产已经达到五个不在生产。
    customer2消费了一个,产品剩余4个,剩余0块钱
    customer1没钱了
    customer2没钱了

    queue 对象:

    Event 对象:

    • **Event 对象一种简单的线程通信技术,一个线程设置Event对象,另一个线程等待Event对象。
    • 事件对象管理一个内部标志,通过set()方法将其设置为True,并使用clear()方法将其设置为False。wait()方法阻塞,直到标志为True。该标志初始为False
    • Event 对象中的方法如下: isSet()方法: 当且仅当内部标志为True时返回True
  • set()方法:
      将内部标志设置为True。所有等待它成为True的线程都被唤醒。当标志保持在True的状态时,线程调用wait()是不会阻塞的。
  • clear()方法:
      将内部标志重置为False。随后,调用wait()的线程将阻塞,直到另一个线程调用set()将内部标志重新设置为True。
  • wait(timeout=None)方法:
      阻塞直到内部标志为真。如果内部标志在wait()方法调用时为True,则立即返回。否则,则阻塞,直到另一个线程调用set()将标志设置为True,或发生超时。
    • 该方法总是返回True,除非设置了timeout并发生超时。
    import threading
    
    class myThread(threading.Thread):
    def __init__(self, thread_name):
    super().__init__(name=thread_name)
    
    def run(self):
    global myevent
    if myevent.isSet():
    myevent.clear()
    myevent.wait()
    print('a' + str(threading.current_thread().name))
    else:
    print('b' + str(threading.current_thread().name))
    myevent.set()
    
    myevent = threading.Event()
    myevent.set()
    ts = []
    
    for i in range(10):
    t = myThread(str(i))
    ts.append(t)
    
    for i in ts:
    i.start()
  • 生产者与消费者–Event版
  • import threading
    import queue
    import time
    
    goods = queue.Queue(5)
    num = 0
    
    class Producer(threading.Thread):
    def __init__(self, event):
    self.event = event
    super().__init__(name="producer")
    
    def run(self):
    global num
    while True:
    if goods.empty():
    self.event.clear()
    time.sleep(2)
    for i in range(5):
    num += 1
    goods.put(str(num))
    print("生产了商品:%s" % str(num))
    self.event.set()
    
    class Customer(threading.Thread):
    def __init__(self, event, name):
    self.event = event
    self.money = 3
    super().__init__(name=name)
    
    def run(self):
    while self.money:
    self.event.wait()
    self.money -= 1
    print(self.getName() + "买了个:" + goods.get())
    time.sleep(1)
    print(self.getName() + "没钱了,结束。")
    
    def main():
    event = threading.Event()
    event.set()
    
    p = Producer(event)
    c1 = Customer(event, "c1")
    c2 = Customer(event, "c2")
    p.start()
    c1.start()
    c2.start()
    c1.join()
    c2.join()
    
    if __name__ == "__main__":
    main()
    内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
    标签: