Java多线程--AQS源码阅读
一、AQS简介
JUC包的时间基石是CAS操作,但是在每一个同步器中都直接进行CAS操作会很麻烦,JUC提供了一个同步框架AQS(AbstractQueuedSynchronizer),AQS封装了一系列同步资源的操作,并为同步器的实现留出了获取和释放等方法如tryAcquire、tryRelease等,各同步器可以自己覆盖这些方法来实现同步操作。
AQS实现同步的方式为维护了一个整形数state,通过对state状态的修改来表示共享资源访问和释放,一个FIFO队列来表示请求资源的线程,暂未能获取到资源的线程会进入此队列。
二、AQS的属性
AQS的属性非常简单,就三个:
[code]// 同步队列的队首 private transient volatile Node head; // 队尾 private transient volatile Node tail; // 代表共享资源 private volatile int state;
三、内部Node类
AQS中线程等待的FIFO队列的元素类型为内部Node类,该类的源码如下。
[code]static final class Node { // 默认是共享状态 static final Node SHARED = new Node(); // 独占状态 static final Node EXCLUSIVE = null; // 代表该节点的线程被取消了 static final int CANCELLED = 1; // 该节点在释放锁后悔通知后继节点 static final int SIGNAL = -1; // 该节点线程在等待Condition static final int CONDITION = -2; /** * waitStatus value to indicate the next acquireShared should * unconditionally propagate */ // 与共享模式相关,在共享模式中,该状态标识结点的线程处于可运行状态。 static final int PROPAGATE = -3; // 注意一下属性均为volatile类型的,这是CAS的基石 // 等待状态,默认为0,取值为上面的几种状态 // 通常不需要判断具体的值,为负表示有效,为正表示被取消 volatile int waitStatus; // 前驱节点 volatile Node prev; // 后继节点 volatile Node next; // 该节点的线程 volatile Thread thread; */ // 下一个等待condition的节点,condition队列只有在独享模式有效 // 可以用来判断是否是独占模式 Node nextWaiter; // 判断是否是独占模式 final boolean isShared() { return nextWaiter == SHARED; } // 获取前驱节点 final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { // Used to establish initial head or SHARED marker } Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } }
四、资源的获取和释放
Java的许多同步器都是基于AQS实现的,如果你去看这些同步器的lock方法实现,通常里面都会是一句acquire(1);这即是在调用AQS框架的获取资源的方法。
AQS中提供了几个方法让子类重写:
tryAcquire(int) 获取独占资源
tryRelease(int) 释放独占资源
tryAcquireShared(int) 获取共享资源
tryReleaseShared(int) 释放共享资源
isHeldExclusively() 判断当前占有资源的线程是否是独占调用的
这几个方法基本定义了独占和共享资源的获取和释放,通常子类只需要实现独占或共享中的一种即可,也有例外如:ReentrantReadWriteLock自旋读写锁同时实现了共享模式和独占模式。
先看独占模式获取资源的方法:
[code]// 获取资源 public final void acquire(int arg) { // 获取失败且入队后进入了中断状态 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 线程在等待队列中是不响应中断的,在获取到资源后补上中断 selfInterrupt(); }
acquire方法会先去尝试直接拿到资源tryAcquire,如果成功直接返回,若失败则尝试进入同步队列获取资源。整个过程中忽略中断。
[code]// 子类重写 protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }
在AQS中尝试获取资源是直接报错的,此方法是AQS留给同步器自己去实现的。当tryAcquire失败,会执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg),首先看addWaiter(Node.EXCLUSIVE):
[code]// 入队 private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); Node pred = tail; // 如果队尾存在,即队列已经初始化过,将队尾设置为当前节点 if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // 队列未初始化 enq(node); return node; } // 入队 private Node enq(final Node node) { // 自旋 for (;;) { Node t = tail; if (t == null) { // Must initialize 初始化等待队列将自己设为队首 if (compareAndSetHead(new Node())) tail = head; } else { // 将自己设为队尾 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
addWaiter方法其实就是将当前节点放置到同步队列中去,放置节点的模式为EXCLUSIVE独占模式,共享模式的节点放置在tryAcquireShared(int)方法中。
节点入队后会在队列中尝试获取资源,即acquireQueued方法:
[code]// 从等待队列自旋获取锁 final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; // 自旋 for (;;) { // 前驱节点 final Node p = node.predecessor(); // 前驱节点是首节点即自己是老二。 // 当自己是老二时可以尝试去获取资源,因为可能老大释放了资源却没出队 if (p == head && tryAcquire(arg)) { // 自己是老二且获取资源成功了 // 将自己设置为队首,成为老大 setHead(node); // 原队首节点的后继变为null,因为首节点的前继节点也是null,清空引用帮助GC p.next = null; // help GC failed = false; // 只有在获取到资源后才会停止自旋并返回在自旋过程中是否中断过 return interrupted; } // 不是老二或者获取资源失败,尝试进入中断状态--park // 进入中断之前要通知自己的前继节点释放资源时记得通知我 // shouldParkAfterFailedAcquire给前继节点通知 // parkAndCheckInterrupt判断是否已被中断,若是将中断状态清除,中断标志会在acquire方法最后补上 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
当当前节点无法获取到资源时会因为等待资源进入等待状态,但是在进入等待状态之前必先保证自己会在可以获取资源时被唤醒。
进入等待状态调用方法:parkAndCheckInterrupt
[code]// 进入中断 private final boolean parkAndCheckInterrupt() { LockSupport.park(this); //interrupted方法会清除中断标志的,这也是acquireQueued忽略中断的原因 return Thread.interrupted(); }
进入等待之前的操作shouldParkAfterFailedAcquire:
[code]// 通知前驱节点释放资源后唤醒我 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; // 前驱节点已经是会唤醒后续的状态,可以安全的进入中断了,其他情况都无法安全进入中断 if (ws == Node.SIGNAL) return true; // 其他情况都返回false会在acquireQueued方法的自旋中重试 // 前驱节点放弃了,一直找到自己之前的未放弃的节点,插队排在他后面 if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // 将前驱节点设置的状态为SIGNAL compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
改节点线程的中断标志会在acquire方法的最后补上,即selfInterrupt();
acquire的源码阅读结束,回顾一下其操作流程:
1、尝试调用tryAcquire直接获取锁,成功直接返回。否则进入2
2、调用addWaiter将当前节点进入同步队列。
3、调用acquireQueued以自旋的方式从同步队列中获取资源。
4、若是获取资源不成功将返回值设置为true,即在同步队列中有中断。
5、如果在同步队列中有中断,在acquire方法最后调用selfInterrupt方法补上中断。
资源释放:
资源的获取到此结束,获取资源后需要对资源进行释放,释放资源调用release方法:
[code]public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; // 唤醒头结点之后的第一个有效节点 if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
和tryAcquire一样,tryRelease方法也是由子类重写的。
[code]// 子类重写 protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); }
release方法会在释放资源成功后调用unparkSuccessor方法来唤醒下一个等待资源的线程:
[code]private void unparkSuccessor(Node node) { // 置零当前节点的状态 int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; // 找到第一个未取消的节点将其唤醒unpark // 查找的过程是从后往前找,因为在shouldParkAfterFailedAcquire中会把队中的cancell节点 //踢出队,踢出的方式为修改其next指针,所以不能从前往后找 for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
至此,release方法执行完毕。release的实现方法比acquire方法简单多了,基本步骤如下:
1、调用tryRelease释放资源,tryRelease需要子类重写,若失败返回false否则进入2。
2、释放资源成功,需要调用unparkSuccessor来unpark同步队列中的下一个有效节点。
在AQS中除了acquire方法获取独占资源外,还提供了acquireShared方法来获取共享资源。
[code]public final void acquireShared(int arg) { // 返回负数表示失败 if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
tryAcquireShared与tryAcquire类似,不同的是tryAcquire方法返回一个boolean值表示资源可获取和不可获取,而tryAcquireShared返回的是一个int值,表示还可以被多少个线程共享。当然tryAcquireShared方法也要由子类覆盖:
[code]// 子类重写 protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException(); }
当无法获取到资源时,便尝试进入同步队列获取资源,此方法与acquireQueued方法类似:
[code]private void doAcquireShared(int arg) { // 添加共享模式节点入队 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; // 自旋 for (;;) { final Node p = node.predecessor(); // p为队首,即node是老二,尝试获取资源 if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { // 将自己设为队首,如果r>0有剩余资源唤醒下一个节点 setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } // 与独占模式一样 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
共享模式的释放也与独占模式类似:
[code] // 共享模式的释放 public final boolean releaseShared(int arg) { // tryReleaseShared由子类重写 if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } // 从同步队列中释放资源 private void doReleaseShared() { // 自旋,目的是将head节点的等待状态变为PROPAGATE for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; // 如果等待状态是SIGNAL,释放后要通知后续节点 // 如果当前状态为SIGNAL先变为0,再在下一次自旋后变为PROPAGATE,因为unparkSuccessor会将其变为0 if (ws == Node.SIGNAL) { // 将状态变为0 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases // 通知 unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } // 如果进行上述操作后首节点变了,会对新首节点尝试释放资源 if (h == head) // loop if head changed break; } }
除acquire和acquireShared方法外AQS还提供了一种等待锁时允许中断的方法:acquireInterruptibly,此方法的实现与acquire几乎完全一致,不同的是当得不到资源时不是标记中断而是直接抛出异常。
[code]public final void acquireInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (!tryAcquire(arg)) doAcquireInterruptibly(arg); } // 与acquireQueued几乎一致 private void doAcquireInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); 20000 p.next = null; // help GC failed = false; return; } // 如果发现得不到锁不是标记中断而是直接抛出异常 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
阅读更多
- Java多线程——ReentrantLock源码阅读
- Java多线程框架源码阅读之---ReentrantLock
- Java多线程——AQS框架源码阅读
- 我之见--java多线程之可重入锁,读写锁源码分析 及自定义锁AQS
- Java多线程系列(六)—AQS源码分析
- java并发包源码阅读笔记(1)-AQS源码研究
- Java多线程——AQS框架源码阅读
- Java多线程——ReentrantLock源码阅读
- Java多线程——AQS框架源码阅读
- Java多线程 -- JUC包源码分析19 -- ForkJoinPool/ForkJoinTask
- JAVA源码阅读——–Short类
- Java源码阅读——HashMap
- Java并发包源码学习之AQS框架(三)LockSupport和interrupt
- java源码阅读-java.lang.String(01)
- JDK源码阅读——java.lang.Boolean
- 如何阅读Java源码
- Java源码阅读的真实体会
- Java对象的序列化和反序列化源码阅读
- 基于Java多线程的下载器源码剖析(三)
- Java源码阅读体会