AbstractQueuedSynchronizer框架
2016-07-25 22:00
585 查看
常开发中,大多数程序员并不会直接接触AbstractQueuedSynchronizer(AQS)类,但其在并发工具中缺无处不在,并作为内部的标准同步器,如ReentrantLock,Semaphore,Java线程池中的Worker等。本文将介绍AQS相关的实现细节。
AQS负责管理同步器类中的状态,它管理了一个整数状态信息,可以通过getState,setState及compareAndSetState等方法进行操作。这个整数状态的意义由子类来赋予,如ReentrantLock中该状态值表示所有者线程已经重复获取该锁的次数,Semaphore中该状态值表示剩余的许可数量。可以看下使用的AbstractQueuedSynchronizer的并发工具类:
![](https://oscdn.geek-share.com/Uploads/Images/Content/201908/30/787c951efe9549ab0392ff04e3309b67.png)
AQS定义比较简单,继承自AbstractOwnableSynchronizer接口:
![](https://oscdn.geek-share.com/Uploads/Images/Content/201908/30/4b16dc9c11a7e6785359eb6f4e6aa572.png)
当一个同步器可以由单个线程独占时,AbstractOwnableSynchronizer定义了基础的创建锁和相关同步器的方法,但其本身并不管理维护这些信息,而是交由子类去实现:
AbstractQueuedSynchronizer内部使用CLH锁(CLH锁是一种基于链表的可扩展、高性能、公平的自旋锁,申请线程不断轮询前驱的状态,如果发现前驱释放了锁就结束自旋)的变种来实现对线程的阻塞。CLH锁的链表中的节点被抽象为Node:
其中AbstractQueuedSynchronizer维护的链表结构大致如下:
![](https://oscdn.geek-share.com/Uploads/Images/Content/201908/30/32e28b78e85b31a43c8b16dabd4804cf.png)
可以先从ReentrantLock的实现来探究AbstractQueuedSynchronizer的作用。ReentrantLock内部封装了一个Sync类,来实现基本的lock和unlock操作:
当ReentrantLock执行lock()时,主要是通过AbstractQueuedSynchronizer的acquire()方法实现:
下图揭示了从同步器获取锁时,内部的等待队列的状态变化图:
初始状态
![](https://oscdn.geek-share.com/Uploads/Images/Content/201908/30/369957462ccf30401f268b32e7906950.png)
当只有一个线程t1进行lock()操作时,由于tryAcquire()将返回true,不用进行等待,等待队列状态不变。
若在线程t1还未unlock(),线程t2就进行了lock()操作,此时等待队列将被初始化,并将线程t2插入等待队列:
![](https://oscdn.geek-share.com/Uploads/Images/Content/201908/30/924817877e5faec0b1c3eef10c789a55.png)
![](https://oscdn.geek-share.com/Uploads/Images/Content/201908/30/e854032d96d8e3d952b942ba2d1f75ad.png)
此时,若线程t3也进行lock()操作:
![](https://oscdn.geek-share.com/Uploads/Images/Content/201908/30/3632ab96ad1217d59650f76d29ed5ec3.png)
以上则是ReentrantLock的加锁(lock)机制,下面则是ReentrantLock的解锁(unlock)机制:
一旦
LockSupport.unpark(s.thread);执行完,对应的等待节点将被唤醒:
以上,则是AbstractQueuedSynchronizer同步器的基本实现机制,其作为很多并发工具的基础,规范了如何阻塞和唤醒线程,相比普通的锁机制(如synchronized),其通过自旋等待和精确唤醒,可以提高一些并发时的性能。
JDK7源码
Java并发编程第14章
http://coderbee.net/index.php/concurrent/20131115/577
http://ifeve.com/introduce-abstractqueuedsynchronizer/
http://www.infoq.com/cn/articles/jdk1.8-abstractqueuedsynchronizer
什么是AbstractQueuedSynchronizer(AQS)
AQS负责管理同步器类中的状态,它管理了一个整数状态信息,可以通过getState,setState及compareAndSetState等方法进行操作。这个整数状态的意义由子类来赋予,如ReentrantLock中该状态值表示所有者线程已经重复获取该锁的次数,Semaphore中该状态值表示剩余的许可数量。可以看下使用的AbstractQueuedSynchronizer的并发工具类:![](https://oscdn.geek-share.com/Uploads/Images/Content/201908/30/787c951efe9549ab0392ff04e3309b67.png)
AbstractQueuedSynchronizer(AQS)实现
AQS定义比较简单,继承自AbstractOwnableSynchronizer接口:![](https://oscdn.geek-share.com/Uploads/Images/Content/201908/30/4b16dc9c11a7e6785359eb6f4e6aa572.png)
AbstractOwnableSynchronizer
当一个同步器可以由单个线程独占时,AbstractOwnableSynchronizer定义了基础的创建锁和相关同步器的方法,但其本身并不管理维护这些信息,而是交由子类去实现:public abstract class AbstractOwnableSynchronizer implements java.io.Serializable { private static final long serialVersionUID = 3737899427754241961L; protected AbstractOwnableSynchronizer() { } /** * 当前独占同步器的线程 */ private transient Thread exclusiveOwnerThread; /** * 设置当前独占同步器的线程 */ protected final void setExclusiveOwnerThread(Thread t) { exclusiveOwnerThread = t; } /** * 获取当前独占同步器的线程 */ protected final Thread getExclusiveOwnerThread() { return exclusiveOwnerThread; } }
AbstractQueuedSynchronizer
AbstractQueuedSynchronizer内部使用CLH锁(CLH锁是一种基于链表的可扩展、高性能、公平的自旋锁,申请线程不断轮询前驱的状态,如果发现前驱释放了锁就结束自旋)的变种来实现对线程的阻塞。CLH锁的链表中的节点被抽象为Node:static final class Node { /** * 标记节点正以共享模式等待 */ static final Node SHARED = new Node(); /** * 标记节点正以独占模式等待 */ static final Node EXCLUSIVE = null; // ===== 以下表示节点的等待状态 ===== /** * 表示当前的线程被取消 */ static final int CANCELLED = 1; /** * 表示当前节点的后继节点包含的线程需要运行,也就是unpark */ static final int SIGNAL = -1; /** * 表示当前节点在等待condition,也就是在condition队列中 */ static final int CONDITION = -2; /** * 示当前场景下后续的acquireShared能够得以执行 */ static final int PROPAGATE = -3; /** * 状态 */ volatile int waitStatus; /** * 前驱节点,比如当前节点被取消时,那就需要前驱节点和后继节点来完成连接。 */ volatile Node prev; /** * 后继结点 */ volatile Node next; /** * 入队列时的当前线程 */ volatile Thread thread; /** * 存储condition队列中的后继节点。 */ Node nextWaiter; ... }
其中AbstractQueuedSynchronizer维护的链表结构大致如下:
![](https://oscdn.geek-share.com/Uploads/Images/Content/201908/30/32e28b78e85b31a43c8b16dabd4804cf.png)
ReentrantLock
可以先从ReentrantLock的实现来探究AbstractQueuedSynchronizer的作用。ReentrantLock内部封装了一个Sync类,来实现基本的lock和unlock操作:public class ReentrantLock implements Lock, java.io.Serializable { // 同步器,用于实现锁机制 private final Sync sync; /** * 基础的同步器实现 */ abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = -5179523762034025860L; /** * 由公平锁和非公平锁实现 */ abstract void lock(); /** * 非公平锁时,尝试加锁 */ final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); // 同步器状态 int c = getState(); if (c == 0) { // 若同步器状态为初始状态,则尝试加锁 if (compareAndSetState(0, acquires)) { // 设置锁的占用线程 setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { // 当前线程已经加锁过,则设置state为锁的重入次数+1 int nextc = c + acquires; if (nextc < 0) // 超出了锁重入的最大次数 throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } /** * 尝试释放同步器 */ protected final boolean tryRelease(int releases) { // 释放后的新状态 int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) // 非占用线程 throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { // state归零,释放成功 free = true; setExclusiveOwnerThread(null); } setState(c); return free; } /* * 当前线程是否独占该锁 */ protected final boolean isHeldExclusively() { return getExclusiveOwnerThread() == Thread.currentThread(); } /* * 创建一个条件对象 */ final ConditionObject newCondition() { return new ConditionObject(); } /* * 获取当前独占线程 */ final Thread getOwner() { return getState() == 0 ? null : getExclusiveOwnerThread(); } /* * 获取锁被重入的次数 */ final int getHoldCount() { return isHeldExclusively() ? getState() : 0; } /* * 锁是否被占用 */ final boolean isLocked() { return getState() != 0; } /** * 从对象流中反序列化锁对象 */ private void readObject(java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException { s.defaultReadObject(); // 重置为初始状态 setState(0); } } /** * 非公平锁 */ static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; /** * 加锁 */ final void lock() { // 先尝试直接加锁,即抢占式 if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else // 失败后,就排队抢锁 acquire(1); } /** * 尝试获取锁 */ protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } } /** * 公平锁 */ static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L; final void lock() { // 直接进行排队抢锁,保持公平 acquire(1); } /** * 尝试获取锁 */ protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // 若没有其他线程已经在等待队列中,则尝试加锁 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { // 当前线程已经占有锁,则重入次数 + acquires int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } } /** * 默认非公平锁 */ public ReentrantLock() { sync = new NonfairSync(); } /** * 请求锁 */ public void lock() { sync.lock(); } /** * 尝试加锁,可被中断 */ public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } /** * 尝试加锁 */ public boolean tryLock() { return sync.nonfairTryAcquire(1); } /** * 加锁,具有超时限制 */ public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(timeout)); } /** * 解锁 */ public void unlock() { sync.release(1); } /** * 创建一个条件对象 */ public Condition newCondition() { return sync.newCondition(); } /** * 获取锁的重入次数 */ public int getHoldCount() { return sync.getHoldCount(); } /** * 锁是否被当前线程持有 */ public boolean isHeldByCurrentThread() { return sync.isHeldExclusively(); } /** * 锁是否已被持有 */ public boolean isLocked() { return sync.isLocked(); } /** * 是否是公平锁 */ public final boolean isFair() { return sync instanceof FairSync; } /** * 获取占用锁的线程 */ protected Thread getOwner() { return sync.getOwner(); } /** * 是否有等待的线程 */ public final boolean hasQueuedThreads() { return sync.hasQueuedThreads(); } /** * 判断线程是否在等待队列中 */ public final boolean hasQueuedThread(Thread thread) { return sync.isQueued(thread); } /** * 获取等待队列长度,并发时,不是绝对精确 */ public final int getQueueLength() { return sync.getQueueLength(); } /** * 获取等待的线程集合,不是绝对精确 */ protected Collection<Thread> getQueuedThreads() { return sync.getQueuedThreads(); } /** * 判断是否有线程在某条件上等待 */ public boolean hasWaiters(Condition condition) { if (condition == null) throw new NullPointerException(); if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject)) throw new IllegalArgumentException("not owner"); return sync.hasWaiters((AbstractQueuedSynchronizer.ConditionObject)condition); } /** * 获取在某条件上等待的线程数 */ public int getWaitQueueLength(Condition condition) { if (condition == null) throw new NullPointerException(); if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject)) throw new IllegalArgumentException("not owner"); return sync.getWaitQueueLength((AbstractQueuedSynchronizer.ConditionObject)condition); } /** * 获取在某条件上等待的线程集 */ protected Collection<Thread> getWaitingThreads(Condition condition) { if (condition == null) throw new NullPointerException(); if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject)) throw new IllegalArgumentException("not owner"); return sync.getWaitingThreads((AbstractQueuedSynchronizer.ConditionObject)condition); } }
当ReentrantLock执行lock()时,主要是通过AbstractQueuedSynchronizer的acquire()方法实现:
// ReentrantLock.lock() public void lock() { sync.lock(); } // FairSync.sync() final void lock() { acquire(1); } // 获取锁 public final void acquire(int arg) { // 尝试获取锁: // 1. 成功时,直接返回 // 2. 失败时,以独占的方式将当前线程入队addWaiter,并且等待自旋等待acquireQueued if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 若等待返回了,则中断自己 selfInterrupt(); } private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // 先尝试直接入队尾, // 并发时有可能失败,则通过enq入队 Node pred = tail; if (pred != null) { // 已经有线程在等待,则尝试直接设置node为tail node.prev = pred; if (compareAndSetTail(pred, node)) { // 链接旧的tail.next -> node pred.next = node; return node; } } // 等待队列为空 或 并发时compareAndSetTail失败,则尝试继续插入等待节点 enq(node); return node; } // 新节点入队,返回旧的尾节点 private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // 等待队列此时为空,初始化头节点 if (compareAndSetHead(new Node())) // 初始化尾节点 tail = head; } else { // 等待队列不为空 // 链接新节点的前驱节点为尾节点 node.prev = t; // 设置新节点为尾节点 if (compareAndSetTail(t, node)) { // 链接旧尾节点的后驱节点为新节点 t.next = node; return t; } } } } // 自旋等待 final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { // 从当前节点往前找到头节点 final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { // 已获取到锁,设置新的头节点,相当于节点出队 setHead(node); // 释放掉等待节点,头节点是没有next属性的 p.next = null; // help GC failed = false; return interrupted; } // 当请求锁失败后,检查节点是否被需要阻塞 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { // 若排队失败,则取消获取锁的请求 if (failed) cancelAcquire(node); } } private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { // 获取前驱节点的状态 int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * 节点的前驱节点状态为SIGNAL时,表示该节点已经请求过需要被唤醒,可以安全地阻塞 */ return true; if (ws > 0) { /* * 若前驱节点已被取消,则忽略这些取消的节点,继续往前查找未 */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * 此时前驱节点的状态为0或PROPAGATE(-3),此时需要一个唤醒节点信号,但没必要阻塞线程 */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } private final boolean parkAndCheckInterrupt() { // 阻塞当前线程 LockSupport.park(this); // 线程是否被中断,且重置中断状态 return Thread.interrupted(); } // 尝试取消请求 private void cancelAcquire(Node node) { // 忽略不存在的节点 if (node == null) return; // 节点线程置空 node.thread = null; // 忽略取消的节点 Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev; // predNext节点,表示node节点前的第一个非取消状态节点的后继节点 Node predNext = pred.next; // 将节点状态设置为取消 node.waitStatus = Node.CANCELLED; // 如果当前节点是尾节点,设置新的尾节点 if (node == tail && compareAndSetTail(node, pred)) { // 将node的后继节点置空 compareAndSetNext(pred, predNext, null); } else { // 若node不为尾节点,即为链表中间的节点 // If successor needs signal, try to set pred's next-link // so it will get one. Otherwise wake it up to propagate. int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { //如果node的前驱节点不是头节点,那么需要给当前节点的后继节点一个"等待唤醒"的标记, //即将当前节点的前驱节点等待状态设置为SIGNAL,然后将其设置为当前节点的后继节点的前驱节点 Node next = node.next; if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { // 唤醒node节点的后继节点。 unparkSuccessor(node); } // 让取消节点的next引用会指向自己 node.next = node; } }
下图揭示了从同步器获取锁时,内部的等待队列的状态变化图:
初始状态
![](https://oscdn.geek-share.com/Uploads/Images/Content/201908/30/369957462ccf30401f268b32e7906950.png)
当只有一个线程t1进行lock()操作时,由于tryAcquire()将返回true,不用进行等待,等待队列状态不变。
若在线程t1还未unlock(),线程t2就进行了lock()操作,此时等待队列将被初始化,并将线程t2插入等待队列:
![](https://oscdn.geek-share.com/Uploads/Images/Content/201908/30/924817877e5faec0b1c3eef10c789a55.png)
![](https://oscdn.geek-share.com/Uploads/Images/Content/201908/30/e854032d96d8e3d952b942ba2d1f75ad.png)
此时,若线程t3也进行lock()操作:
![](https://oscdn.geek-share.com/Uploads/Images/Content/201908/30/3632ab96ad1217d59650f76d29ed5ec3.png)
以上则是ReentrantLock的加锁(lock)机制,下面则是ReentrantLock的解锁(unlock)机制:
// ReentrantLock.unlock() public void unlock() { sync.release(1); } // Sync.release() public final boolean release(int arg) { // 尝试解锁,由子类ReentrantLock区定义 if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) // 唤醒第一个节点 unparkSuccessor(h); return true; } return false; } // ReentrantLock.tryRelease() protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { // 仅当state = 0时,才算释放锁成功,即统一线程的lock()次数必须与unlock()次数相同 free = true; setExclusiveOwnerThread(null); } setState(c); return free; } // AbstractQueuedSynchronizer.unparkSuccessor() private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); // 寻找有效的后继节点 Node s = node.next; // 后继节点不存在,或状态为取消时,则查询最前面的一个非取消的节点 if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) // 唤醒对应节点锁在线程 LockSupport.unpark(s.thread); }
一旦
LockSupport.unpark(s.thread);执行完,对应的等待节点将被唤醒:
private final boolean parkAndCheckInterrupt() { // 唤醒后返回 LockSupport.park(this); return Thread.interrupted(); } final boolean acquireQueued(final Node node, int arg) { ... try { ... for (;;) { // 设置新的head节点,等待节点得到执行 final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; failed = false; return interrupted; } } } finally { ... } }
以上,则是AbstractQueuedSynchronizer同步器的基本实现机制,其作为很多并发工具的基础,规范了如何阻塞和唤醒线程,相比普通的锁机制(如synchronized),其通过自旋等待和精确唤醒,可以提高一些并发时的性能。
参考文献
JDK7源码Java并发编程第14章
http://coderbee.net/index.php/concurrent/20131115/577
http://ifeve.com/introduce-abstractqueuedsynchronizer/
http://www.infoq.com/cn/articles/jdk1.8-abstractqueuedsynchronizer
相关文章推荐
- java对世界各个时区(TimeZone)的通用转换处理方法(转载)
- java-注解annotation
- java-模拟tomcat服务器
- java-用HttpURLConnection发送Http请求.
- java-WEB中的监听器Lisener
- Android IPC进程间通讯机制
- Android Native 绘图方法
- Android java 与 javascript互访(相互调用)的方法例子
- 介绍一款信息管理系统的开源框架---jeecg
- 聚类算法之kmeans算法java版本
- java实现 PageRank算法
- PropertyChangeListener简单理解
- c++11 + SDL2 + ffmpeg +OpenAL + java = Android播放器
- 插入排序
- 冒泡排序
- 堆排序
- 快速排序