您的位置:首页 > 编程语言 > Java开发

Java多线程之Condition实现原理和源码分析(四)

2018-11-09 16:56 781 查看
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/u014730165/article/details/83861120

章节概览、

1、概述

上面的几个章节我们基于lock(),unlock()方法为入口,深入分析了独占锁的获取和释放。这个章节我们在此基础上,进一步分析AQS是如何实现await,signal功能。其功能上和synchronize的wait,notify一样。但是本质是也是有很多区别。

1.1、 Condition和synchronize各自实现的等待唤醒区别
  1. Condition是基于AQS 队列同步器实现的。而synchronize的等待唤醒是基于jvm的语义层次上实现的。
  2. Condition的使用不管是await,signal都有维护各自的一个阻塞队列。而在synchronize所有的阻塞线程都被放到同一个阻塞队列里面,所以在多线程的情况下,可能存在早唤醒,唤醒失败等情况。

2、入门案例

入门案例采用的是大家比较熟悉的生产者和消费者。

public class ProducerConsumerTest {

private Lock lock = new ReentrantLock();

private Condition addCondition = lock.newCondition();

private Condition removeCondition = lock.newCondition();

private LinkedList<Integer> resources = new LinkedList<>();

private int maxSize;

public ProducerConsumerTest(int maxSize) {
this.maxSize = maxSize;
}

public class Producer implements Runnable {

private int proSize;

private Producer(int proSize) {
this.proSize = proSize;
}

@Override
public void run() {
lock.lock();
try {
for (int i = 1; i < proSize; i++) {
while (resources.size() >= maxSize) {
System.out.println("当前仓库已满,等待消费...");
try {
addCondition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("已经生产产品数: " + i + "\t现仓储量总量:" + resources.size());
resources.add(i);
removeCondition.signal();
}
} finally {
lock.unlock();
}
}
}

public class Consumer implements Runnable {

@Override
public void run() {
String threadName = Thread.currentThread().getName();
while (true) {
lock.lock();
try {
while (resources.size() <= 0) {
System.out.println(threadName + " 当前仓库没有产品,请稍等...");
try {
// 进入阻塞状态
removeCondition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 消费数据
int size = resources.size();
for (int i = 0; i < size; i++) {
Integer remove = resources.remove();
System.out.println(threadName + " 当前消费产品编号为:" + remove);
}
// 唤醒生产者
addCondition.signal();
} finally {
lock.unlock();
}
}

}
}

public static void main(String[] args) throws InterruptedException {
ProducerConsumerTest producerConsumerTest = new ProducerConsumerTest(10);
Producer producer = producerConsumerTest.new Producer(100);
Consumer consumer = producerConsumerTest.new Consumer();
final Thread producerThread = new Thread(producer, "producer");
final Thread consumerThread = new Thread(consumer, "consumer");
producerThread.start();
TimeUnit.SECONDS.sleep(2);
consumerThread.start();
}
}

其中维护了一个存储仓库。当生产者把生产的物品放入到仓库中,直到仓库填满,进行阻塞等待,此时生产者会释放当前的锁。刚开始,消费者会检查当前仓库是否有物品,如果没有物品进行阻塞,等待唤醒。当生产者唤醒消费者,消费者进行消费。我们以此案例的进行逐步分析。

3、 Condition源码分析

3.1、 Condition接口源码分析
public interface Condition {
// 当前线程进入等待状态直到被通知(signal)或者中断
// 当前线程进入运行状态并从await()方法返回的场景包括:
//(1)其他线程调用相同Condition对象的signal/signalAll方法,并且当前线程被唤醒;
//(2)其他线程调用interrupt方法中断当前线程;
void await() throws InterruptedException;

// 当前线程进入等待状态直到被通知,在此过程中对中断信号不敏感,不支持中断当前线程
void awaitUninterruptibly();

// 当前线程进入等待状态,直到被通知、中断或者超时。如果返回值小于等于0,可以认定就是超时了
boolean await(long time, TimeUnit unit) throws InterruptedException;

// 当前线程进入等待状态,直到被通知、中断或者超时。如果返回值小于等于0,可以认定就是超时了
long awaitNanos(long nanosTimeout) throws InterruptedException;

//当前线程进入等待状态,直到被通知、中断或者超时。如果没到指定时间被通知,则返回true,否则返回false
boolean awaitUntil(Date deadline) throws InterruptedException;

// 唤醒一个等待在Condition上的线程,被唤醒的线程在方法返回前必须获得与Condition对象关联的锁
void signal();

// 唤醒所有等待在Condition上的线程,能够从await()等方法返回的线程必须先获得与Condition对象关联的锁
void signalAll();
}
3.2、Condition 实现类成员变量 构造函数说明

Condition的实现类为:java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject。从类的引用方式可以看出,其是AbstractQueuedSynchronizer的一个内部类。

public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/** Condition Queue 里面的头节点 */
private transient Node firstWaiter;
/** Condition Queue 里面的尾节点 */
private transient Node lastWaiter;

/** 构造函数 */
public ConditionObject() { }
// 下面两个是用于追踪 调用 awaitXXX 方法时线程有没有被中断过
// REINTERRUPT: 代表线程是在 signal 后被中断的 (REINTERRUPT = re-interrupt再次中断最后会调用 selfInterrupt)
// THROW_IE: 代表在接受 signal 前被中断的, 则直接抛出异常 (Throw_IE = throw inner exception)
private static final int REINTERRUPT = 1;
private static final int THROW_IE = -1;

}

从源码分析可以看出,每个Condition实例里面都会维护着一个链表。通过 firstWaiter,lastWaiter进行存储。

4、wait()方法核心源码分析

4.1、 await()
public final void await() throws InterruptedException {
// 判断当前线程是否被阻塞,如果被阻塞,直接抛出InterruptedException异常
if (Thread.interrupted())
throw new InterruptedException();
// 将当前线程添加到Condition等待队列中
// 详情请看: 4.2 小节addConditionWaiter()源码分析
Node node = addConditionWaiter();
// 释放当前拥有的锁资源:4.3小节fullyRelease(Node node)源码分析
int savedState = fullyRelease(node);
int interruptMode = 0;
// 判断当前节点是否在队列同步器中。如果在同步器中,阻塞当前线程
// 死循环,直到其他线程唤醒当前线程
// 具体判断是否在同步队列中,请参考4.4 小节:boolean isOnSyncQueue(Node node)
while (!isOnSyncQueue(node)) {
// 阻塞当前线程,直到其他线程对其唤醒
LockSupport.park(this);
// 检查 在 awaitXX 方法中的这次唤醒是否是中断引起的
// 中断检测源码请请4.5小节:int checkInterruptWhileWaiting(Node node)
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 调用 acquireQueued在 Sync Queue 里面进行 独占锁的获取, 返回值表明在获取的过程中有没有被中断过
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 如果当前lnode节点的nextWaiter != null,则清理当前队列
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
// 根据中断模式返回异常情况
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
4.2 、 addConditionWaiter()

当前方法位置:java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#addConditionWaiter+

private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
// 如果lastWaiter不为空,且其状态不等于 Node.CONDITION。将当前节点删除
if (t != null && t.waitStatus != Node.CONDITION) {
// 删除当前节点,详情查看:4.2.1小结unlinkCancelledWaiters()分析
unlinkCancelledWaiters();
// 重新复制当前node节点t节点
t = lastWaiter;
}
// 创建一个node节点,设置状态为:Node.CONDITION
Node node = new Node(Thread.currentThread(), Node.CONDITION);
// 如果lastWaiter()为空,设置 firstWaiter = node
// 如果lastWaiter不为空,直接加入t.nextWaiter = node
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
// 设置当前节点为lastWaiter节点
lastWaiter = node;
return node;
}
4.2.1、unlinkCancelledWaiters()

清理Condition队列中被中断或者过期的节点
trail 节点一直维护者一个符合Condition条件队列的节点。如果当前符合要求,直接尾部追加到trail中。如果当前节点不合法,直接跳过

private void unlinkCancelledWaiters() {
// 赋值当前头节点
Node t = firstWaiter;
// trail 是中间存储变量
Node trail = null;
// 循环遍历,直到 t == null 退出
while (t != null) {
Node next = t.nextWaiter;
// 遍历当前不符合Condition队列规则的,将其清理
if (t.waitStatus != Node.CONDITION) {
// 设置当前 t节点的nextWaiter = null,方便 gc
t.nextWaiter = null;
// 如果 trail 为空。说明当前不合法的节点是firstWaiter,重新对其进行赋值
if (trail == null)
firstWaiter = next;
else
// 如果当前节点不合法,则将当前节点的next 赋值给 trail.nextWaiter 节点
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
// 当前节点赋值给t
trail = t;
// t赋值为下一个节点
t = next;
}
}
4.3、 fullyRelease(Node node)
完全释放当前节点的锁
final int fullyRelease(Node node) {
boolean failed = true;
try {
// 获取系统中当前的state状态(可冲入锁,其状态可能不为1)
int savedState = getState();
// 释放当前节点的锁,具体细节请参考上一章节
if (release(savedState)) {
// 释放成功,重置failed的值
failed = false;
return savedState;
} else {
// 释放失败,抛出异常
throw new IllegalMonitorStateException();
}
} finally {
// 如果释放失败,将当前的node节点设置为删除状态
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
4.4、boolean isOnSyncQueue(Node node)
final boolean isOnSyncQueue(Node node) {
// 判断node节点的状态是否为:Node.CONDITION 或者 其前驱节点为 null,直接返回为 false
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
// 如果其后驱节点不为null,直接返回为true。退出当前循环
if (node.next != null) // If has successor, it must be on queue
return true;

/* node.prev 可以是非null,但尚未在队列中。因为
*将其置于队列中的CAS可能会失败。 所以我们必须这样做
*从尾部遍历,以确保它实际上成功。 它
*在调用此方法时,它总是接近尾部,并且
*除非CAS失败。
*/
// 详情请查看4.4.1 小结:findNodeFromTail(Node node)
return findNodeFromTail(node);
}
4.4.1、findNodeFromTail(Node node)
从队列尾部进行遍历,查看node节点是否存在于队列中
private boolean findNodeFromTail(Node node) {
Node t = tail;
for (;;) {
if (t == node)
return true;
if (t == null)
return false;
t = t.prev;
}
}
4.5、int checkInterruptWhileWaiting(Node node)
/**
* 检查 在 awaitXX 方法中的这次唤醒是否是中断引起的
* 若是中断引起的, 则将 Node 从 Condition Queue 转移到 Sync Queue 里面
* 返回值的区别:
*      0: 当前线程没有被中断,此次唤醒是通过 signal
*      THROW_IE: 此次的唤醒是通过 interrupt, 并且在接受 signal 之前
*      REINTERRUPT: 线程的唤醒是 接受过 signal 而后又被中断
*/
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
// 将 Node 从 Condition Queue 转移到 Sync Queue 里面
// 详细源码分析参考4.5.1 小节:transferAfterCancelledWait(Node node)
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
4.5.1、 transferAfterCancelledWait(Node node)
final boolean transferAfterCancelledWait(Node node) {
// 通过cas修改当前节点的状态为0,独占锁默认状态
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
添加到 Syn的队列同步器中
enq(node);
return true;
}
/*
* If we lost out to a signal(), then we can't proceed
* until it finishes its enq().  Cancelling during an
* incomplete transfer is both rare and transient, so just
* spin.
*/
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}

至此wait()对应的源码已经分析完成。主要存在以下几个核心步骤:

  1. 将当前节点添加到Condition 所维护的队列中,尾部进行追加;
  2. 释放当前线程锁持有的锁;
  3. 循环判断当前线程是否在Sync的队列同步器中;
  4. 如果存在于队列同步器中,加下来的线程交给AQS自己去处理了;

5、signal() 核心源码分析

5.1、signal() 入口方法

java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#signal

public final void signal() {
// 判断当前线程是否拥有锁资源
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
// 唤醒头结点
doSignal(first);
}
5.2 、void doSignal(Node first)

java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#doSignal

private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
// 将当前节点添加到Sync的队列中,通过unlock()方法的调用,进行唤醒
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
5.3、 boolean transferForSignal(Node node)
final boolean transferForSignal(Node node) {
// 设置当前节点的状态为0,通过cas,设置失败直接返回false
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;

/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
// 将当前的节点添加到Sync的队列同步器中,同时返回当前节点的前一个节点
Node p = enq(node);
int ws = p.waitStatus;
// 如果当前节点的ws > 0 或者设置当前节点的状态
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
// 唤醒当前线程
LockSupport.unpark(node.thread);
return true;
}

至此signal 源码已经分析完成,signal方法主要是将当前的Condition的first节点,转移到Sync的FIFO中。等待唤醒操作。

6、总结

至此,AQS的核心知识点源码分析完成,本人能力有限,如果不妥的地方,欢迎指正。

阅读更多
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: