您的位置:首页 > 产品设计 > UI/UE

AbstractQueuedSynchronizer(AQS)源码解析

2018-02-02 17:30 615 查看
首先,我想先推荐看这篇博文AbstractQueuedSynchronizer(AQS)源码解析,我认为写得非常棒。

以下是我自己的一些理解。

前提

了解AbstractQueuedSynchronizer(AQS)之前,我们有必要了解下CAS(compare and swap)和CLH(Craig, Landin, and Hagersten)队列锁,这两个概念。
CAS,简单来讲,就是将比较与交换这两个操作,通过机器指令,形成一个原子操作,整个concurrent包都基于CAS,可以说,没有CAS,就没有concurrent包。更详细的内容:Java中CAS详解
CLH队列锁,简单来讲,就是一个队列,里面的每个节点都是一个线程,一个线程想要获得锁,只依赖于他的前置线程的获得锁的情况。更详细的内容:JAVA并发编程学习笔记之CLH队列锁。AQS中的同步队列基于它并做了一些修改,是一个包含一个head,一个tail节点的双向队列。

1. AQS是什么?

1.1 简介

AQS是由Doug Lea编写的一个基于FIFO队列,用来构建同步锁、同步组件的基础框架,java.util.concurrent包中很多并发工具类都依赖于AQS,如ReentrantLock, Semaphore, CountDownLatch。

1.2 用途

AQS能够用来简化并发工具的内部实现,它屏蔽了并发工具的同步状态管理,同步队列的维护,线程的阻塞和唤醒。它运用了模板设计模式,因此并发工具的开发者只需要继承AQS并实现特定的方法,将其组合到并发工具中,调用模板方法,来实现并发工具。

1.3 特性

AQS提供了两种模式,独占模式和共享模式。
       独占模式:锁只能由一个线程获取
       共享模式:锁可以由多个线程获取

个人认为,在AQS中,锁的语义,即state变量,对锁的操作,即是对state变量的操作。那么在多线程中,就要保证state变量的原子一致性。AQS为子类提供了3个方法,保证state变量的原子一致性。

 protected final int getState();
 protected final void setState(int newState);

 protected final boolean compareAndSetState(int expect, int update);

2. 实现的简要说明

因为下面的步骤说明中引入了队列中node的waitstatus,因此首先我们来解释下node的waitstatus,即每个节点的等待状态

描述
CANCELLED (1)当前线程因为超时或者中断被取消。这是一个终结态,也就是状态到此为止。
SIGNAL (-1)当前线程的后继线程被阻塞或者即将被阻塞,当前线程释放锁或者取消后需要唤醒后继线程。这个状态一般都是后继线程来设置前驱节点的。
CONDITION (-2)当前线程在condition队列中。
PROPAGATE (-3)用于将唤醒后继线程传递下去,这个状态的引入是为了完善和增强共享锁的唤醒机制。在一个节点成为头节点之前,是不会跃迁为此状态的
0表示无状态。

2.1 如何获取锁

AQS中维护了一个FIFO同步队列,当一个线程想要获取锁时,
步骤:
1.尝试直接获取锁,如果失败,将自己封装在Node中,放入同步队列队尾

2.在同步队列中的线程,获取锁的步骤
   2.1 如果该节点是head的next节点,那么会去尝试再获取一次锁,
  2.1.1如果失败,将自己阻塞,同时将前置节点(即head节点,head节点不可能为cancel状态)的waitstatus设置为signal(需要唤醒下一个节点的状态),直到被前置节点唤醒,唤醒之后继续尝试获取
  2.1.2 如果成功,将head出队,自己晋升为head(head节点个人认为可以理解为拥有锁的节点),此时如果是共享模式下,它还会去唤醒后继节点,让其去尝试获取锁
   2.2 如果该节点不是head的next节点,那么直接阻塞,同时将前置节点(如果前置节点为CANCEL状态,则将前置节点替换为上一个不为CANCEL的节点)的waitstatus设置为signal(需要唤醒下一个节点),
直到被前置节点唤醒,唤醒之后继续尝试获取

Q:线程何时被唤醒?
A: 1. 共享模式下,一个线程节点获取到锁,会唤醒它的直接后继线程。
    2. 当队列中持有锁的节点释放锁的时候,会唤醒它的直接后继线程。

2.2 如何释放锁

步骤:
1. 修改同步状态,让后继线程能够获取到锁
2. 唤醒后继线程,同时将当前释放锁的这个节点的状态,改为0(无状态)

其实,AQS主要就做了3件事,
1. 管理同步状态
2. 管理同步队列
3. 管理线程的阻塞和唤醒

3. 代码解析

static final class Node {

/**
* 用于标记一个节点在共享模式下等待
*/
static final Node SHARED = new Node();

/**
* 用于标记一个节点在独占模式下等待
*/
static final Node EXCLUSIVE = null;

/**
* 等待状态:取消
*/
static final int CANCELLED = 1;

/**
* 等待状态:通知
*/
static final int SIGNAL = -1;

/**
* 等待状态:条件等待
*/
static final int CONDITION = -2;

/**
* 等待状态:传播
*/
static final int PROPAGATE = -3;

/**
* 等待状态
*/
volatile int waitStatus;

/**
* 前驱节点
*/
volatile Node prev;

/**
* 后继节点
*/
volatile Node next;

/**
* 节点对应的线程
*/
volatile Thread thread;

/**
* 等待队列中的后继节点
*/
Node nextWaiter;

/**
* 当前节点是否处于共享模式等待
*/
final boolean isShared() {
return nextWaiter == SHARED;
}

/*
4000
*
* 获取前驱节点,如果为空的话抛出空指针异常
*/
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null) {
throw new NullPointerException();
} else {
return p;
}
}

Node() {
}

/**
* addWaiter会调用此构造函数
*/
Node(Thread thread, Node mode) {
this.nextWaiter = mode;
this.thread = thread;
}

/**
* Condition会用到此构造函数
*/
Node(Thread thread, int waitStatus) {
this.waitStatus = waitStatus;
this.thread = thread;
}
}

这里有必要先说一下PROPAGATE这个状态。当我们搜索代码时会发现,只有一个地方设置了这个状态
/**
* 这是共享锁中的核心唤醒函数,主要做的事情就是唤醒下一个线程或者设置传播状态。
* 后继线程被唤醒后,会尝试获取共享锁,如果成功之后,则又会调用setHeadAndPropagate,将唤醒传播下去。
* 这个函数的作用是保障在acquire和release存在竞争的情况下,保证队列中处于等待状态的节点能够有办法被唤醒。
*/
private void doReleaseShared() {
/*
* 以下的循环做的事情就是,在队列存在后继线程的情况下,唤醒后继线程;
* 或者由于多线程同时释放共享锁由于处在中间过程,读到head节点等待状态为0的情况下,
* 虽然不能unparkSuccessor,但为了保证唤醒能够正确稳固传递下去,设置节点状态为PROPAGATE。
* 这样的话获取锁的线程在执行setHeadAndPropagate时可以读到PROPAGATE,从而由获取锁的线程去释放后继等待线程。
*/
for (;;) {
Node h = head;
// 如果队列中存在后继线程。
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
unparkSuccessor(h);
}
// 如果h节点的状态为0,需要设置为PROPAGATE用以保证唤醒的传播。
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
// 检查h是否仍然是head,如果不是的话需要再进行循环。
if (h == head)
break;
}
}


加入这个PROPAGATE状态主要是为了解决共享模式下,多个线程同时释放锁的时候,引起的一个BUG。以Semaphore为例,

我们假设某次循环中队列里排队的节点为情况为:

head -> t1的node -> t2的node(也就是tail)

信号量释放的顺序为t3先释放,t4后释放:

时刻1: t3调用releaseShared,调用了unparkSuccessor(h),head的等待状态从-1变为0

时刻2: t1由于t3释放了信号量,被t3唤醒,调用Semaphore.NonfairSync的tryAcquireShared,返回值为0

时刻3: t4调用releaseShared,读到此时h.waitStatus为0(此时读到的head和时刻1中为同一个head),不满足条件,因此不会调用unparkSuccessor(h)

时刻4: t1获取信号量成功,调用setHeadAndPropagate时,因为不满足propagate > 0(时刻2的返回值也就是propagate==0),从而不会唤醒后继节点。

那么引入PROPAGATE是怎么解决问题的呢?

引入之后,调用releaseShared方法不再简单粗暴地直接unparkSuccessor,而是将传播行为抽了一个doReleaseShared方法出来。

再看上面的那种情况:

时刻1:t3调用releaseShared -> doReleaseShared -> unparkSuccessor,完了之后head的等待状态为0

时刻2:t1由于t3释放了信号量,被t3唤醒,调用Semaphore.NonfairSync的tryAcquireShared,返回值为0

时刻3:t4调用releaseShared,读到此时h.waitStatus为0(此时读到的head和时刻1中为同一个head),将等待状态置为PROPAGATE

时刻4:t1获取信号量成功,调用setHeadAndPropagate时,可以读到h.waitStatus < 0,从而可以接下来调用doReleaseShared唤醒t2

也就是说,上面会产生线程hang住bug的case在引入PROPAGATE后可以被规避掉。在PROPAGATE引入之前,之所以可能会出现线程hang住的情况,就是在于

releaseShared有竞争的情况下,可能会有队列中处于等待状态的节点因为第一个线程完成释放唤醒,第二个线程获取到锁,但还没设置好head,又有新线程释放锁,但是读到老的head状态为0导致释放但不唤醒,最终后一个等待线程既没有被释放线程唤醒,也没有被持锁线程唤醒。

未完待续。。。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: