并发编程(十四)—— ScheduledThreadPoolExecutor 实现原理与源码深度解析 之 DelayedWorkQueue
我们知道线程池运行时,会不断从任务队列中获取任务,然后执行任务。如果我们想实现延时或者定时执行任务,重要一点就是任务队列会根据任务延时时间的不同进行排序,延时时间越短地就排在队列的前面,先被获取执行。
队列是先进先出的数据结构,就是先进入队列的数据,先被获取。但是有一种特殊的队列叫做优先级队列,它会对插入的数据进行优先级排序,保证优先级越高的数据首先被获取,与数据的插入顺序无关。
实现优先级队列高效常用的一种方式就是使用堆。
什么是堆?
堆通常是一个可以被看做一棵树的数组对象。
堆(heap)又被为优先队列(priority queue)。尽管名为优先队列,但堆并不是队列。
因为队列中允许的操作是先进先出(FIFO),在队尾插入元素,在队头取出元素。
而堆虽然在堆底插入元素,在堆顶取出元素,但是堆中元素的排列不是按照到来的先后顺序,而是按照一定的优先顺序排列的。
这里来说明一下满二叉树的概念与完全二叉树的概念。
满二叉树
除了叶子节点,所有的节点的左右孩子都不为空,就是一棵满二叉树,如下图。
可以看出:满二叉树所有的节点都拥有左孩子,又拥有右孩子。
完全二叉树
不一定是一个满二叉树,但它不满的那部分一定在右下侧,如下图
堆总是满足下列性质:
- 堆中某个节点的值总是不大于或不小于其父节点的值;
- 堆总是一棵完全二叉树。
- 最大值时,称为“最大堆”,也称大顶堆;
- 最小值时,称为“最小堆”,也称小顶堆。
堆的实现
堆是一个二叉树,但是它最简单的方式是通过数组去实现二叉树,而且因为堆是一个完全二叉树,就不存在数组空间的浪费。怎么使用数组来存储二叉树呢?
就是用数组的下标来模拟二叉树的各个节点,比如说根节点就是0,第一层的左节点是1,右节点是2。由此我们可以得出下列公式:
// 对于n位置的节点来说: int left = 2 * n + 1; // 左子节点 int right = 2 * n + 2; // 右子节点 int parent = (n - 1) / 2; // 父节点,当然n要大于0,根节点是没有父节点的
对于堆来说,只有两个操作,插入insert和删除remove,不管插入还是删除保证堆的成立条件,1.是完全二叉树,2.父节点的值不能小于子节点的值。
最大堆的插入(ADD)
public void insert(int value) { // 第一步将插入的值,直接放在最后一个位置。并将长度加一 store[size++] = value; // 得到新插入值所在位置。 int index = size - 1; while(index > 0) { // 它的父节点位置坐标 int parentIndex = (index - 1) / 2; // 如果父节点的值小于子节点的值,你不满足堆的条件,那么就交换值 if (store[index] > store[parentIndex]) { swap(store, index, parentIndex); index = parentIndex; } else { // 否则表示这条路径上的值已经满足降序,跳出循环 break; } } }
主要步骤:
-
直接将value插入到size位置,并将size自增,这样store数组中插入一个值了。
-
要保证从这个叶节点到根节点这条路径上的节点,满足父节点的值不能小于子节点。
-
通过int parentIndex = (index - 1) / 2得到父节点,如果比父节点值大,那么两者位置的值交换,然后再拿这个父节点和它的父父节点比较。
直到这个节点值比父节点值小,或者这个节点已经是根节点就退出循环。
因为每次循环index都是除以2这种倍数递减的方式,所以它最多循环次数是(log N)次。
最大堆的删除(DELETE)
public int remove() { // 将根的值记录,最后返回 int result = store[0]; // 将最后位置的值放到根节点位置 store[0] = store[--size]; int index = 0; // 通过循环,保证父节点的值不能小于子节点。 while(true) { int leftIndex = 2 * index + 1; // 左子节点 int rightIndex = 2 * index + 2; // 右子节点 // leftIndex >= size 表示这个子节点还没有值。 if (leftIndex >= size) break; int maxIndex = leftIndex; //找到左右节点中较大的一个节点 if (store[leftIndex] < store[rightIndex]) maxIndex = rightIndex; //与子节点中较大的子节点比较,如果子节点更大,则交换位置 //为什么要与较大的子节点比较呢?如果和较小的节点比较,没有交换位置,但有可能比较大的节点小 if (store[index] < store[maxIndex]) { swap(store, index, maxIndex); index = maxIndex; } else { //满足子节点比当前节点小,退出循环 break; } } //返回最开始的第一个值 return result; }
在堆中最大值就在根节点,所以操作步骤:
-
将根节点的值保存到result中。
-
将最后节点的值移动到根节点,再将长度减一,这样满足堆成立第一个条件,堆是一个完全二叉树。
-
使用循环,来满足堆成立的第二个条件,父节点的值不能小于子节点的值。
-
最后返回result。
每次循环我们都是以2的倍数递增,所以它也是最多循环次数是(log N)次。
所以通过堆这种方式可以快速实现优先级队列,它的插入和删除操作的效率都是O(log N)。
那么怎么实现堆排序?这个很简单,利用优先队列的特性:
- 先遍历数组。将数组中的值依次插入到堆中。
- 然后再用一个循环将值从堆中取出来。
private static void headSort(int[] arr) { int size = arr.length; Head head = new Head(size); for (int i = 0; i < size; i++) { head.insert(arr[i]); } for (int i = 0; i < size; i++) { // 实现从大到小的排序 arr[size - 1 - i] = head.remove(); } }
堆排序的效率:因为每次插入数据效率是O(log N),而我们需要进行n次循环,将数组中每个值插入到堆中,所以它的执行时间是O(N * log N)级。
DelayedWorkQueue类
static class DelayedWorkQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable> {
从定义中看出DelayedWorkQueue是一个阻塞队列。并且DelayedWorkQueue是一个最小堆,最顶点的值最小,即堆中某个节点的值总是不小于其父节点的值。
属性
// 初始时,数组长度大小。 private static final int INITIAL_CAPACITY = 16; // 使用数组来储存队列中的元素。 private RunnableScheduledFuture<?>[] queue = new RunnableScheduledFuture<?>[INITIAL_CAPACITY]; // 使用lock来保证多线程并发安全问题。 private final ReentrantLock lock = new ReentrantLock(); // 队列中储存元素的大小 private int size = 0; //特指队列头任务所在线程 private Thread leader = null; // 当队列头的任务延时时间到了,或者有新的任务变成队列头时,用来唤醒等待线程 private final Condition available = lock.newCondition();
DelayedWorkQueue是用数组来储存队列中的元素,那么我们看看它是怎么实现优先级队列的。
插入元素方法
public void put(Runnable e) { offer(e); } public boolean add(Runnable e) { return offer(e); } public boolean offer(Runnable e, long timeout, TimeUnit unit) { return offer(e); }
我们发现与普通阻塞队列相比,这三个添加方法都是调用offer方法。那是因为它没有队列已满的条件,也就是说可以不断地向DelayedWorkQueue添加元素,当元素个数超过数组长度时,会进行数组扩容。
public boolean offer(Runnable x) { if (x == null) throw new NullPointerException(); RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x; // 使用lock保证并发操作安全 final ReentrantLock lock = this.lock; lock.lock(); try { int i = size; // 如果要超过数组长度,就要进行数组扩容 if (i >= queue.length) // 数组扩容 grow(); // 将队列中元素个数加一 size = i + 1; // 如果是第一个元素,那么就不需要排序,直接赋值就行了 if (i == 0) { queue[0] = e; setIndex(e, 0); } else { // 调用siftUp方法,使插入的元素变得有序。 siftUp(i, e); } // 表示新插入的元素是队列头,更换了队列头, // 那么就要唤醒正在等待获取任务的线程。 if (queue[0] == e) { leader = null; // 唤醒正在等待等待获取任务的线程 available.signal(); } } finally { lock.unlock(); } return true; }
数组扩容方法:
private void grow() { int oldCapacity = queue.length; // 每次扩容增加原来数组的一半数量。 int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50% if (newCapacity < 0) // overflow newCapacity = Integer.MAX_VALUE; // 使用Arrays.copyOf来复制一个新数组 queue = Arrays.copyOf(queue, newCapacity); }
插入元素排序siftUp方法:
private void siftUp(int k, RunnableScheduledFuture<?> key) { // 当k==0时,就到了堆二叉树的根节点了,跳出循环 while (k > 0) { // 父节点位置坐标, 相当于(k - 1) / 2 int parent = (k - 1) >>> 1; // 获取父节点位置元素 RunnableScheduledFuture<?> e = queue[parent]; // 如果key元素大于父节点位置元素,满足条件,那么跳出循环 // 因为是从小到大排序的。 if (key.compareTo(e) >= 0) break; // 否则就将父节点元素存放到k位置 queue[k] = e; // 这个只有当元素是ScheduledFutureTask对象实例才有用,用来快速取消任务。 setIndex(e, k); // 重新赋值k,寻找元素key应该插入到堆二叉树的那个节点 k = parent; } // 循环结束,k就是元素key应该插入的节点位置 queue[k] = key; setIndex(key, k); }
主要是三步:
- 元素个数超过数组长度,就会调用grow()方法,进行数组扩容。
- 将新元素e添加到优先级队列中对应的位置,通过siftUp方法,保证按照元素的优先级排序。
- 如果新插入的元素是队列头,即更换了队列头,那么就要唤醒正在等待获取任务的线程。这些线程可能是因为原队列头元素的延时时间没到,而等待的。
假设现有元素 5 需要插入,为了维持完全二叉树的特性,新插入的元素一定是放在结点 6 的右子树;同时为了满足任一结点的值要小于左右子树的值这一特性,新插入的元素要和其父结点作比较,如果比父结点小,就要把父结点拉下来顶替当前结点的位置,自己则依次不断向上寻找,找到比自己大的父结点就拉下来,直到没有符合条件的值为止。
动画讲解:
在这里先将元素 5 插入到末尾,即放在结点 6 的右子树。
然后与父类比较, 6 > 5 ,父类数字大于子类数字,子类与父类交换。
重复此操作,直到不发生替换。
立即获取队列头元素
public RunnableScheduledFuture<?> poll() { final ReentrantLock lock = this.lock; lock.lock(); try { RunnableScheduledFuture<?> first = queue[0]; // 队列头任务是null,或者任务延时时间没有到,都返回null if (first == null || first.getDelay(NANOSECONDS) > 0) return null; else // 移除队列头元素 return finishPoll(first); } finally { lock.unlock(); } }
当队列头任务是null,或者任务延时时间没有到,表示这个任务还不能返回,因此直接返回null。否则调用finishPoll方法,移除队列头元素并返回。
// 移除队列头元素 private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) { // 将队列中元素个数减一 int s = --size; // 获取队列末尾元素x RunnableScheduledFuture<?> x = queue[s]; // 原队列末尾元素设置为null queue[s] = null; if (s != 0) // 将队列最后一个元素移动到对列头元素位置,然后向下排序 // 因为移除了队列头元素,所以进行重新排序。 siftDown(0, x); setIndex(f, -1); return f; }
这个方法与我们在第一节中,介绍堆的删除方法一样。
- 先将队列中元素个数减一。
- 将原队列末尾元素设置成队列头元素,再将队列末尾元素设置为null。
- 调用siftDown(0, x)方法,保证按照元素的优先级排序。
移除元素排序siftDown方法:
private void siftDown(int k, RunnableScheduledFuture<?> key) { int half = size >>> 1; // 通过循环,保证父节点的值不能大于子节点。 while (k < half) { // 左子节点, 相当于 (k * 2) + 1 int child = (k << 1) + 1; // 左子节点位置元素 RunnableScheduledFuture<?> c = queue[child]; // 右子节点, 相当于 (k * 2) + 2 int right = child + 1; // 如果左子节点元素值大于右子节点元素值,那么右子节点才是较小值的子节点。 // 就要将c与child值重新赋值 if (right < size && c.compareTo(queue[right]) > 0) c = queue[child = right]; // 如果父节点元素值小于较小的子节点元素值,那么就跳出循环 if (key.compareTo(c) <= 0) break; // 否则,父节点元素就要和子节点进行交换 queue[k] = c; setIndex(c, k); k = child; } // 循环结束,k就是元素key应该插入的节点位置 queue[k] = key; setIndex(key, k); }
我们来看看动画
核心点:将最后一个元素填充到堆顶,然后不断的下沉这个元素。
假设要从节点 1 ,也可以称为取出节点 1 ,为了维持完全二叉树的特性 ,我们将最后一个元素 6 去替代这个 1 ;然后比较 1 和其子树的大小关系,如果比左右子树大(如果存在的话),就要从左右子树中找一个较小的值替换它,而它能自己就要跑到对应子树的位置,再次循环这种操作,直到没有子树比它小。
通过这样的操作,堆依然是堆,总结一下:
- 找到要删除的节点(取出的节点)在数组中的位置
- 用数组中最后一个元素替代这个位置的元素
- 当前位置和其左右子树比较,保证符合最小堆的节点间规则
- 删除最后一个元素
等待获取队列头元素
public RunnableScheduledFuture<?> take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { RunnableScheduledFuture<?> first = queue[0]; // 如果没有任务,就让线程在available条件下等待。 if (first == null) available.await(); else { // 获取任务的剩余延时时间 long delay = first.getDelay(NANOSECONDS); // 如果延时时间到了,就返回这个任务,用来执行。 if (delay <= 0) return finishPoll(first); // 将first设置为null,当线程等待时,不持有first的引用 first = null; // don't retain ref while waiting // 如果还是原来那个等待队列头任务的线程, // 说明队列头任务的延时时间还没有到,继续等待。 if (leader != null) available.await(); else { // 记录一下当前等待队列头任务的线程 Thread thisThread = Thread.currentThread(); leader = thisThread; try { // 当任务的延时时间到了时,能够自动超时唤醒。 available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && queue[0] != null) // 唤醒等待任务的线程 available.signal(); lock.unlock(); } }
如果队列中没有任务,那么就让当前线程在available条件下等待。如果队列头任务的剩余延时时间delay大于0,那么就让当前线程在available条件下等待delay时间。
超时等待获取队列头元素
public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { RunnableScheduledFuture<?> first = queue[0]; // 如果没有任务。 if (first == null) { // 超时时间已到,那么就直接返回null if (nanos <= 0) return null; else // 否则就让线程在available条件下等待nanos时间 nanos = available.awaitNanos(nanos); } else { // 获取任务的剩余延时时间 long delay = first.getDelay(NANOSECONDS); // 如果延时时间到了,就返回这个任务,用来执行。 if (delay <= 0) return finishPoll(first); // 如果超时时间已到,那么就直接返回null if (nanos <= 0) return null; // 将first设置为null,当线程等待时,不持有first的引用 first = null; // don't retain ref while waiting // 如果超时时间小于任务的剩余延时时间,那么就有可能获取不到任务。 // 在这里让线程等待超时时间nanos if (nanos < delay || leader != null) nanos = available.awaitNanos(nanos); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { // 当任务的延时时间到了时,能够自动超时唤醒。 long timeLeft = available.awaitNanos(delay); // 计算剩余的超时时间 nanos -= delay - timeLeft; } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && queue[0] != null) // 唤醒等待任务的线程 available.signal(); lock.unlock(); } }
与take方法相比较,就要考虑设置的超时时间,如果超时时间到了,还没有获取到有用任务,那么就返回null。其他的与take方法中逻辑一样。
推荐博客
https://www.geek-share.com/detail/2758014500.html
总结
使用优先级队列DelayedWorkQueue,保证添加到队列中的任务,会按照任务的延时时间进行排序,延时时间少的任务首先被获取。
- 并发编程(十一)—— Java 线程池 实现原理与源码深度解析(一)
- 并发编程(十三)—— Java 线程池 实现原理与源码深度解析 之 Executors(三)
- 并发编程(十二)—— Java 线程池 实现原理与源码深度解析 之submit方法 (二)
- Kafka源码深度解析-序列8 -Consumer -Fetcher实现原理与offset确认机制
- Kafka源码深度解析-序列7 -Consumer -coordinator协议与heartbeat实现原理
- jquery源码解析:jQuery队列操作queue方法实现的原理
- Kafka源码深度解析-序列7 -Consumer -coordinator协议与heartbeat实现原理
- Spring AOP源码解析——AOP动态代理原理和实现方式
- 快速傅里叶变换(FFT)的原理、实现及代码解析(附C#源码)
- 神经网络中 BP 算法的原理与 PYTHON 实现源码解析
- Vue.js 源码全方位深入解析 学精学透 Vue 原理实现
- Apk源码的加固(加壳)原理解析和实现
- Java集合类,从源码解析底层实现原理
- [置顶] [Java容器]LinkedHashMap实现原理与源码解析
- Android 带你从源码的角度解析Scroller的滚动实现原理
- Java集合,ConcurrentLinkedQueue源码解析(常用于并发编程)
- 快速傅里叶变换(FFT)的原理、实现及代码解析(附C#源码)
- 【React Native】从源码一步一步解析它的实现原理
- 02.Spring IOC源码深度解析之容器的基本实现
- 索引优先队列-IndexedPrirotyQueue的原理及实现(源码)