您的位置:首页 > 其它

深入剖析AQS

2020-06-07 23:34 555 查看
[toc] --- - Posted by [微博@Yangsc_o](https://img-blog.csdnimg.cn/20200515112607757.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3lhbmcxOTg5MDc=,size_16,color_FFFFFF,t_70) - 原创文章,版权声明:自由转载-非商用-非衍生-保持署名 | [Creative Commons BY-NC-ND 3.0](http://creativecommons.org/licenses/by-nc-nd/3.0/deed.zh) --- # 摘要 本文通过ReentrantLock来窥探AbstractQueuedSynchronizer(AQS)的实现原理,在看此文之前。你需要了解一下park、unpark的功能,请移步至上一篇《深入剖析park、unpark》; # AbstractQueuedSynchronizer实现一把锁 根据AbstractQueuedSynchronizer的官方文档,如果想实现一把锁的,需要继承AbstractQueuedSynchronizer,并需要重写tryAcquire、tryRelease、可选择重写isHeldExclusively提供locked state、因为支持序列化,所以需要重写readObject以便反序列化时恢复原始值、newCondition提供条件;官方提供的java代码如下(官方文档见参考连接); ```java public class MyLock implements Lock, java.io.Serializable { private static class Sync extends AbstractQueuedSynchronizer { // Acquires the lock if state is zero @Override public boolean tryAcquire(int acquires) { assert acquires == 1; // Otherwise unused if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } // Releases the lock by setting state to zero @Override protected boolean tryRelease(int releases) { assert releases == 1; // Otherwise unused if (getState() == 0) { throw new IllegalMonitorStateException(); } setExclusiveOwnerThread(null); setState(0); return true; } // Provides a Condition Condition newCondition() { return new ConditionObject(); } // Deserializes properly private void readObject(ObjectInputStream s) throws IOException, ClassNotFoundException { s.defaultReadObject(); setState(0); // reset to unlocked state } // Reports whether in locked state @Override protected boolean isHeldExclusively() { return getState() == 1; } } /** * The sync object does all the hard work. We just forward to it. */ private final Sync sync = new Sync(); @Override public void lock() { sync.acquire(1); } @Override public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } @Override public boolean tryLock() { return sync.tryAcquire(1); } @Override public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(timeout)); } @Override public void unlock() { sync.release(1); } @Override public Condition newCondition() { return sync.newCondition(); } private static volatile Integer value = 0; public static void main(String[] args) { MyLock myLock = new MyLock(); for (int i = 0; i < 1000; i++) { new Thread(()->{ myLock.lock(); value ++; myLock.unlock(); }).start(); } System.out.println(value); } } ``` --- 上面是一个不可重入的锁,它实现了一个锁基础功能,目的是为了跟ReentrantLock的实现做对比; # ReentrantLock ## ReentrantLock的特点 ReentrantLock意思为可重入锁,指的是一个线程能够对一个临界资源重复加锁。ReentrantLock跟常用的Synchronized进行比较; ![image-20200603140814563](https://gitee.com/yangsanchao/images/raw/master/blogs/image-20200603140814563.png) ## Synchronized的基础用法 Synchronized的分析可以参考《深入剖析synchronized关键词》,ReentrantLock可以创建公平锁、也可以创建非公平锁,接下来看一下ReentrantLock的简单用法,非公平锁实现比较简单,今天重点是公平锁; ```java public class ReentrantLockTest { public static void main(String[] args) { ReentrantLock reentrantLock = new ReentrantLock(true); reentrantLock.lock(); try { log.info("lock"); } catch (Exception e) { log.error(e); } finally { reentrantLock.unlock(); log.info("unlock"); } } } ``` ## ReentrantLock与AQS的关联 先看一下加锁方法lock - 非公平锁lock方法 compareAndSetState很好理解,通过CAS加锁,如果加锁失败调用acquire; ```java /** * Performs lock. Try immediate barge, backing up to normal * acquire on failure. */ final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } ``` - 公平锁lock方法 ```java final void lock() { acquire(1); } ``` - AQS框架的处理流程 ​ 线程继续等待,仍然保留获取锁的可能,获取锁流程仍在继续,分析实现原理 ```java public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } ``` 总结:公平锁的上锁是必须判断自己是不是需要排队;而非公平锁是直接进行CAS修改计数器看能不能加锁成功;如果加锁不成功则乖乖排队(调用acquire);所以不管公平还是不公平;只要进到了AQS队列当中那么他就会排队; ## AQS架构图 美团画的AQS的架构图,很详细,当有自定义同步器接入时,只需重写第一层所需要的部分方法即可,不需要关注底层具体的实现流程。当自定义同步器进行加锁或者解锁操作时,先经过第一层的API进入AQS内部方法,然后经过第二层进行锁的获取,接着对于获取锁失败的流程,进入第三层和第四层的等待队列处理,而这些处理方式均依赖于第五层的基础数据提供层。 ![82077ccf14127a87b77cefd1ccf562d3253591](https://gitee.com/yangsanchao/images/raw/master/blogs/82077ccf14127a87b77cefd1ccf562d3253591.png) AQS核心思想是,如果被请求的共享资源空闲,那么就将当前请求资源的线程设置为有效的工作线程,将共享资源设置为锁定状态;如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。这个机制主要用的是CLH队列的变体实现的,将暂时获取不到锁的线程加入到队列中。 CLH:Craig、Landin and Hagersten队列,是单向链表,AQS中的队列是CLH变体的虚拟双向队列(FIFO),AQS是通过将每条请求共享资源的线程封装成一个节点来实现锁的分配。 - 非公平锁的加锁流程 - 公平锁的加锁流程 - 解锁公平锁和非公平锁逻辑一致 加锁: - 通过ReentrantLock的加锁方法Lock进行加锁操作。 - 会调用到内部类Sync的Lock方法,由于Sync#lock是抽象方法,根据ReentrantLock初始化选择的公平锁和非公平锁,执行相关内部类的Lock方法,本质上都会执行AQS的Acquire方法。 - AQS的Acquire方法会执行tryAcquire方法,但是由于tryAcquire需要自定义同步器实现,因此执行了ReentrantLock中的tryAcquire方法,由于ReentrantLock是通过公平锁和非公平锁内部类实现的tryAcquire方法,因此会根据锁类型不同,执行不同的tryAcquire。 - tryAcquire是获取锁逻辑,获取失败后,会执行框架AQS的后续逻辑,跟ReentrantLock自定义同步器无关。 - 流程:Lock -> acquire -> tryAcquire( or nonfairTryAcquire) 解锁: - 通过ReentrantLock的解锁方法Unlock进行解锁。 - Unlock会调用内部类Sync的Release方法,该方法继承于AQS。 - Release中会调用tryRelease方法,tryRelease需要自定义同步器实现,tryRelease只在ReentrantLock中的Sync实现,因此可以看出,释放锁的过程,并不区分是否为公平锁。 - 释放成功后,所有处理由AQS框架完成,与自定义同步器无关。 - 流程:unlock -> release -> tryRelease ## acquire获取锁 ```java public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)){ selfInterrupt(); } } ``` ### tryAcquire acquire方法首先会调tryAcquire方法,需要注意的是tryAcquire的结果做取反;根据前面分析,tryAcquire会调用子类的实现,ReentrantLock有两个内部类,FairSync,NonfairSync,都继承自Sync,Sync继承AbstractQueuedSynchronizer; 实现方式差别在是否有hasQueuedPredecessors() 的判断条件 - 公平锁实现 ```java /** * Fair version of tryAcquire. Don't grant access unless * recursive call or no waiters or is first. */ protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); // 获取lock对象的上锁状态,如果锁是自由状态则=0,如果被上锁则为1,大于1表示重入 int c = getState(); if (c == 0) { // hasQueuedPredecessors,判断自己是否需要排队 // 下面我会单独介绍,如果不需要排队则进行cas尝试加锁 // 如果加锁成功则把当前线程设置为拥有锁的线程 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } // 如果C不等于0,但是当前线程等于拥有锁的线程则表示这是一次重入,那么直接把状态+1表示重入次数+1 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } ``` 非公平锁 ```java /** * Performs non-fair tryLock. tryAcquire is implemented in * subclasses, but both need nonfair try for trylock method. */ final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } ``` ### hasQueuedPredecessors ```java public final boolean hasQueuedPredecessors() { // The correctness of this depends on head being initialized // before tail and on head.next being accurate if the current // thread is first in queue. Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); } ``` - Node 先来看下AQS中最基本的数据结构——Node,Node即为上面CLH变体队列中的节点。 ```java static final class Node { static final Node SHARED = new Node(); // 表示线程以共享的模式等待锁 static final Node EXCLUSIVE = null; // 表示线程正在以独占的方式等待锁 static final int CANCELLED = 1; // 表示线程获取锁的请求已经取消了 static final int SIGNAL = -1; // 表示线程已经准备好了,就等资源释放了 static final int CONDITION = -2; // 表示节点在等待队列中,节点线程等待唤醒 static final int PROPAGATE = -3; // 当前线程处在SHARED情况下,该字段才会使用 volatile int waitStatus; // 当前节点在队列中的状态 volatile Node prev; // 前驱指针 volatile Node next; // 后继指针 volatile Thread thread; // 表示处于该节点的线程 Node nextWaiter; // 指向下一个处于CONDITION状态的节点 final boolean isShared() { return nextWaiter == SHARED; } // 返回前驱节点,没有的话抛出npe final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { // Used to establish initial head or SHARED marker } Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } } ``` 再看hasQueuedPredecessors,整个方法如果最后返回false,则去加锁,如果返回true则不加锁,因为这个方法被取反操作;hasQueuedPredecessors是公平锁加锁时判断等待队列中是否存在有效节点的方法。如果返回False,说明当前线程可以争取共享资源;如果返回True,说明队列中存在有效节点,当前线程必须加入到等待队列中。 - h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); 双向链表中,第一个节点为虚节点,其实并不存储任何信息,只是占位。真正的第一个有数据的节点,是在第二个节点开始的。 - 当h != t时: 如果(s = h.next) == null,等待队列正在有线程进行初始化,但只是进行到了Tail指向Head,没有将Head指向Tail,此时队列中有元素,需要返回True(这块具体见下边代码分析)。 - 如果(s = h.next) != null,说明此时队列中至少有一个有效节点。 - 如果此时s.thread == Thread.currentThread(),说明等待队列的第一个有效节点中的线程与当前线程相同,那么当前线程是可以获取资源的; - 如果s.thread != Thread.currentThread(),说明等待队列的第一个有效节点线程与当前线程不同,当前线程必须加入进等待队列。 如果这上面没有看懂,没有关系,先来分析一下构建整个队列的过程; - addWaiter(Node.EXCLUSIVE) ```java private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure // tail为对尾,赋值给pred Node pred = tail; // 判断pred是否为空,其实就是判断对尾是否有节点,其实只要队列被初始化了对尾肯定不为空 if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; } ``` - enq ```java private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } } ``` 用一张图来分析一下,整个队列构建过程; ![image-20200604203002973](https://gitee.com/yangsanchao/images/raw/master/blogs/image-20200604203002973.png) - (1)通过Node(Thread thread, Node mode) 方法构建一个node节点(node2),此时的nextWaiter为空,线程不为空,是当前线程; - (2)如果队尾为空,则说明队列未建立,调用enq构建第一个虚拟节点(node1),通过compareAndSetHead方法构建一个头节点,需要注意的是该头节点thread是null,后续很多都是用线程是否为null来判读是否为第一个虚拟节点; - (3)将node1 cas设置为head - (4)将头节点赋值为tail = head - (5)进入下一次for循环时,会走到else分支,会将传入的node的指向头部节点的next,此时node2的prev指向node1(tail) - (6)将node2 cas设置为tail; - (7)将node2指向node1的next; 经过上面的步骤,就构建了一个长度为2的队列; 添加第二个队列时,走的是这段代码,流程就简单多了,代码如下 ```java if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } ``` ![image-20200604202859302](https://gitee.com/yangsanchao/images/raw/master/blogs/image-20200604202859302.png) 再看一下h != t && ((s = h.next) == null || s.thread != Thread.currentThread());因为整个构建过程并不是原子操作,所以这个条件判断,现在再是不是就看明白了? - 当h != t时(3)步骤已经完成: 如果(s = h.next) == null 此时步骤(4)未完成,等待队列正在有线程进行初始化,但只是进行到了Tail指向Head,没有将Head指向Tail,此时队列中有元素,需要返回True - 如果(s = h.next) != null,说明此时队列中至少有一个有效节点。 - 如果此时s.thread == Thread.currentThread(),说明等待队列的第一个有效节点中的线程与当前线程相同,那么当前线程是可以获取资源的; - 如果s.thread != Thread.currentThread(),说明等待队列的第一个有效节点线程与当前线程不同,当前线程必须加入进等待队列。 ### acquireQueued addWaiter方法其实就是把对应的线程以Node的数据结构形式加入到双端队列里,返回的是一个包含该线程的Node。而这个Node会作为参数,进入到acquireQueued方法中。acquireQueued方法可以对排队中的线程进行“获锁”操作。总的来说,一个线程获取锁失败了,被放入等待队列,acquireQueued会把放入队列中的线程不断去获取锁,直到获取成功或者不再需要获取(中断)。 下面通过代码从“何时出队列?”和“如何出队列?”两个方向来分析一下acquireQueued源码: ```java final boolean acquireQueued(final Node node, int arg) { // 标记是否成功拿到资源 boolean failed = true; try { // 标记等待过程中是否中断过 boolean interrupted = false; for (;;) { // 获取当前节点的前驱节点,有两种情况;1、上一个节点为头部;2上一个节点不为头部 final Node p = node.predecessor(); // 如果p是头结点,说明当前节点在真实数据队列的首部,就尝试获取锁(头结点是虚节点) // 因为第一次tryAcquire判断是否需要排队,如果需要排队,那么我就入队,此处再重试一次 if (p == head && tryAcquire(arg)) { // 获取锁成功,头指针移动到当前node setHead(node); p.next = null; // help GC failed = false; return interrupted; } // 说明p为头节点且当前没有获取到锁(可能是非公平锁被抢占了)或者是p不为头结点,这个时候就要判断当前node是否要被阻塞(被阻塞条件:前驱节点的waitStatus为-1),防止无限循环浪费资源。具体两个方法下面细细分析 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { // 成功拿到资源,准备释放 if (failed) cancelAcquire(node); } } ``` ### setHead 设置当前节点为头节点,并且将node.thread为空(刚才提到判断是否为头部虚拟节点的条件就是node.thread == null。waitStatus状态并为修改,等下我们再分析; ```java private void setHead(Node node) { head = node; node.thread = null; node.prev = null; } ``` ### shouldParkAfterFailedAcquire 接下来看shouldParkAfterFailedAcquire代码,需要注意的是,每一个新创建Node的节点是被下一个排队的node设置为等待状态为SIGNAL, 这里比较难以理解为什么需要去改变上一个节点的park状态? 每个node都有一个状态,默认为0,表示无状态,-1表示在park;当时不能自己把自己改成-1状态?因为你得确定你自己park了才是能改为-1;所以只能先park;在改状态;但是问题你自己都park了;完全释放CPU资源了,故而没有办法执行任何代码了,所以只能别人来改;故而可以看到每次都是自己的后一个节点把自己改成-1状态; ```java private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { // 获取前驱节点的状态 int ws = pred.waitStatus; // 说明头结点处于唤醒状态 if (ws == Node.SIGNAL) return true; // static final int CANCELLED = 1; // 表示线程获取锁的请求已经取消了 // static final int SIGNAL = -1; // 表示线程已经准备好了,就等资源释放了 // static final int CONDITION = -2; // 表示节点在等待队列中,节点线程等待唤醒 // static final int PROPAGATE = -3; // 当前线程处在SHARED情况下,该字段才会使用 if (ws > 0) { do { // 把取消节点从队列中剔除 node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // 设置前任节点等待状态为SIGNAL compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } ``` ### parkAndCheckInterrupt 调用LockSupport.park挂起当前线程,自己已经park,无法再修改状态了! ```java private final boolean parkAndCheckInterrupt() { // 调⽤用park()使线程进⼊入waiting状态 LockSupport.park(this); // 如果被唤醒,查看⾃自⼰己是不不是被中断的,这⾥里里先清除⼀下标记位 return Thread.interrupted(); } ``` shouldParkAfterFailedAcquire的整个流程还是比较清晰的,如果不清楚,可以参考美团画的流程图; ### cancelAcquire 通过上面的分析,当failed为true时,也就意味着park结束,线程被唤醒了,for循环已经跳出,开始执行cancelAcquire,通过cancelAcquire方法,将Node的状态标记为CANCELLED;代码如下: ```java private void cancelAcquire(Node node) { // 将无效节点过滤 if (node == null) return; // 设置该节点不关联任何线程,也就是虚节点(上面已经提到,node.thread = null是判读是否是头节点的条件) node.thread = null; Node pred = node.prev; // 通过前驱节点,处理waitStatus > 0的node while (pred.waitStatus > 0) node.prev = pred = pred.prev; // 把当前node的状态设置为CANCELLED,当下一个node排队结束时,自己就会被上一行代码处理掉; Node predNext = pred.next; node.waitStatus = Node.CANCELLED; // 如果当前节点是尾节点,将从后往前的第一个非取消状态的节点设置为尾节点,更新失败的话,则进入else,如果更新成功,将tail的后继节点设置为null if (node == tail && compareAndSetTail(node, pred)) { // 把自己设置为null compareAndSetNext(pred, predNext, null); } else { int ws; // 如果当前节点不是head的后继节点 // 1:判断当前节点前驱节点的是否为SIGNAL // 2:如果不是,则把前驱节点设置为SINGAL看是否成功 // 如果1和2中有一个为true,再判断当前节点的线程是否为null // 如果上述条件都满足,把当前节点的前驱节点的后继指针指向当前节点的后继节点 if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { Node next = node.next; if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { // 如果当前节点是head的后继节点,或者上述条件不满足,那就唤醒当前节点的后继节点 unparkSuccessor(node); } node.next = node; // help GC } } ``` 当前的流程: - 获取当前节点的前驱节点,如果前驱节点的状态是CANCELLED,那就一直往前遍历,找到第一个waitStatus <= 0的节点,将找到的Pred节点和当前Node关联,将当前Node设置为CANCELLED。 - 根据当前节点的位置,考虑以下三种情况: (1) 当前节点是尾节点。 (2) 当前节点是Head的后继节点。 (3) 当前节点不是Head的后继节点,也不是尾节点。 (1)当前节点时尾节点 (2)当前节点是Head的后继节点。 这张图描述的是这段代码:unparkSuccessor ```java Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } ``` (3)当前节点不是Head的后继节点,也不是尾节点。 这张图描述的是这段代码跟(2)一样; 通过上面的图,你会发现所有的变化都是对Next指针进行了操作,而没有对Prev指针进行操作,原因是执行cancelAcquire的时候,当前节点的前置节点可能已经从队列中出去了(已经执行过Try代码块中的shouldParkAfterFailedAcquire方法了),也就是下图中代码1和代码2直接的间隙就会出现这种情况,此时修改Prev指针,有可能会导致Prev指向另一个已经移除队列的Node,因此这块变化Prev指针不安全。 ## unlock解锁 解锁时并不区分公平和不公平,因为ReentrantLock实现了锁的可重入,可以进一步的看一下时如何处理的,上代码: ```java public void unlock() { sync.release(1); } ``` ### release ```java public final boolean release(int arg) { // 自定义的tryRelease如果返回true,说明该锁没有被任何线程持有 if (tryRelease(arg)) { // 获取头结点 Node h = head; if (h != null && h.waitStatus != 0) // 头结点不为空并且头结点的waitStatus不是初始化节点情况,解除线程挂起状态 unparkSuccessor(h); return true; } return false; } ``` 这里的判断条件为什么是h != null && h.waitStatus != 0 1. h == null Head还没初始化。初始情况下,head == null,第一个节点入队,Head会被初始化一个虚拟节点。如果还没来得及入队,就会出现head == null 的情况。 2. h != null && waitStatus == 0 表明后继节点对应的线程仍在运行中,不需要唤醒。 3. h != null && waitStatus < 0 表明后继节点可能被阻塞了,需要唤醒,(还记得一个node是在shouldParkAfterFailedAcquire方法中被设置为SIGNAL = -1的吧?不记得翻看一下上面吧) ### tryRelease ```java protected final boolean tryRelease(int releases) { // 减少可重入次数,setState(c); int c = getState() - releases; // 当前线程不是持有锁的线程,抛出异常 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; // 如果持有线程全部释放,将当前独占锁所有线程设置为null,并更新state if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; } ``` ### unparkSuccessor 这个方法在cancelAcquire其实也用到了,简单分析一下 > // 如果当前节点是head的后继节点,或者上述条件不满足,就唤醒当前节点的后继节点unparkSuccessor(node); ```java private void unparkSuccessor(Node node) { // 获取结点waitStatus,CAS设置状态state=0 int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); // 获取当前节点的下一个节点 Node s = node.next; // 如果下个节点是null或者下个节点被cancelled,就找到队列最开始的非cancelled的节点 if (s == null || s.waitStatus > 0) { s = null; // 就从尾部节点开始找,到队首,找到队列第一个waitStatus<0的节点。 for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } // 如果当前节点的下个节点不为空,而且状态<=0,就把当前节点unpark if (s != null) LockSupport.unpark(s.thread); } ``` 为什么要从后往前找第一个非Cancelled的节点呢? 原因1:addWaiter方法并非原子,构建链表结构时如下图中 1、2间隙执行unparkSuccessor,此时链表是不完整的,没办法从前往后找了; ![image-20200607184934087](https://gitee.com/yangsanchao/images/raw/master/blogs/image-20200607184934087.png) 原因2:还有一点原因,在产生CANCELLED状态节点的时候,先断开的是Next指针,Prev指针并未断开,因此也是必须要从后往前遍历才能够遍历完全部的Node; ## 中断恢复 唤醒后,会执行return Thread.interrupted();,这个函数返回的是当前执行线程的中断状态,并清除。 ```java private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); } ``` acquireQueued代码,当parkAndCheckInterrupt返回True或者False的时候,interrupted的值不同,但都会执行下次循环。如果这个时候获取锁成功,就会把当前interrupted返回。 ```java public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } ``` ```java final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } ``` 如果acquireQueued为True,就会执行selfInterrupt方法。 该方法其实是为了中断线程。但为什么获取了锁以后还要中断线程呢?这部分属于Java提供的协作式中断知识内容,感兴趣同学可以查阅一下。这里简单介绍一下: 1. 当中断线程被唤醒时,并不知道被唤醒的原因,可能是当前线程在等待中被中断,也可能是释放了锁以后被唤醒。因此我们通过Thread.interrupted()方法检查中断标记(该方法返回了当前线程的中断状态,并将当前线程的中断标识设置为False),并记录下来,如果发现该线程被中断过,就再中断一次。 2. 线程在等待资源的过程中被唤醒,唤醒后还是会不断地去尝试获取锁,直到抢到锁为止。也就是说,在整个流程中,并不响应中断,只是记录中断记录。最后抢到锁返回了,那么如果被中断过的话,就需要补充一次中断。 这里的处理方式主要是运用线程池中基本运作单元Worder中的runWorker,通过Thread.interrupted()进行额外的判断处理,可以看下ThreadPoolExecutor源码的判断条件; ![image-20200607210022068](https://gitee.com/yangsanchao/images/raw/master/blogs/image-20200607210022068.png) # 其它 AQS在JUC中有⽐比较⼴广泛的使⽤用,以下是主要使⽤用的地⽅方: - ReentrantLock:使⽤用AQS保存锁重复持有的次数。当⼀一个线程获取锁时, ReentrantLock记录当 前获得锁的线程标识,⽤用于检测是否重复获取,以及错误线程试图解锁操作时异常情况的处理理。 - Semaphore:使⽤用AQS同步状态来保存信号量量的当前计数。 tryRelease会增加计数, acquireShared会减少计数。 - CountDownLatch:使⽤用AQS同步状态来表示计数。计数为0时,所有的Acquire操作 (CountDownLatch的await⽅方法)才可以通过。 - ReentrantReadWriteLock:使⽤用AQS同步状态中的16位保存写锁持有的次数,剩下的16位⽤用于保 存读锁的持有次数。 - ThreadPoolExecutor: Worker利利⽤用AQS同步状态实现对独占线程变量量的设置(tryAcquire和 tryRelease)。 --- 至此,通过ReentrantLock分析AQS的实现原理一家完毕,需要说明的是,此文深度参考了美团分析的ReentrantLock,是参考链接的第三个,有兴趣可以对比差异,感谢! # 参考 [JDK API 文档](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/locks/AbstractQueuedSynchronizer.html) [Java的LockSupport.park()实现分析](https://blog.csdn.net/hengyunabc/article/details/28126139) [[从ReentrantLock的实现看AQS的原理及应用](https://tech.meituan.com/2019/12/05/aqs-theory-and-apply.html) [[Thread的中断机制(interrupt)](https://www.cnblogs.com/onlywujun/p/3565082.html) --- # 你的鼓励也是我创作的动力 [打赏地址](https://img-blog.csdnimg.cn/20200515115954900.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3lhbmcxOTg5MDc=,size_8,color_FFFFFF,t_70)
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: