Jdk1.8版本并发包基类AQS(AbstractQueuedSynchronizer)实现原理分析
2018-01-26 23:14
573 查看
一、AbstractQueuedSynchronizer是干嘛的?
AbstractQueuedSynchronizer(以下简称AQS)是理解JAVA并发包里面各种类使用的一个重要的基础抽象基类,最近也在网上看了一些博客专门讲这个,感觉有些地方没有说清楚,也有一些笔误或者理解的问题,所以自己决定也写一篇。做个笔记,方便以后查阅,如果理解有误的地方,欢迎大家批评指正。那么,AQS是干嘛的呢,如果经常使用java并发包的朋友应该会有所了解。AQS是一个抽象的基类,它内部通过一个FIFO的同步等待队列 + 一个volatile修饰的state来管理节点等待状态等,整个类采用模板模式实现,提供一些方法供子类实现,支持互斥和共享2种模式。
本文基于Jdk1.8版本(虽然jdk1.9已经发布了,但目前主流的使用版本应该还是1.7或者1.8)。
二、同步等待队列
在AQS里面,同步等待队列叫做CLH(Craig, Landin, and Hagersten);当线程要入队的时候,只需要构建成一个Node节点加入队尾即可,如果要出队,只需要将队首节点移除,同时唤醒下一个等待的节点。这个CLH队列可以看成是一个链表实现的,刚开始队列里面只有一个空的Head节点,这个时候Head节点和Tail节点都是同一个节点,这个节点是虚拟的,当有线程没有获取的锁对象的时候,那么就需要构建一个Node节点往这个CLH队列里面加,同时更新节点之间的引用关系(主要就是链表的东西),然后加入到队列里面后,一直自旋去尝试获取同步等待状态,当然,在队列里自旋的时候,还有其他很多细节,接下来我们先来分析这个Node节点的定义。
static final class Node { /** Marker to indicate a node is waiting in shared mode */ //表示一个共享节点模式下等待的常量 static final Node SHARED = new Node(); /** Marker to indicate a node is waiting in exclusive mode */ //表示在互斥模式下等待的常量 static final Node EXCLUSIVE = null; /** waitStatus value to indicate thread has cancelled */ //表示当前节点线程被取消 static final int CANCELLED = 1; /** waitStatus value to indicate successor's thread needs unparking */ //表示当前节点线程等待被唤醒 static final int SIGNAL = -1; /** waitStatus value to indicate thread is waiting on condition */ //表示当前节点线程正在等待某个条件 static final int CONDITION = -2; /** * waitStatus value to indicate the next acquireShared should * unconditionally propagate */ //表示下一次共享模式的同步等待状态的获取将会一直循环下去 static final int PROPAGATE = -3; /** * Status field, taking on only the values: * SIGNAL: The successor of this node is (or will soon be) * blocked (via park), so the current node must * unpark its successor when it releases or * cancels. To avoid races, acquire methods must * first indicate they need a signal, * then retry the atomic acquire, and then, * on failure, block. * CANCELLED: This node is cancelled due to timeout or interrupt. * Nodes never leave this state. In particular, * a thread with cancelled node never again blocks. * CONDITION: This node is currently on a condition queue. * It will not be used as a sync queue node * until transferred, at which time the status * will be set to 0. (Use of this value here has * nothing to do with the other uses of the * field, but simplifies mechanics.) * PROPAGATE: A releaseShared should be propagated to other * nodes. This is set (for head node only) in * doReleaseShared to ensure propagation * continues, even if other operations have * since intervened. * 0: None of the above * * The values are arran fd6c ged numerically to simplify use. * Non-negative values mean that a node doesn't need to * signal. So, most code doesn't need to check for particular * values, just for sign. * * The field is initialized to 0 for normal sync nodes, and * CONDITION for condition nodes. It is modified using CAS * (or when possible, unconditional volatile writes). */ volatile int waitStatus; /** * Link to predecessor node that current node/thread relies on * for checking waitStatus. Assigned during enqueuing, and nulled * out (for sake of GC) only upon dequeuing. Also, upon * cancellation of a predecessor, we short-circuit while * finding a non-cancelled one, which will always exist * because the head node is never cancelled: A node becomes * head only as a result of successful acquire. A * cancelled thread never succeeds in acquiring, and a thread only * cancels itself, not any other node. */ //当前节点的前一个节点 volatile Node prev; /** * Link to the successor node that the current node/thread * unparks upon release. Assigned during enqueuing, adjusted * when bypassing cancelled predecessors, and nulled out (for * sake of GC) when dequeued. The enq operation does not * assign next field of a predecessor until after attachment, * so seeing a null next field does not necessarily mean that * node is at end of queue. However, if a next field appears * to be null, we can scan prev's from the tail to * double-check. The next field of cancelled nodes is set to * point to the node itself instead of null, to make life * easier for isOnSyncQueue. */ //当前节点的后一个节点 volatile Node next; /** * The thread that enqueued this node. Initialized on * construction and nulled out after use. */ //获取同步等待状态的当前线程 volatile Thread thread; /** * Link to next node waiting on condition, or the special * value SHARED. Because condition queues are accessed only * when holding in exclusive mode, we just need a simple * linked queue to hold nodes while they are waiting on * conditions. They are then transferred to the queue to * re-acquire. And because conditions can only be exclusive, * we save a field by using special value to indicate shared * mode. */ //当前节点的下一个处于等待状态的节点 Node nextWaiter; /** * Returns true if node is waiting in shared mode. */ final boolean isShared() { return nextWaiter == SHARED; } /** * Returns previous node, or throws NullPointerException if null. * Use when predecessor cannot be null. The null check could * be elided, but is present to help the VM. * * @return the predecessor of this node */ //找到当前节点的前一个节点 final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { // Used to establish initial head or SHARED marker } Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } }
同步等待相关的节点定义
/** * Head of the wait queue, lazily initialized. Except for * initialization, it is modified only via method setHead. Note: * If head exists, its waitStatus is guaranteed not to be * CANCELLED. */ //头节点 private transient volatile Node head; /** * Tail of the wait queue, lazily initialized. Modified only via * method enq to add new wait node. */ //尾节点 private transient volatile Node tail; /** * The synchronization state. */ // volatile修饰的int类型的同步等待状态 private volatile int state;
AQS主要就靠这个Node节点来维护同步等待队列,接来下我们分互斥模式访问和共享模式访问来分别看,AQS是如何实现的。
三、互斥模式
在互斥模式下,请求去获取一个锁的方法如下(因为还提供了支持中断和超时的实现,所以这里先以不支持超时和中断的这个方法作为入口)public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
acquire方法,首先会调用tryAcquire,这是一个空的方法,具体实现交给子类自己去实现,如果tryAcquire方法返回true,说明这个线程获取了锁,那么这里就不处理,如果返回了false,会分为以下几步:
1、addWaiter 方法构建一个互斥模式的节点Node,将其加入这个CLH队列。
2、acquireQueued方法,当节点入队后,会一直自旋,acquireQueued方法主要用来处理这个自旋节点什么时候该被唤醒,是否该被中断等
下面来看一下具体的源码,先看addWaiter是如何构建Node节点入队的
private Node addWaiter(Node mode) { //构造一个节点,第一个参数为当前线程,第二个参数为节点类型 Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure //首先尝试快速入队尾,如果失败了再说,这里是将队尾节点tail赋值为一个新节点pred Node pred = tail; if (pred != null) { //如果快速入队成功了,那么就将当前节点的前一个节点设置为原来的那个队尾节点 node.prev = pred; //CAS将当前节点设置为队尾节点 if (compareAndSetTail(pred, node)) { //原来的队尾节点的下一个节点就是当前这个入队节点,也就是当前入队节点现在是队尾节点 pred.next = node; return node; } } //如果快速入队失败了,那么实行正常入队 enq(node); return node; }
当快速入队失败了,那么就自旋去进行正常入队
private Node enq(final Node node) { /**一直自旋,相当于一个死循环 */ for (;;) { Node t = tail; //如果目前队列里面还没有任何节点,那么需要new一个哑铃节点来充当队列头节点,此时整个队列里面 //只会有一个虚拟的head,tail节点,这2个节点指向的是同一个节点 if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { //将当前节点的前驱节点指向同步等待队列的尾节点。 node.prev = t; //CAS设置队列尾节点为当前节点 if (compareAndSetTail(t, node)) { //设置成功后,以前的队尾节点的下一个节点就指向了当前入队的节点,这时入队成功,然后返回 t.next = node; return t; } } } }
入队成功后,进行自旋等待,看什么时候,自己可以被唤醒去尝试获取锁
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; //刚刚加入到CLH队列的当前线程是进行自旋,去获取锁 for (;;) { //获取当前节点的上一个节点 final Node p = node.predecessor(); //如果上一个节点是头节点,说明队列里面只有自己在排队等待获取锁,那么就直接尝试去获取锁 if (p == head && tryAcquire(arg)) { //如果获取锁成功,将当前节点设置为头节点 setHead(node); //原本的头节点的下一个节点就不存在了,设置为null,便于gc p.next = null; // help GC failed = false; return interrupted; } //如果上面返回失败了,那么首先看一下当前节点是否需要被阻塞,并检查中断状态 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
如果这个入队的节点,刚开始不被允许取获取这个等待状态,那么就需要看一下这个节点是否应该被阻塞。什么意思呢,就是加入这个节点进入队列的时候,他前面已经有其他节点排着队了,根据FIFO的公平性原则,你这个节点必须得排队等待,在这里的话也就是说会被阻塞住,就没有必要一直去自旋尝试,只需要等待它的前一个节点来唤醒你就好了。
现在来看一下 shouldParkAfterFailedAcquire,该方法主要是判断当前节点线程是否应该被阻塞
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { //获取前驱节点的等待状态 int ws = pred.waitStatus; //如果前驱节点的等待状态是SIGNAL,表明前驱节点也在等待被唤醒,根据队列公平性原则,当前节点必须排队,所以当前节点需要被阻塞,这里就返回了true if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; //如果当前节点的前驱节点的等待状态为 取消状态(可能前驱节点被中断或者超时了) if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ // 因为前驱节点已经被取消了,那么需要重新规划当前节点的前驱节点的引用,也就是当前节点的前驱节点是已经被 //取消的这个前驱节点的前驱节点 Node <---- preNod <-----Node do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ //其他情况CAS设置当前节点的前驱节点等待状态为SIGNAL,返回false compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
如果 shouldParkAfterFailedAcquire(Node pred, Node node) 方法返回true,则调用parkAndCheckInterrupt()方法阻塞当前线程
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
回到acquire方法,最后如果acquireQueued(addWaiter(Node.EXCLUSIVE), arg)返回true,说明当前线程被中断,会继续调用selfInterrupt方法
static void selfInterrupt() { //中断当前线程 Thread.currentThread().interrupt(); }
当获取锁后,业务处理完后,就需要释放同步等待状态,会调用下面的方法来释放锁
public final boolean release(int arg) { //首先尝试去释放等待状态,tryRelease同样交给子类自己去个性化实现 if (tryRelease(arg)) { Node h = head; //如果head不为null且,该节点的等待状态不为0,那么就需要唤醒它的后续节点,让后续节点去获取锁 if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
当某个节点业务处理完后释放锁之后,需要唤醒它的后继节点,如果后继节点有节点已经被中断或者超时取消了,需要继续找到一个可用的节点唤醒
private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ //获取当前节点的等待状态 int ws = node.waitStatus; if (ws < 0) //如果节点等待状态小于0,那么就设置为0 compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s = node.next; //如果当前节点的下一个节点为null或者说下一个节点的等待状态>0(也就是下一个节点被取消或者中断了),那么需要反向往前找到一个节点不为null且状态不大于0的节点,将它进行唤醒 if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) //唤醒节点线程 LockSupport.unpark(s.thread); }
以上就是互斥模型下基本类型的获取锁,释放锁操作,接下来我们再来看一下支持中断和超时的互斥模式实现。
三、支持响应中断的互斥模式
支持响应中断的意思就是,在CLH队列里等待的节点,可以被中断
public final void acquireInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (!tryAcquire(arg)) doAcquireInterruptibly(arg); }
当tryAcquire返回false,也就是该线程节点没有获取到锁,接着调用 doAcquireInterruptibly方法
private void doAcquireInterruptibly(int arg) throws InterruptedException { //构建节点入队 final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { //自旋 for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return; } //入队后自旋尝试去获取或,如果该节点应该被中断,那么直接抛异常(这点的处理和刚刚上面的处理不一样,上面是加了一个中断的标志) if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
可以看到,和最基本的互斥模式实现差不多,至少支持中断的请求,如果判断了该线程节点需要阻塞的话,直接抛一个异常出去。
四、支持超时时间设置的独占模式
当执行tryAcquire的时候,可以传入一个超时时间,当这个节点线程没有获取到锁而在CLH队列里面排队的时候,如果指定时间内还没有获取锁,那么直接返回,不用再队列里再等待。public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout); }
如果没有获取到锁,也就是tryAcquire返回了false,则进入下面的方法
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { //如果超时时间小于0,直接返回false if (nanosTimeout <= 0L) return false; //赋值 超时时间 final long deadline = System.nanoTime() + nanosTimeout; //构建节点入队 final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return true; } //在自旋的过程中,不断校验剩余的超时时间 nanosTimeout = deadline - System.nanoTime(); //如果自旋的过程中已经超时了,直接返回false if (nanosTimeout <= 0L) return false; if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); //如果已经被中断,直接抛异常 if (Thread.interrupted()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
至此,AQS里面基于互斥模式的获取锁、释放锁操作的源码实现大概流程分析完了。而对于共享模式,其实和互斥模式差不多,下面我们来简单看一下共享模式的AQS实现
五、共享模式
先看一下共享模式下的获取锁方法
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
如果(tryAcquireShared返回小于0表示请求成功(该方法同样交给了子类去实现),如果没有成功则继续调用doAcquireShared,看一下调用失败后的处理方法doAcquireShared
/** * Acquires in shared uninterruptible mode. * @param arg the acquire argument */ private void doAcquireShared(int arg) { // 将当前线程以共享模式加入同步等待队列 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; //主循环 for (;;) { //获取当前线程的前一个节点 final Node p = node.predecessor(); //如果p是头节点,那么继续调用tryAcquireShared尝试去获取锁 if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { //p节点被移除,置空next引用,帮助GC setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } //如果当前节点的前驱节点不是头节点,判断当前节点请求失败后是否要被阻塞,如果是,阻塞并保存当前线程中断状态。 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
共享模式也有支持中断和超时的方法,和互斥模式实现基本一样,这里就不重复分析了。
接下来看一下共享模式下的释放操作
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
/** * Release action for shared mode -- signals successor and ensures * propagation. (Note: For exclusive mode, release just amounts * to calling unparkSuccessor of head if it needs signal.) */ private void doReleaseShared() { /* * Ensure that a release propagates, even if there are other * in-progress acquires/releases. This proceeds in the usual * way of trying to unparkSuccessor of head if it needs * signal. But if it does not, status is set to PROPAGATE to * ensure that upon release, propagation continues. * Additionally, we must loop in case a new node is added * while we are doing this. Also, unlike other uses of * unparkSuccessor, we need to know if CAS to reset status * fails, if so rechecking. */ for (;;) { Node h = head; //判断同步等待队列是否为空 if (h != null && h != tail) { //如果不为空,获取头节点的等待状态。 int ws = h.waitStatus; if (ws == Node.SIGNAL) { //如果等待状态是SIGNAL,说明其后继节点需要唤醒 //尝试修改等待状态 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; //如果修改失败,重新循环检测。 unparkSuccessor(h);//如果修改成功,唤醒头节点的后继节点。 } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) //如果等待状态是0,尝试将其(头节点)设置为PROPAGATE continue; // 如果设置失败,继续循环检测。 } if (h == head) // 如果过程中头节点没有发生变化,循环退出;否则需要继续检测。 break; } }
共享模式的也分析到这,AQS里面还提供了一些检测的方法,比如判断同步等待队列中是否有线程在等待等,都比较简单。 如果读者感兴趣,可自行查看源码或加qq交流。
相关文章推荐
- java并发:AbstractQueuedSynchronizer的介绍和原理分析
- JDK1.8 AbstractQueuedSynchronizer的实现分析(上)
- 深入分析AbstractQueuedSynchronizer共享锁的实现原理:CountDownLatch
- 并发编程--AbstractQueuedSynchronizer介绍和原理分析
- JDK1.8 AbstractQueuedSynchronizer的实现分析(学习笔记)
- 深入分析AbstractQueuedSynchronizer独占锁的实现原理:ReentranLock
- AbstractQueuedSynchronizer 原理分析 - Condition 实现原理
- AbstractQueuedSynchronizer的介绍和原理分析
- AbstractQueuedSynchronizer 原理分析 - 独占/共享模式
- java并发编程--AbstractQueuedSynchronizer的lock()和lockInterruptibly()方法分析(五)
- 深度解析Java8 – AbstractQueuedSynchronizer的实现分析(下)
- AbstractQueuedSynchronizer的实现分析(下)
- 深度解析Java8 – AbstractQueuedSynchronizer的实现分析(下)
- AbstractQueuedSynchronizer的介绍和原理分析
- 用ReetrantLock来分析java并发中不可错过的类AbstractQueuedSynchronizer
- AbstractQueuedSynchronizer原理分析
- java AbstractQueuedSynchronizer的实现分析(独占锁)
- java并发编程--AbstractQueuedSynchronizer公平锁和非公平锁分析(三)
- Java并发系列之AbstractQueuedSynchronizer源码分析(条件队列)
- AbstractQueuedSynchronizer的介绍和原理分析