您的位置:首页 > 其它

Executors.newCachedThreadPool的底层源码浅析

2019-06-02 01:44 771 查看
版权声明:原创文章欢迎转载,转载请注明出处 https://blog.csdn.net/w605283073/article/details/90735382

1、BG(背景)

《线程池好处和核心参数等面试必备》对线程池的优点以及核心参数等进行了全面的介绍。

从整体角度大致谈谈Executors.newCachedThreadPool这个函数。

2、JDK Code

关于线程池的核心参数请看背景中提到的那篇文章。

 

首先老规矩,上源码(ps:看美女)。

java.util.concurrent.Executors#newCachedThreadPool()

注释给出了该方法的说明:

该方法的目的是创建一个线程池。

该线程池在前面的线程可用时将会重用之前的线程,否则则创建新的线程。

该线程池对执行短的异步任务性能提升很大。

调用execute函数如果之前构造的线程没有销毁(60s保活期,没任务超期销毁)则会重用之前的线程。

60秒内没被用过的线程将会被终止从线程池缓存中移除掉。

因此该线程池闲置时不会消耗任何资源。

 

我们发现调用了java.util.concurrent.ThreadPoolExecutor#ThreadPoolExecutor(int, int, long, java.util.concurrent.TimeUnit, java.util.concurrent.BlockingQueue<java.lang.Runnable>)构造方法来构建ThreadPoolExecutor对象。

[code]   public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}

核心线程数为0,最大线程数为整数最大值,保活时间为60秒,工作队列为SynchronousQueue。

如果线程池中的线程数大于核心线程数且队列满了,且线程数小于最大线程数,则会创建新的线程。

maximumPoolSize 最大可以至Integer.MAX_VALUE,是高度可伸缩的线程池,如果达到这个上限,相信没有任何服务器能够继续工作,肯定会拋出OOM异常。

keepAliveTime 默认为60秒,工作线程处于空闲状态,则回收工作线程。如果任务数增加,再次创建出新线程处理任务。

    ----《码处高效Java开发手册》

 

这里设置保活时间是为了线程池中的线程尽可能得能够复用,即60s内如果有新任务进来,就先不要创建新线程了,先用之前的线程来执行任务。

 

这点和redis的key过期时间有相通之处,redis设置过期时间比如为60s,如果60s之内访问,则可以用到缓存,否则就无法使用缓存需要查数据库了。(类比学习)

另外可以了解一下Executors.defaultThreadFactory(),默认的线程工厂的源码,了解其命名规则。

依然上源码(ps: 你懂得)

了解默认的拒绝策略

[code]   /**
* The default rejected execution handler
*/
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();

其策略为:抛出拒绝执行异常

另外也可以顺便了解一下其他的拒绝策略。

参考

《Java ThreadPoolExecutor的拒绝策略》

《Java ThreadPoolExecutor的拒绝策略CallerRunsPolicy的一个潜在的大坑》

另外执行java.util.concurrent.ThreadPoolExecutor#execute 查看任务执行的代码

 

 

3、Show Me Your Code

我们通过Executors.newCachedThreadPool();创建ExecutorService。

然后执行10次任务,任务内容为打印当前线程+打印当前的整数,从1开始。

3.1 代码

[code]package com.chujianyun.common.thread.pool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class PoolDemo {
private static ExecutorService executorService = Executors.newCachedThreadPool();

private static AtomicInteger atomicInteger = new AtomicInteger();

public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 10; i++) {
Thread.sleep(100L);
executorService.submit(PoolDemo::print);
}
}

public static void print() {
System.out.println(Thread.currentThread().getName());
System.out.println(atomicInteger.incrementAndGet());
}

}

结果:

由于每个任务之间间隔为100ms而且任务执行时间非常短,线程池用一个线程就足以处理这十个任务。

就是说复用了同一个线程,执行了10次任务。

3.2 如果我们注释掉

[code]Thread.sleep(100L);

发现线程池创建了两个线程(也可能会更多)来处理我们这10个任务。

这就涉及了SynchronousQueue的特征或者概念。

具体最好还是直接JDK里读源码:java.util.concurrent.SynchronousQueue,

这里就不展开了,大概介绍一下:

Java 6的并发编程包中的SynchronousQueue是一个没有数据缓冲的BlockingQueue,生产者线程对其的插入操作put必须等待消费者的移除操作take,反过来也一样。

不像ArrayBlockingQueue或LinkedListBlockingQueue,SynchronousQueue内部并没有数据缓存空间,你不能调用peek()方法来看队列中是否有数据元素,因为数据元素只有当你试着取走的时候才可能存在,不取走而只想偷窥一下是不行的,当然遍历这个队列的操作也是不允许的。队列头元素是第一个排队要插入数据的线程,而不是要交换的数据。数据是在配对的生产者和消费者线程之间直接传递的,并不会将数据缓冲数据到队列中。可以这样来理解:生产者和消费者互相等待对方,握手,然后一起离开。

参考文章:https://www.geek-share.com/detail/2587270580.html

比如第一次执行任务,创建“pool-1-thread-1”,“生产者线程对其的插入操作put必须等待消费者的移除操作take”,线程1拿走数据,打印后60s保活。

然后主线程又来了第二个任务,“pool-1-thread-1”空闲,继续继续它来take数据。

然后主线程又来了第二个任务,此时“pool-1-thread-1”可能还没有执行完任务(打印线程名+打印自增整数)。

此时符合大于核心线程数,且队列“已满”,则创建新的线程“pool-1-thread-2”,take任务执行。

然后如果忙得过来,都是他俩线程去执行。

如果忙不过来,比如pool-1-thread-1”和 “pool-1-thread-2”都在执行任务,生产者又put数据了,则又要新建一个线程。

3.3 修改代码

提交任务之后,让主线程sleep 65s后再提交任务,这样缓存的线程池刚刚失效,

[code]package com.chujianyun.common.thread.pool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class PoolDemo {
private static ExecutorService executorService = Executors.newCachedThreadPool();

private static AtomicInteger atomicInteger = new AtomicInteger();

public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 3; i++) {
executorService.submit(PoolDemo::print);
Thread.sleep(65000L);
}
}

public static void print() {
System.out.println(Thread.currentThread().getName());
System.out.println(atomicInteger.incrementAndGet());
}

}

这样每次都要创建新的线程。

关于线程池的更多细节可以深入学习java.util.concurrent.ThreadPoolExecutor的源码。

4、Think More

(1)我们学习某个类的时候,如果有经典的图书可以先看看书或者官方文档。

(2)然后尽量去看源码,源码的注释比较权威!另外如果懂得超深入,则细看源码,否则大致看看,如本文知道个大概。

(3)另外打断点调试也是学习源码的重要方式,可以断点单步走,可能都学更多(自行探索)。

(4)类比是巩固知识的一个重要方式,通过类比发现知识的共性,记忆更加牢固。

 

如果觉得本文对你有帮助,欢迎点赞,欢迎关注我,如果有补充欢迎评论交流,我将努力创作更多更好的文章。

另外欢迎加入我的知识星球,知识星球ID:15165241 一起交流学习。

https://t.zsxq.com/Z3bAiea  申请时标注来自CSDN。

原创文章转载请注明出处:https://blog.csdn.net/w605283073/article/details/90735382

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: