java源码解读之——concurrent并发包中的AbstractQueuedSynchronizer(AQS)解读
java深度解读之——concurrent并发包中的AbstractQueuedSynchronizer(AQS)
- 一、什么是AQS?
- 二、 怎么理解AQS?
- 三、源码解读
- 3.1 AQS的获取资源(acquire)流程
- 3.1.1tryAcquire()
- 3.1.2 addWaiter(Node)
- 3.1.3 acquireQueued(Node, int)
一、什么是AQS?
先上一下官方的解释(Provides a framework for implementing blocking locks and related synchronizers (semaphores, events, etc) that rely on first-in-first-out (FIFO) wait queues.)。根据官方的解释,我们大致可以了解到AQS是一个实现堵塞锁(例如:ReetranLock、ReentrantReadWriteLock)还有相关的同步结构例如(semaphores)的一个基于先入先出的同步堵塞队列,它是concurrent并发包中许多重要并发工具的一个基础。
二、 怎么理解AQS?
Doug Lea曾经介绍过 AQS 的设计初衷。从原理上,种同步结构往往是可以利用其他的结构实现的,所以,他选择了将基础的同步相关操作抽象在 AbstractQueuedSynchronizer 中,利用 AQS 为我们构建同步结构提供范本。如何解读AQS?我觉得应该从整体上把握,AQS可以拆分为:
1.一个记录当前资源的volatile字段:State,提供Set和Get的方法,作用是表示当前资源(例如ReetranLock里面的锁)的持有情况。AQS定义两种资源共享方式:Exclusive(独占,只有一个线程能执行,如ReentrantLock)和Share(共享,多个线程可同时执行,如Semaphore/CountDownLatch)。不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。
2.一个先入先出的FIFO等待线程队列,以实现多线程间的竞争和等待,是AQS机制的核心之一。
3.各种基于CAS自旋机制的基础操作方法,以及各种希望子类实现的acquire/release方法。
三、源码解读
如上图是ReetranLock上锁的流程,里面的底层使用了AQS来实现。
3.1 AQS的获取资源(acquire)流程
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
此方法是独占模式下线程获取共享资源的最顶层的入口。本法调用首先调用了acquire方法去尝试获取资源,当获取失败后调用addWaiter(Node.EXCLUSIVE)和acquireQueued(),值得注意的是整个获取的过程忽略中断的影响。
整个函数的执行顺序:
1.tryAcquire()尝试直接去获取资源,如果成功则直接返回;
2.addWaiter()将该线程加入等待队列的尾部,并标记为独占模式;
3.acquireQueued()使线程在等待队列中获取资源,一直获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。
4.如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。
3.1.1tryAcquire()
此方法尝试独占获取资源,当获取成功后返回TRUE,否则FALSE,但是具体的实现逻辑要交给自定义的同步器来实现,这里只是一个框架。
protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }
3.1.2 addWaiter(Node)
private Node addWaiter(Node mode) { //以给定模式构造结点。mode有两种:EXCLUSIVE(独占)和SHARED(共享) Node node = new Node(Thread.currentThread(), mode); //尝试快速方式直接放到队尾。 Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } //上一步失败则通过enq入队。 enq(node); return node; }
注意,AQS队列中,将线程包装成了类似于链表的一个结点,这个结点包含了需要同步的线程本身以及线程的状态,通过变量waitStatus。表示当前被封装成Node结点的等待状态,共有4种取值CANCELLED、SIGNAL、CONDITION、PROPAGATE。
- CANCELLED:值为1,在同步队列中等待的线程等待超时或被中断,需要从同步队列中取消该Node的结点,其结点的waitStatus为CANCELLED,即结束状态,进入该状态后的结点将不会再变化。
- SINGAL:值为-1,标识为当前线程的后续结点等待被唤醒,当前线程释放后,会通知该后续节点执行。
- CONDITION:值为-2,与Condition相关,该标识的结点处于等待队列中,结点的线程等待在Condition上,当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。
- PROPAGATE:值为-3,与共享模式相关,在共享模式中,该状态标识结点的线程处于可运行状态。
AQS根据几种取值的不同来决定等待队列中的结点是否会被唤醒。
3.1.3 acquireQueued(Node, int)
final boolean acquireQueued(final Node node, int arg) { boolean failed = true;//标记是否成功拿到资源 try { boolean interrupted = false;//标记等待过程中是否被中断过 //自旋直到获得资源 for (;;) { final Node p = node.predecessor();//拿到前驱 //如果前驱是head,即该结点已成老二,那么便有资格去尝试获取资源(可能是老大释放完资源唤醒自己的,当然也可能被interrupt了)。 if (p == head && tryAcquire(arg)) { setHead(node);//拿到资源后,将head指向该结点。所以head所指的标杆结点,就是当前获取到资源的那个结点或null。 p.next = null; // setHead中node.prev已置为null,此处再将head.next置为null,就是为了方便GC回收以前的head结点。也就意味着之前拿完资源的结点出队了! failed = false; return interrupted;//返回等待过程中是否被中断过 } //如果自己可以休息了,就进入waiting状态,直到被unpark() if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true;//如果等待过程中被中断过,哪怕只有那么一次,就将interrupted标记为true } } finally { if (failed) cancelAcquire(node); } }
根据对源码的分析,我们可以总结一下acquireQueued(final Node node, int arg)调用方法的流程:
- 结点进入队尾后,调用shouldParkAfterFailedAcquire()方法检查前一个结点的状态是否为SINGAL后找到安全点
- 调用park()方法,使线程进入waiting状态,等待被unpark()或者interrupt().
- 当线程被唤醒后,查看自己是否能拿到资源,能拿到资源就将当前线程移除等待队列,否则继续流程1.
3.2 AQS的释放资源(release)流程
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head;//找到头结点 if (h != null && h.waitStatus != 0) unparkSuccessor(h);//唤醒等待队列里的下一个线程 return true; } return false; }
release方法的流程并不复杂,主要就是将堵塞队列的头结点取出并唤醒下一个线程,但是tryRelease(arg)的返回值是它判断是否释放的标准,所以自定义同步器必须要重写这个方法。 release()是独占模式下线程释放共享资源的顶层入口,如果彻底释放了(state=0),它会唤醒等待队列里其他线程来获取资源。
3.2.1 unparkSuccessor(Node)
private void unparkSuccessor(Node node) { //这里,node一般为当前线程所在的结点。 int ws = node.waitStatus; if (ws < 0)//置零当前线程所在的结点状态,允许失败。 compareAndSetWaitStatus(node, ws, 0); Node s = node.next;//找到下一个需要唤醒的结点s if (s == null || s.waitStatus > 0) {//如果为空或已取消 s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0)//从这里可以看出,<=0的结点,都是还有效的结点。 s = t; } if (s != null) LockSupport.unpark(s.thread);//唤醒 }
此方法可以用于唤醒等待队列中的下一个线程。
四、总结
本文主要介绍了AQS的概念,以及AQS数据结构的大概组成。并分析了独占模式下的(acquire-release)的源码,希望能加深大家对AQS的理解!
- 点赞 1
- 收藏
- 分享
- 文章举报
- Java并发包源码学习之AQS框架(四)AbstractQueuedSynchronizer源码分析
- AbstractQueuedSynchronizer(AQS源码解读)
- java 并发编程学习笔记(六)之 AQS (AbstractQueuedSynchronizer)
- AQS(AbstractQueuedSynchronizer)源码解析(共享锁部分)
- java.util.concurrent.locks.AbstractQueuedSynchronizer队列同步器源码解析
- Java并发源码剖析(一)——AbstractQueuedSynchronizer独占模式
- Java并发系列[2]----AbstractQueuedSynchronizer源码分析之独占模式
- 深入学习java并发编程:Lock与AbstractQueuedSynchronizer(AQS)实现
- AbstractQueuedSynchronizer与synchronized优缺对比及AQS 源码分析笔记
- Java 1.6 AbstractQueuedSynchronizer源码解析
- JAVA并发九:深入理解AbstractQueuedSynchronizer(AQS)
- AbstractQueuedSynchronizer(AQS)源码解析-续
- 可重入锁 ReentrantLock 源码解读 (2)锁框架 AbstractQueuedSynchronizer
- Java并发系列之AbstractQueuedSynchronizer源码分析(共享模式)
- java中的队列同步器AQS -- AbstractQueuedSynchronizer
- Java多线程 -- JUC包源码分析9 -- AbstractQueuedSynchronizer深入分析-- Semaphore与CountDownLatch
- Java并发系列之AbstractQueuedSynchronizer源码分析(条件队列)
- Concurrent包源码解读之AbstractQueuedSynchronizer
- Java并发系列之AbstractQueuedSynchronizer源码分析(概要分析)
- [置顶] java中的队列同步器AQS -- AbstractQueuedSynchronizer