Java多线程之Condition实现原理和源码分析(四)
章节概览、
1、概述
上面的几个章节我们基于lock(),unlock()方法为入口,深入分析了独占锁的获取和释放。这个章节我们在此基础上,进一步分析AQS是如何实现await,signal功能。其功能上和synchronize的wait,notify一样。但是本质是也是有很多区别。
1.1、 Condition和synchronize各自实现的等待唤醒区别
- Condition是基于AQS 队列同步器实现的。而synchronize的等待唤醒是基于jvm的语义层次上实现的。
- Condition的使用不管是await,signal都有维护各自的一个阻塞队列。而在synchronize所有的阻塞线程都被放到同一个阻塞队列里面,所以在多线程的情况下,可能存在早唤醒,唤醒失败等情况。
2、入门案例
入门案例采用的是大家比较熟悉的生产者和消费者。
public class ProducerConsumerTest { private Lock lock = new ReentrantLock(); private Condition addCondition = lock.newCondition(); private Condition removeCondition = lock.newCondition(); private LinkedList<Integer> resources = new LinkedList<>(); private int maxSize; public ProducerConsumerTest(int maxSize) { this.maxSize = maxSize; } public class Producer implements Runnable { private int proSize; private Producer(int proSize) { this.proSize = proSize; } @Override public void run() { lock.lock(); try { for (int i = 1; i < proSize; i++) { while (resources.size() >= maxSize) { System.out.println("当前仓库已满,等待消费..."); try { addCondition.await(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("已经生产产品数: " + i + "\t现仓储量总量:" + resources.size()); resources.add(i); removeCondition.signal(); } } finally { lock.unlock(); } } } public class Consumer implements Runnable { @Override public void run() { String threadName = Thread.currentThread().getName(); while (true) { lock.lock(); try { while (resources.size() <= 0) { System.out.println(threadName + " 当前仓库没有产品,请稍等..."); try { // 进入阻塞状态 removeCondition.await(); } catch (InterruptedException e) { e.printStackTrace(); } } // 消费数据 int size = resources.size(); for (int i = 0; i < size; i++) { Integer remove = resources.remove(); System.out.println(threadName + " 当前消费产品编号为:" + remove); } // 唤醒生产者 addCondition.signal(); } finally { lock.unlock(); } } } } public static void main(String[] args) throws InterruptedException { ProducerConsumerTest producerConsumerTest = new ProducerConsumerTest(10); Producer producer = producerConsumerTest.new Producer(100); Consumer consumer = producerConsumerTest.new Consumer(); final Thread producerThread = new Thread(producer, "producer"); final Thread consumerThread = new Thread(consumer, "consumer"); producerThread.start(); TimeUnit.SECONDS.sleep(2); consumerThread.start(); } }
其中维护了一个存储仓库。当生产者把生产的物品放入到仓库中,直到仓库填满,进行阻塞等待,此时生产者会释放当前的锁。刚开始,消费者会检查当前仓库是否有物品,如果没有物品进行阻塞,等待唤醒。当生产者唤醒消费者,消费者进行消费。我们以此案例的进行逐步分析。
3、 Condition源码分析
3.1、 Condition接口源码分析
public interface Condition { // 当前线程进入等待状态直到被通知(signal)或者中断 // 当前线程进入运行状态并从await()方法返回的场景包括: //(1)其他线程调用相同Condition对象的signal/signalAll方法,并且当前线程被唤醒; //(2)其他线程调用interrupt方法中断当前线程; void await() throws InterruptedException; // 当前线程进入等待状态直到被通知,在此过程中对中断信号不敏感,不支持中断当前线程 void awaitUninterruptibly(); // 当前线程进入等待状态,直到被通知、中断或者超时。如果返回值小于等于0,可以认定就是超时了 boolean await(long time, TimeUnit unit) throws InterruptedException; // 当前线程进入等待状态,直到被通知、中断或者超时。如果返回值小于等于0,可以认定就是超时了 long awaitNanos(long nanosTimeout) throws InterruptedException; //当前线程进入等待状态,直到被通知、中断或者超时。如果没到指定时间被通知,则返回true,否则返回false boolean awaitUntil(Date deadline) throws InterruptedException; // 唤醒一个等待在Condition上的线程,被唤醒的线程在方法返回前必须获得与Condition对象关联的锁 void signal(); // 唤醒所有等待在Condition上的线程,能够从await()等方法返回的线程必须先获得与Condition对象关联的锁 void signalAll(); }
3.2、Condition 实现类成员变量 构造函数说明
Condition的实现类为:java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject。从类的引用方式可以看出,其是AbstractQueuedSynchronizer的一个内部类。
public class ConditionObject implements Condition, java.io.Serializable { private static final long serialVersionUID = 1173984872572414699L; /** Condition Queue 里面的头节点 */ private transient Node firstWaiter; /** Condition Queue 里面的尾节点 */ private transient Node lastWaiter; /** 构造函数 */ public ConditionObject() { } // 下面两个是用于追踪 调用 awaitXXX 方法时线程有没有被中断过 // REINTERRUPT: 代表线程是在 signal 后被中断的 (REINTERRUPT = re-interrupt再次中断最后会调用 selfInterrupt) // THROW_IE: 代表在接受 signal 前被中断的, 则直接抛出异常 (Throw_IE = throw inner exception) private static final int REINTERRUPT = 1; private static final int THROW_IE = -1; }
从源码分析可以看出,每个Condition实例里面都会维护着一个链表。通过 firstWaiter,lastWaiter进行存储。
4、wait()方法核心源码分析
4.1、 await()
public final void await() throws InterruptedException { // 判断当前线程是否被阻塞,如果被阻塞,直接抛出InterruptedException异常 if (Thread.interrupted()) throw new InterruptedException(); // 将当前线程添加到Condition等待队列中 // 详情请看: 4.2 小节addConditionWaiter()源码分析 Node node = addConditionWaiter(); // 释放当前拥有的锁资源:4.3小节fullyRelease(Node node)源码分析 int savedState = fullyRelease(node); int interruptMode = 0; // 判断当前节点是否在队列同步器中。如果在同步器中,阻塞当前线程 // 死循环,直到其他线程唤醒当前线程 // 具体判断是否在同步队列中,请参考4.4 小节:boolean isOnSyncQueue(Node node) while (!isOnSyncQueue(node)) { // 阻塞当前线程,直到其他线程对其唤醒 LockSupport.park(this); // 检查 在 awaitXX 方法中的这次唤醒是否是中断引起的 // 中断检测源码请请4.5小节:int checkInterruptWhileWaiting(Node node) if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } // 调用 acquireQueued在 Sync Queue 里面进行 独占锁的获取, 返回值表明在获取的过程中有没有被中断过 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; // 如果当前lnode节点的nextWaiter != null,则清理当前队列 if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); // 根据中断模式返回异常情况 if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
4.2 、 addConditionWaiter()
当前方法位置:java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#addConditionWaiter+
private Node addConditionWaiter() { Node t = lastWaiter; // If lastWaiter is cancelled, clean out. // 如果lastWaiter不为空,且其状态不等于 Node.CONDITION。将当前节点删除 if (t != null && t.waitStatus != Node.CONDITION) { // 删除当前节点,详情查看:4.2.1小结unlinkCancelledWaiters()分析 unlinkCancelledWaiters(); // 重新复制当前node节点t节点 t = lastWaiter; } // 创建一个node节点,设置状态为:Node.CONDITION Node node = new Node(Thread.currentThread(), Node.CONDITION); // 如果lastWaiter()为空,设置 firstWaiter = node // 如果lastWaiter不为空,直接加入t.nextWaiter = node if (t == null) firstWaiter = node; else t.nextWaiter = node; // 设置当前节点为lastWaiter节点 lastWaiter = node; return node; }4.2.1、unlinkCancelledWaiters()
清理Condition队列中被中断或者过期的节点
trail 节点一直维护者一个符合Condition条件队列的节点。如果当前符合要求,直接尾部追加到trail中。如果当前节点不合法,直接跳过
private void unlinkCancelledWaiters() { // 赋值当前头节点 Node t = firstWaiter; // trail 是中间存储变量 Node trail = null; // 循环遍历,直到 t == null 退出 while (t != null) { Node next = t.nextWaiter; // 遍历当前不符合Condition队列规则的,将其清理 if (t.waitStatus != Node.CONDITION) { // 设置当前 t节点的nextWaiter = null,方便 gc t.nextWaiter = null; // 如果 trail 为空。说明当前不合法的节点是firstWaiter,重新对其进行赋值 if (trail == null) firstWaiter = next; else // 如果当前节点不合法,则将当前节点的next 赋值给 trail.nextWaiter 节点 trail.nextWaiter = next; if (next == null) lastWaiter = trail; } else // 当前节点赋值给t trail = t; // t赋值为下一个节点 t = next; } }
4.3、 fullyRelease(Node node)
完全释放当前节点的锁 final int fullyRelease(Node node) { boolean failed = true; try { // 获取系统中当前的state状态(可冲入锁,其状态可能不为1) int savedState = getState(); // 释放当前节点的锁,具体细节请参考上一章节 if (release(savedState)) { // 释放成功,重置failed的值 failed = false; return savedState; } else { // 释放失败,抛出异常 throw new IllegalMonitorStateException(); } } finally { // 如果释放失败,将当前的node节点设置为删除状态 if (failed) node.waitStatus = Node.CANCELLED; } }
4.4、boolean isOnSyncQueue(Node node)
final boolean isOnSyncQueue(Node node) { // 判断node节点的状态是否为:Node.CONDITION 或者 其前驱节点为 null,直接返回为 false if (node.waitStatus == Node.CONDITION || node.prev == null) return false; // 如果其后驱节点不为null,直接返回为true。退出当前循环 if (node.next != null) // If has successor, it must be on queue return true; /* node.prev 可以是非null,但尚未在队列中。因为 *将其置于队列中的CAS可能会失败。 所以我们必须这样做 *从尾部遍历,以确保它实际上成功。 它 *在调用此方法时,它总是接近尾部,并且 *除非CAS失败。 */ // 详情请查看4.4.1 小结:findNodeFromTail(Node node) return findNodeFromTail(node); }4.4.1、findNodeFromTail(Node node)
从队列尾部进行遍历,查看node节点是否存在于队列中 private boolean findNodeFromTail(Node node) { Node t = tail; for (;;) { if (t == node) return true; if (t == null) return false; t = t.prev; } }
4.5、int checkInterruptWhileWaiting(Node node)
/** * 检查 在 awaitXX 方法中的这次唤醒是否是中断引起的 * 若是中断引起的, 则将 Node 从 Condition Queue 转移到 Sync Queue 里面 * 返回值的区别: * 0: 当前线程没有被中断,此次唤醒是通过 signal * THROW_IE: 此次的唤醒是通过 interrupt, 并且在接受 signal 之前 * REINTERRUPT: 线程的唤醒是 接受过 signal 而后又被中断 */ private int checkInterruptWhileWaiting(Node node) { return Thread.interrupted() ? // 将 Node 从 Condition Queue 转移到 Sync Queue 里面 // 详细源码分析参考4.5.1 小节:transferAfterCancelledWait(Node node) (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0; }4.5.1、 transferAfterCancelledWait(Node node)
final boolean transferAfterCancelledWait(Node node) { // 通过cas修改当前节点的状态为0,独占锁默认状态 if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) { 添加到 Syn的队列同步器中 enq(node); return true; } /* * If we lost out to a signal(), then we can't proceed * until it finishes its enq(). Cancelling during an * incomplete transfer is both rare and transient, so just * spin. */ while (!isOnSyncQueue(node)) Thread.yield(); return false; }
至此wait()对应的源码已经分析完成。主要存在以下几个核心步骤:
- 将当前节点添加到Condition 所维护的队列中,尾部进行追加;
- 释放当前线程锁持有的锁;
- 循环判断当前线程是否在Sync的队列同步器中;
- 如果存在于队列同步器中,加下来的线程交给AQS自己去处理了;
5、signal() 核心源码分析
5.1、signal() 入口方法
java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#signal
public final void signal() { // 判断当前线程是否拥有锁资源 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) // 唤醒头结点 doSignal(first); }
5.2 、void doSignal(Node first)
java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#doSignal
private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; // 将当前节点添加到Sync的队列中,通过unlock()方法的调用,进行唤醒 } while (!transferForSignal(first) && (first = firstWaiter) != null); }
5.3、 boolean transferForSignal(Node node)
final boolean transferForSignal(Node node) { // 设置当前节点的状态为0,通过cas,设置失败直接返回false if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; /* * Splice onto queue and try to set waitStatus of predecessor to * indicate that thread is (probably) waiting. If cancelled or * attempt to set waitStatus fails, wake up to resync (in which * case the waitStatus can be transiently and harmlessly wrong). */ // 将当前的节点添加到Sync的队列同步器中,同时返回当前节点的前一个节点 Node p = enq(node); int ws = p.waitStatus; // 如果当前节点的ws > 0 或者设置当前节点的状态 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) // 唤醒当前线程 LockSupport.unpark(node.thread); return true; }
至此signal 源码已经分析完成,signal方法主要是将当前的Condition的first节点,转移到Sync的FIFO中。等待唤醒操作。
6、总结
至此,AQS的核心知识点源码分析完成,本人能力有限,如果不妥的地方,欢迎指正。
阅读更多- Java多线程之ThreadPoolExecutor实现原理和源码分析(五)
- Java多线程之Future实现原理和源码分析(六)
- (10) java源码分析 ---- HashMap源码分析 及其 实现原理分析
- java并发编程之源码分析ThreadPoolExecutor线程池实现原理
- Java集合ArrayList实现原理——源码分析
- Java多线程 -- JUC包源码分析5 -- Condition/ArrayBlockingQueue/LinkedBlockingQueue/Deque/PriorityBlockingQueu
- Java中HashMap底层实现原理(JDK1.8)源码分析
- JAVA基础之HashMap实现原理及源码分析
- 【Java集合学习系列】HashMap实现原理及源码分析
- [Java多线程]-ThreadLocal源码及原理的深入分析
- Java 多线程分段下载原理分析和实现
- Java并发框架Disruptor实现原理与源码分析(三) RingBuffer原理模型与源码分析
- [转]Java HashMap实现原理与源码分析
- Java面试绕不开的问题: Java中HashMap底层实现原理(JDK1.8)源码分析
- Java基于微信公众号接口实现授权登录源码及原理分析
- java.lang.ThreadLocal实现原理和源码分析
- Memcache分布式实现原理---Java_Memcache 源码分析
- Java 多线程:分析线程池的实现原理
- Java并发框架Disruptor实现原理与源码分析(二) 缓存行填充与CAS操作
- (10) java源码分析 ---- HashMap源码分析 及其 实现原理分析