AbstractQueuedSynchronizer(AQS)源码解析
2018-02-02 17:30
615 查看
首先,我想先推荐看这篇博文AbstractQueuedSynchronizer(AQS)源码解析,我认为写得非常棒。
以下是我自己的一些理解。
CAS,简单来讲,就是将比较与交换这两个操作,通过机器指令,形成一个原子操作,整个concurrent包都基于CAS,可以说,没有CAS,就没有concurrent包。更详细的内容:Java中CAS详解
CLH队列锁,简单来讲,就是一个队列,里面的每个节点都是一个线程,一个线程想要获得锁,只依赖于他的前置线程的获得锁的情况。更详细的内容:JAVA并发编程学习笔记之CLH队列锁。AQS中的同步队列基于它并做了一些修改,是一个包含一个head,一个tail节点的双向队列。
独占模式:锁只能由一个线程获取
共享模式:锁可以由多个线程获取
个人认为,在AQS中,锁的语义,即state变量,对锁的操作,即是对state变量的操作。那么在多线程中,就要保证state变量的原子一致性。AQS为子类提供了3个方法,保证state变量的原子一致性。
步骤:
1.尝试直接获取锁,如果失败,将自己封装在Node中,放入同步队列队尾
2.在同步队列中的线程,获取锁的步骤
2.1 如果该节点是head的next节点,那么会去尝试再获取一次锁,
2.1.1如果失败,将自己阻塞,同时将前置节点(即head节点,head节点不可能为cancel状态)的waitstatus设置为signal(需要唤醒下一个节点的状态),直到被前置节点唤醒,唤醒之后继续尝试获取
2.1.2 如果成功,将head出队,自己晋升为head(head节点个人认为可以理解为拥有锁的节点),此时如果是共享模式下,它还会去唤醒后继节点,让其去尝试获取锁
2.2 如果该节点不是head的next节点,那么直接阻塞,同时将前置节点(如果前置节点为CANCEL状态,则将前置节点替换为上一个不为CANCEL的节点)的waitstatus设置为signal(需要唤醒下一个节点),
直到被前置节点唤醒,唤醒之后继续尝试获取
Q:线程何时被唤醒?
A: 1. 共享模式下,一个线程节点获取到锁,会唤醒它的直接后继线程。
2. 当队列中持有锁的节点释放锁的时候,会唤醒它的直接后继线程。
1. 修改同步状态,让后继线程能够获取到锁
2. 唤醒后继线程,同时将当前释放锁的这个节点的状态,改为0(无状态)
其实,AQS主要就做了3件事,
1. 管理同步状态
2. 管理同步队列
3. 管理线程的阻塞和唤醒
这里有必要先说一下PROPAGATE这个状态。当我们搜索代码时会发现,只有一个地方设置了这个状态
加入这个PROPAGATE状态主要是为了解决共享模式下,多个线程同时释放锁的时候,引起的一个BUG。以Semaphore为例,
我们假设某次循环中队列里排队的节点为情况为:
head -> t1的node -> t2的node(也就是tail)
信号量释放的顺序为t3先释放,t4后释放:
时刻1: t3调用releaseShared,调用了unparkSuccessor(h),head的等待状态从-1变为0
时刻2: t1由于t3释放了信号量,被t3唤醒,调用Semaphore.NonfairSync的tryAcquireShared,返回值为0
时刻3: t4调用releaseShared,读到此时h.waitStatus为0(此时读到的head和时刻1中为同一个head),不满足条件,因此不会调用unparkSuccessor(h)
时刻4: t1获取信号量成功,调用setHeadAndPropagate时,因为不满足propagate > 0(时刻2的返回值也就是propagate==0),从而不会唤醒后继节点。
那么引入PROPAGATE是怎么解决问题的呢?
引入之后,调用releaseShared方法不再简单粗暴地直接unparkSuccessor,而是将传播行为抽了一个doReleaseShared方法出来。
再看上面的那种情况:
时刻1:t3调用releaseShared -> doReleaseShared -> unparkSuccessor,完了之后head的等待状态为0
时刻2:t1由于t3释放了信号量,被t3唤醒,调用Semaphore.NonfairSync的tryAcquireShared,返回值为0
时刻3:t4调用releaseShared,读到此时h.waitStatus为0(此时读到的head和时刻1中为同一个head),将等待状态置为PROPAGATE
时刻4:t1获取信号量成功,调用setHeadAndPropagate时,可以读到h.waitStatus < 0,从而可以接下来调用doReleaseShared唤醒t2
也就是说,上面会产生线程hang住bug的case在引入PROPAGATE后可以被规避掉。在PROPAGATE引入之前,之所以可能会出现线程hang住的情况,就是在于
releaseShared有竞争的情况下,可能会有队列中处于等待状态的节点因为第一个线程完成释放唤醒,第二个线程获取到锁,但还没设置好head,又有新线程释放锁,但是读到老的head状态为0导致释放但不唤醒,最终后一个等待线程既没有被释放线程唤醒,也没有被持锁线程唤醒。
未完待续。。。
以下是我自己的一些理解。
前提
了解AbstractQueuedSynchronizer(AQS)之前,我们有必要了解下CAS(compare and swap)和CLH(Craig, Landin, and Hagersten)队列锁,这两个概念。CAS,简单来讲,就是将比较与交换这两个操作,通过机器指令,形成一个原子操作,整个concurrent包都基于CAS,可以说,没有CAS,就没有concurrent包。更详细的内容:Java中CAS详解
CLH队列锁,简单来讲,就是一个队列,里面的每个节点都是一个线程,一个线程想要获得锁,只依赖于他的前置线程的获得锁的情况。更详细的内容:JAVA并发编程学习笔记之CLH队列锁。AQS中的同步队列基于它并做了一些修改,是一个包含一个head,一个tail节点的双向队列。
1. AQS是什么?
1.1 简介
AQS是由Doug Lea编写的一个基于FIFO队列,用来构建同步锁、同步组件的基础框架,java.util.concurrent包中很多并发工具类都依赖于AQS,如ReentrantLock, Semaphore, CountDownLatch。1.2 用途
AQS能够用来简化并发工具的内部实现,它屏蔽了并发工具的同步状态管理,同步队列的维护,线程的阻塞和唤醒。它运用了模板设计模式,因此并发工具的开发者只需要继承AQS并实现特定的方法,将其组合到并发工具中,调用模板方法,来实现并发工具。1.3 特性
AQS提供了两种模式,独占模式和共享模式。独占模式:锁只能由一个线程获取
共享模式:锁可以由多个线程获取
个人认为,在AQS中,锁的语义,即state变量,对锁的操作,即是对state变量的操作。那么在多线程中,就要保证state变量的原子一致性。AQS为子类提供了3个方法,保证state变量的原子一致性。
protected final int getState(); |
protected final void setState(int newState); |
protected final boolean compareAndSetState(int expect, int update); |
2. 实现的简要说明
因为下面的步骤说明中引入了队列中node的waitstatus,因此首先我们来解释下node的waitstatus,即每个节点的等待状态值 | 描述 |
---|---|
CANCELLED (1) | 当前线程因为超时或者中断被取消。这是一个终结态,也就是状态到此为止。 |
SIGNAL (-1) | 当前线程的后继线程被阻塞或者即将被阻塞,当前线程释放锁或者取消后需要唤醒后继线程。这个状态一般都是后继线程来设置前驱节点的。 |
CONDITION (-2) | 当前线程在condition队列中。 |
PROPAGATE (-3) | 用于将唤醒后继线程传递下去,这个状态的引入是为了完善和增强共享锁的唤醒机制。在一个节点成为头节点之前,是不会跃迁为此状态的 |
0 | 表示无状态。 |
2.1 如何获取锁
AQS中维护了一个FIFO同步队列,当一个线程想要获取锁时,步骤:
1.尝试直接获取锁,如果失败,将自己封装在Node中,放入同步队列队尾
2.在同步队列中的线程,获取锁的步骤
2.1 如果该节点是head的next节点,那么会去尝试再获取一次锁,
2.1.1如果失败,将自己阻塞,同时将前置节点(即head节点,head节点不可能为cancel状态)的waitstatus设置为signal(需要唤醒下一个节点的状态),直到被前置节点唤醒,唤醒之后继续尝试获取
2.1.2 如果成功,将head出队,自己晋升为head(head节点个人认为可以理解为拥有锁的节点),此时如果是共享模式下,它还会去唤醒后继节点,让其去尝试获取锁
2.2 如果该节点不是head的next节点,那么直接阻塞,同时将前置节点(如果前置节点为CANCEL状态,则将前置节点替换为上一个不为CANCEL的节点)的waitstatus设置为signal(需要唤醒下一个节点),
直到被前置节点唤醒,唤醒之后继续尝试获取
Q:线程何时被唤醒?
A: 1. 共享模式下,一个线程节点获取到锁,会唤醒它的直接后继线程。
2. 当队列中持有锁的节点释放锁的时候,会唤醒它的直接后继线程。
2.2 如何释放锁
步骤:1. 修改同步状态,让后继线程能够获取到锁
2. 唤醒后继线程,同时将当前释放锁的这个节点的状态,改为0(无状态)
其实,AQS主要就做了3件事,
1. 管理同步状态
2. 管理同步队列
3. 管理线程的阻塞和唤醒
3. 代码解析
static final class Node { /** * 用于标记一个节点在共享模式下等待 */ static final Node SHARED = new Node(); /** * 用于标记一个节点在独占模式下等待 */ static final Node EXCLUSIVE = null; /** * 等待状态:取消 */ static final int CANCELLED = 1; /** * 等待状态:通知 */ static final int SIGNAL = -1; /** * 等待状态:条件等待 */ static final int CONDITION = -2; /** * 等待状态:传播 */ static final int PROPAGATE = -3; /** * 等待状态 */ volatile int waitStatus; /** * 前驱节点 */ volatile Node prev; /** * 后继节点 */ volatile Node next; /** * 节点对应的线程 */ volatile Thread thread; /** * 等待队列中的后继节点 */ Node nextWaiter; /** * 当前节点是否处于共享模式等待 */ final boolean isShared() { return nextWaiter == SHARED; } /* 4000 * * 获取前驱节点,如果为空的话抛出空指针异常 */ final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) { throw new NullPointerException(); } else { return p; } } Node() { } /** * addWaiter会调用此构造函数 */ Node(Thread thread, Node mode) { this.nextWaiter = mode; this.thread = thread; } /** * Condition会用到此构造函数 */ Node(Thread thread, int waitStatus) { this.waitStatus = waitStatus; this.thread = thread; } }
这里有必要先说一下PROPAGATE这个状态。当我们搜索代码时会发现,只有一个地方设置了这个状态
/** * 这是共享锁中的核心唤醒函数,主要做的事情就是唤醒下一个线程或者设置传播状态。 * 后继线程被唤醒后,会尝试获取共享锁,如果成功之后,则又会调用setHeadAndPropagate,将唤醒传播下去。 * 这个函数的作用是保障在acquire和release存在竞争的情况下,保证队列中处于等待状态的节点能够有办法被唤醒。 */ private void doReleaseShared() { /* * 以下的循环做的事情就是,在队列存在后继线程的情况下,唤醒后继线程; * 或者由于多线程同时释放共享锁由于处在中间过程,读到head节点等待状态为0的情况下, * 虽然不能unparkSuccessor,但为了保证唤醒能够正确稳固传递下去,设置节点状态为PROPAGATE。 * 这样的话获取锁的线程在执行setHeadAndPropagate时可以读到PROPAGATE,从而由获取锁的线程去释放后继等待线程。 */ for (;;) { Node h = head; // 如果队列中存在后继线程。 if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; unparkSuccessor(h); } // 如果h节点的状态为0,需要设置为PROPAGATE用以保证唤醒的传播。 else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; } // 检查h是否仍然是head,如果不是的话需要再进行循环。 if (h == head) break; } }
加入这个PROPAGATE状态主要是为了解决共享模式下,多个线程同时释放锁的时候,引起的一个BUG。以Semaphore为例,
我们假设某次循环中队列里排队的节点为情况为:
head -> t1的node -> t2的node(也就是tail)
信号量释放的顺序为t3先释放,t4后释放:
时刻1: t3调用releaseShared,调用了unparkSuccessor(h),head的等待状态从-1变为0
时刻2: t1由于t3释放了信号量,被t3唤醒,调用Semaphore.NonfairSync的tryAcquireShared,返回值为0
时刻3: t4调用releaseShared,读到此时h.waitStatus为0(此时读到的head和时刻1中为同一个head),不满足条件,因此不会调用unparkSuccessor(h)
时刻4: t1获取信号量成功,调用setHeadAndPropagate时,因为不满足propagate > 0(时刻2的返回值也就是propagate==0),从而不会唤醒后继节点。
那么引入PROPAGATE是怎么解决问题的呢?
引入之后,调用releaseShared方法不再简单粗暴地直接unparkSuccessor,而是将传播行为抽了一个doReleaseShared方法出来。
再看上面的那种情况:
时刻1:t3调用releaseShared -> doReleaseShared -> unparkSuccessor,完了之后head的等待状态为0
时刻2:t1由于t3释放了信号量,被t3唤醒,调用Semaphore.NonfairSync的tryAcquireShared,返回值为0
时刻3:t4调用releaseShared,读到此时h.waitStatus为0(此时读到的head和时刻1中为同一个head),将等待状态置为PROPAGATE
时刻4:t1获取信号量成功,调用setHeadAndPropagate时,可以读到h.waitStatus < 0,从而可以接下来调用doReleaseShared唤醒t2
也就是说,上面会产生线程hang住bug的case在引入PROPAGATE后可以被规避掉。在PROPAGATE引入之前,之所以可能会出现线程hang住的情况,就是在于
releaseShared有竞争的情况下,可能会有队列中处于等待状态的节点因为第一个线程完成释放唤醒,第二个线程获取到锁,但还没设置好head,又有新线程释放锁,但是读到老的head状态为0导致释放但不唤醒,最终后一个等待线程既没有被释放线程唤醒,也没有被持锁线程唤醒。
未完待续。。。
相关文章推荐
- AbstractQueuedSynchronizer(AQS)源码解析上
- AQS(AbstractQueuedSynchronizer)源码解析(ConditionObject)
- AbstractQueuedSynchronizer(AQS)源码解析-续
- AbstractQueuedSynchronizer(AQS)源码解析(一)
- AQS(AbstractQueuedSynchronizer)源码解析(共享锁部分)
- AbstractQueuedSynchronizer(AQS)源码解析下
- AbstractQueuedSynchronizer源码解析之ReentrantLock(一)
- AbstractQueuedSynchronizer源码解析之ReentrantLock(二)
- JUC源码解析(6)-locks-AbstractQueuedSynchronizer
- Java 1.6 AbstractQueuedSynchronizer源码解析
- AQS-AbstractQueuedSynchronizer-源码分析
- Java并发包源码学习之AQS框架(四)AbstractQueuedSynchronizer源码分析
- AbstractQueuedSynchronizer与synchronized优缺对比及AQS 源码分析笔记
- JUC-AbstractQueuedSynchronizer(AQS)源码分析
- JUC - AbstractQueuedSynchronizer(AQS) 源码分析
- AQS(AbstractQueuedSynchronizer)同步器的底层解析
- AbstractQueuedSynchronizer源码剖析(六)- 深刻解析与模拟线程竞争资源
- AbstractQueuedSynchronizer实现Android版源码解析(二)
- AQS源码分析(AbstractQueuedSynchronizer)