您的位置:首页 > 产品设计 > UI/UE

AbstractQueuedSynchronizer框架

2016-07-25 22:00 585 查看
常开发中,大多数程序员并不会直接接触AbstractQueuedSynchronizer(AQS)类,但其在并发工具中缺无处不在,并作为内部的标准同步器,如ReentrantLock,Semaphore,Java线程池中的Worker等。本文将介绍AQS相关的实现细节。


什么是AbstractQueuedSynchronizer(AQS)

AQS负责管理同步器类中的状态,它管理了一个整数状态信息,可以通过getState,setState及compareAndSetState等方法进行操作。这个整数状态的意义由子类来赋予,如ReentrantLock中该状态值表示所有者线程已经重复获取该锁的次数,Semaphore中该状态值表示剩余的许可数量。可以看下使用的AbstractQueuedSynchronizer的并发工具类:




AbstractQueuedSynchronizer(AQS)实现

AQS定义比较简单,继承自AbstractOwnableSynchronizer接口:




AbstractOwnableSynchronizer

当一个同步器可以由单个线程独占时,AbstractOwnableSynchronizer定义了基础的创建锁和相关同步器的方法,但其本身并不管理维护这些信息,而是交由子类去实现:
public abstract class AbstractOwnableSynchronizer implements java.io.Serializable {

private static final long serialVersionUID = 3737899427754241961L;

protected AbstractOwnableSynchronizer() { }

/**
* 当前独占同步器的线程
*/
private transient Thread exclusiveOwnerThread;

/**
* 设置当前独占同步器的线程
*/
protected final void setExclusiveOwnerThread(Thread t) {
exclusiveOwnerThread = t;
}

/**
* 获取当前独占同步器的线程
*/
protected final Thread getExclusiveOwnerThread() {
return exclusiveOwnerThread;
}
}



AbstractQueuedSynchronizer

AbstractQueuedSynchronizer内部使用CLH锁(CLH锁是一种基于链表的可扩展、高性能、公平的自旋锁,申请线程不断轮询前驱的状态,如果发现前驱释放了锁就结束自旋)的变种来实现对线程的阻塞。CLH锁的链表中的节点被抽象为Node:
static final class Node {

/**
* 标记节点正以共享模式等待
*/
static final Node SHARED = new Node();

/**
* 标记节点正以独占模式等待
*/
static final Node EXCLUSIVE = null;

// ===== 以下表示节点的等待状态 =====

/**
* 表示当前的线程被取消
*/
static final int CANCELLED =  1;

/**
* 表示当前节点的后继节点包含的线程需要运行,也就是unpark
*/
static final int SIGNAL    = -1;

/**
* 表示当前节点在等待condition,也就是在condition队列中
*/
static final int CONDITION = -2;

/**
* 示当前场景下后续的acquireShared能够得以执行
*/
static final int PROPAGATE = -3;

/**
* 状态
*/
volatile int waitStatus;

/**
* 前驱节点,比如当前节点被取消时,那就需要前驱节点和后继节点来完成连接。
*/
volatile Node prev;

/**
* 后继结点
*/
volatile Node next;

/**
* 入队列时的当前线程
*/
volatile Thread thread;

/**
* 存储condition队列中的后继节点。
*/
Node nextWaiter;

...
}


其中AbstractQueuedSynchronizer维护的链表结构大致如下:




ReentrantLock

可以先从ReentrantLock的实现来探究AbstractQueuedSynchronizer的作用。ReentrantLock内部封装了一个Sync类,来实现基本的lock和unlock操作:
public class ReentrantLock implements Lock, java.io.Serializable {

// 同步器,用于实现锁机制
private final Sync sync;

/**
* 基础的同步器实现
*/
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;

/**
* 由公平锁和非公平锁实现
*/
abstract void lock();

/**
* 非公平锁时,尝试加锁
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
// 同步器状态
int c = getState();
if (c == 0) {
// 若同步器状态为初始状态,则尝试加锁
if (compareAndSetState(0, acquires)) {
// 设置锁的占用线程
setExclusiveOwnerThread(current);
return true;
}
} else if (current == getExclusiveOwnerThread()) {
// 当前线程已经加锁过,则设置state为锁的重入次数+1
int nextc = c + acquires;
if (nextc < 0) // 超出了锁重入的最大次数
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

/**
* 尝试释放同步器
*/
protected final boolean tryRelease(int releases) {
// 释放后的新状态
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
// 非占用线程
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
// state归零,释放成功
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}

/*
* 当前线程是否独占该锁
*/
protected final boolean isHeldExclusively() {
return getExclusiveOwnerThread() == Thread.currentThread();
}

/*
* 创建一个条件对象
*/
final ConditionObject newCondition() {
return new ConditionObject();
}

/*
* 获取当前独占线程
*/
final Thread getOwner() {
return getState() == 0 ? null : getExclusiveOwnerThread();
}

/*
* 获取锁被重入的次数
*/
final int getHoldCount() {
return isHeldExclusively() ? getState() : 0;
}

/*
* 锁是否被占用
*/
final boolean isLocked() {
return getState() != 0;
}

/**
* 从对象流中反序列化锁对象
*/
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
// 重置为初始状态
setState(0);
}
}

/**
* 非公平锁
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;

/**
* 加锁
*/
final void lock() {
// 先尝试直接加锁,即抢占式
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
// 失败后,就排队抢锁
acquire(1);
}

/**
* 尝试获取锁
*/
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}

/**
* 公平锁
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;

final void lock() {
// 直接进行排队抢锁,保持公平
acquire(1);
}

/**
* 尝试获取锁
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 若没有其他线程已经在等待队列中,则尝试加锁
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
} else if (current == getExclusiveOwnerThread()) {
// 当前线程已经占有锁,则重入次数 + acquires
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}

/**
* 默认非公平锁
*/
public ReentrantLock() {
sync = new NonfairSync();
}

/**
* 请求锁
*/
public void lock() {
sync.lock();
}

/**
* 尝试加锁,可被中断
*/
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}

/**
* 尝试加锁
*/
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}

/**
* 加锁,具有超时限制
*/
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}

/**
* 解锁
*/
public void unlock() {
sync.release(1);
}

/**
* 创建一个条件对象
*/
public Condition newCondition() {
return sync.newCondition();
}

/**
* 获取锁的重入次数
*/
public int getHoldCount() {
return sync.getHoldCount();
}

/**
* 锁是否被当前线程持有
*/
public boolean isHeldByCurrentThread() {
return sync.isHeldExclusively();
}

/**
* 锁是否已被持有
*/
public boolean isLocked() {
return sync.isLocked();
}

/**
* 是否是公平锁
*/
public final boolean isFair() {
return sync instanceof FairSync;
}

/**
* 获取占用锁的线程
*/
protected Thread getOwner() {
return sync.getOwner();
}

/**
* 是否有等待的线程
*/
public final boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}

/**
* 判断线程是否在等待队列中
*/
public final boolean hasQueuedThread(Thread thread) {
return sync.isQueued(thread);
}

/**
* 获取等待队列长度,并发时,不是绝对精确
*/
public final int getQueueLength() {
return sync.getQueueLength();
}

/**
* 获取等待的线程集合,不是绝对精确
*/
protected Collection<Thread> getQueuedThreads() {
return sync.getQueuedThreads();
}

/**
* 判断是否有线程在某条件上等待
*/
public boolean hasWaiters(Condition condition) {
if (condition == null)
throw new NullPointerException();
if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
throw new IllegalArgumentException("not owner");
return sync.hasWaiters((AbstractQueuedSynchronizer.ConditionObject)condition);
}

/**
* 获取在某条件上等待的线程数
*/
public int getWaitQueueLength(Condition condition) {
if (condition == null)
throw new NullPointerException();
if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
throw new IllegalArgumentException("not owner");
return sync.getWaitQueueLength((AbstractQueuedSynchronizer.ConditionObject)condition);
}

/**
* 获取在某条件上等待的线程集
*/
protected Collection<Thread> getWaitingThreads(Condition condition) {
if (condition == null)
throw new NullPointerException();
if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
throw new IllegalArgumentException("not owner");
return sync.getWaitingThreads((AbstractQueuedSynchronizer.ConditionObject)condition);
}
}

当ReentrantLock执行lock()时,主要是通过AbstractQueuedSynchronizer的acquire()方法实现:
// ReentrantLock.lock()
public void lock() {
sync.lock();
}

// FairSync.sync()
final void lock() {
acquire(1);
}

// 获取锁
public final void acquire(int arg) {
// 尝试获取锁:
// 1. 成功时,直接返回
// 2. 失败时,以独占的方式将当前线程入队addWaiter,并且等待自旋等待acquireQueued
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 若等待返回了,则中断自己
selfInterrupt();
}

private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// 先尝试直接入队尾,
// 并发时有可能失败,则通过enq入队
Node pred = tail;
if (pred != null) {
// 已经有线程在等待,则尝试直接设置node为tail
node.prev = pred;
if (compareAndSetTail(pred, node)) {
// 链接旧的tail.next -> node
pred.next = node;
return node;
}
}
// 等待队列为空 或 并发时compareAndSetTail失败,则尝试继续插入等待节点
enq(node);
return node;
}

// 新节点入队,返回旧的尾节点
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
// 等待队列此时为空,初始化头节点
if (compareAndSetHead(new Node()))
// 初始化尾节点
tail = head;
} else {
// 等待队列不为空
// 链接新节点的前驱节点为尾节点
node.prev = t;
// 设置新节点为尾节点
if (compareAndSetTail(t, node)) {
// 链接旧尾节点的后驱节点为新节点
t.next = node;
return t;
}
}
}
}

// 自旋等待
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 从当前节点往前找到头节点
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
// 已获取到锁,设置新的头节点,相当于节点出队
setHead(node);
// 释放掉等待节点,头节点是没有next属性的
p.next = null; // help GC
failed = false;
return interrupted;
}
// 当请求锁失败后,检查节点是否被需要阻塞
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 若排队失败,则取消获取锁的请求
if (failed)
cancelAcquire(node);
}
}

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 获取前驱节点的状态
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* 节点的前驱节点状态为SIGNAL时,表示该节点已经请求过需要被唤醒,可以安全地阻塞
*/
return true;
if (ws > 0) {
/*
* 若前驱节点已被取消,则忽略这些取消的节点,继续往前查找未
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* 此时前驱节点的状态为0或PROPAGATE(-3),此时需要一个唤醒节点信号,但没必要阻塞线程
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

private final boolean parkAndCheckInterrupt() {
// 阻塞当前线程
LockSupport.park(this);
// 线程是否被中断,且重置中断状态
return Thread.interrupted();
}

// 尝试取消请求
private void cancelAcquire(Node node) {
// 忽略不存在的节点
if (node == null)
return;
// 节点线程置空
node.thread = null;

// 忽略取消的节点
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;

// predNext节点,表示node节点前的第一个非取消状态节点的后继节点
Node predNext = pred.next;

// 将节点状态设置为取消
node.waitStatus = Node.CANCELLED;

// 如果当前节点是尾节点,设置新的尾节点
if (node == tail && compareAndSetTail(node, pred)) {
// 将node的后继节点置空
compareAndSetNext(pred, predNext, null);
} else {
// 若node不为尾节点,即为链表中间的节点
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
//如果node的前驱节点不是头节点,那么需要给当前节点的后继节点一个"等待唤醒"的标记,
//即将当前节点的前驱节点等待状态设置为SIGNAL,然后将其设置为当前节点的后继节点的前驱节点
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
// 唤醒node节点的后继节点。
unparkSuccessor(node);
}
// 让取消节点的next引用会指向自己
node.next = node;
}
}


下图揭示了从同步器获取锁时,内部的等待队列的状态变化图:

初始状态



当只有一个线程t1进行lock()操作时,由于tryAcquire()将返回true,不用进行等待,等待队列状态不变。

若在线程t1还未unlock(),线程t2就进行了lock()操作,此时等待队列将被初始化,并将线程t2插入等待队列:


 


此时,若线程t3也进行lock()操作:



以上则是ReentrantLock的加锁(lock)机制,下面则是ReentrantLock的解锁(unlock)机制:
// ReentrantLock.unlock()
public void unlock() {
sync.release(1);
}

// Sync.release()
public final boolean release(int arg) {
// 尝试解锁,由子类ReentrantLock区定义
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
// 唤醒第一个节点
unparkSuccessor(h);
return true;
}
return false;
}

// ReentrantLock.tryRelease()
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
// 仅当state = 0时,才算释放锁成功,即统一线程的lock()次数必须与unlock()次数相同
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}

// AbstractQueuedSynchronizer.unparkSuccessor()
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);

// 寻找有效的后继节点

Node s = node.next;
// 后继节点不存在,或状态为取消时,则查询最前面的一个非取消的节点
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 唤醒对应节点锁在线程
LockSupport.unpark(s.thread);
}


一旦
LockSupport.unpark(s.thread);执行完,对应的等待节点将被唤醒:
private final boolean parkAndCheckInterrupt() {
// 唤醒后返回
LockSupport.park(this);
return Thread.interrupted();
}

final boolean acquireQueued(final Node node, int arg) {
...
try {
...
for (;;) {
// 设置新的head节点,等待节点得到执行
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null;
failed = false;
return interrupted;
}

}
} finally {
...
}
}


以上,则是AbstractQueuedSynchronizer同步器的基本实现机制,其作为很多并发工具的基础,规范了如何阻塞和唤醒线程,相比普通的锁机制(如synchronized),其通过自旋等待和精确唤醒,可以提高一些并发时的性能。


参考文献

JDK7源码

Java并发编程第14章

http://coderbee.net/index.php/concurrent/20131115/577

http://ifeve.com/introduce-abstractqueuedsynchronizer/

http://www.infoq.com/cn/articles/jdk1.8-abstractqueuedsynchronizer
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  java 线程