Java多线程 之 ThreadPoolExecutor(九)
2016-06-28 12:20
411 查看
最近在工作中遇到了ThreadPoolExecutor的使用,而且是由于它的配置不当导致了线上问题。下面对其进行简单介绍。
先看看ThreadPoolExecutor常用的构造方法:
其中:
(1)corePoolSize:线程池中核心线程的个数
(2)maximumPoolSize:线程池中最大的线程个数
(3)long keepAliveTime,TimeUnit unit:线程池中超过corePoolSize的那些线程,如果空闲,能够活在线程池中的最长时间
(4)workQueue:工作队列(感觉叫等待队列比较好),一般使用LinkedBlockingQueue。
(5)handler:处理策略
threadPoolExecutor.execute(runnableTask);
ThreadPoolExecutor是通过execute方法将任务添加到线程池中。在java中要注意任务与线程的区别。感觉有点像程序和进程的区别。任务是一个静态的概念,只是用来表示要做的事情,而线程是一个动态的概念,线程是用来驱动任务的,也就是说任务要附着在线程上去执行。
下面详细说明下这5个参数的含义,大体经过以下过程:
a.当线程池中的线程数量少于corePoolSize时,当新的任务被execute提交到线程池时,不管线程池中是否有空闲线程,即使有空闲线程也要创建新线程。
b.当线程池中的线程数到达corePoolSize的上限时,当新的任务被execute提交到线程池时,这个新提交的任务会被添加到等待队列中。
c.当等待队列已经满了,当新的任务被execute提交到线程池时,会创建新的线程,直到到达maximumPoolSize。
d.如果maximumPoolSize也满了,当新的任务被execute提交到线程池时,就会使用后面的处理策略handler。
先定义一个概念,拒绝任务。
所谓拒绝任务是指,当线程池的等待队列已满,并且已经到达maximumPoolSize时,被execute添加进来的任务。
常见的处理策略有下面4种:
(1). CallerRunsPolicy :这个策略重试添加当前的任务,他会自动重复调用 execute() 方法,直到成功。也就是说,这个策略会一直尝试着将这个新到来的任务添加到线程池中,直到添加成功。
(2). AbortPolicy :对拒绝任务抛弃处理,并且抛出异常。
(3). DiscardPolicy :对拒绝任务直接无声抛弃,没有异常信息。
(4). DiscardOldestPolicy :对拒绝任务不抛弃,而是抛弃队列里面等待最久的一个线程,然后把拒绝任务加到队列。
在项目中最好使用CallerRunsPolicy ,否则其他策略都有抛弃的可能。就拿发送push来说,有些用户可能收不到push,尤其是一些领导收不到,这会很惨。这次线上问题的原因就是,原来代码使用的DiscardOldestPolicy 策略。
使用线程池的好处:
线程的创建与消亡会浪费很多时间,使用线程池可以减少这种时间消耗。
使用java提供的Executor还可以实现任务的提交与执行解耦。
线上问题的工程有一个生产线程池,一个消费线程池。生产线程池中的线程在执行过程中只是进行的简单调用,而消费线程池中的线程在执行过程中却要RPC调用,RPC调用相对于简单调用会消耗大量时间,因此消费线程池要设置的大一些。
线程池中线程的数量与CPU的核心数有很大关系。查看服务器CPU的核心数可以使用cat /proc/cpuinfo来查看。
线程池中线程的数量与等待队列的大小关系:等待队列比较消耗内存,线程消耗CPU,这个要衡量好。
下面有一个例子:
执行结果如下:
线程: 0 pool-1-thread-1 开始执行
线程: 0 pool-1-thread-1 执行完毕
线程: 9 pool-1-thread-1 开始执行
线程: 9 pool-1-thread-1 执行完毕
关闭后线程终止了吗?false
这里故意让线程开始执行之后sleep 1秒,这样新的线程被添加到线程池时都会讲等待队列中的旧的线程抛弃,导致只有第一个和最后一个能够执行。
先看看ThreadPoolExecutor常用的构造方法:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); }
其中:
(1)corePoolSize:线程池中核心线程的个数
(2)maximumPoolSize:线程池中最大的线程个数
(3)long keepAliveTime,TimeUnit unit:线程池中超过corePoolSize的那些线程,如果空闲,能够活在线程池中的最长时间
(4)workQueue:工作队列(感觉叫等待队列比较好),一般使用LinkedBlockingQueue。
(5)handler:处理策略
private static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(25, 50, 5L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(50000), new ThreadPoolExecutor.CallerRunsPolicy());
threadPoolExecutor.execute(runnableTask);
ThreadPoolExecutor是通过execute方法将任务添加到线程池中。在java中要注意任务与线程的区别。感觉有点像程序和进程的区别。任务是一个静态的概念,只是用来表示要做的事情,而线程是一个动态的概念,线程是用来驱动任务的,也就是说任务要附着在线程上去执行。
下面详细说明下这5个参数的含义,大体经过以下过程:
a.当线程池中的线程数量少于corePoolSize时,当新的任务被execute提交到线程池时,不管线程池中是否有空闲线程,即使有空闲线程也要创建新线程。
b.当线程池中的线程数到达corePoolSize的上限时,当新的任务被execute提交到线程池时,这个新提交的任务会被添加到等待队列中。
c.当等待队列已经满了,当新的任务被execute提交到线程池时,会创建新的线程,直到到达maximumPoolSize。
d.如果maximumPoolSize也满了,当新的任务被execute提交到线程池时,就会使用后面的处理策略handler。
先定义一个概念,拒绝任务。
所谓拒绝任务是指,当线程池的等待队列已满,并且已经到达maximumPoolSize时,被execute添加进来的任务。
常见的处理策略有下面4种:
(1). CallerRunsPolicy :这个策略重试添加当前的任务,他会自动重复调用 execute() 方法,直到成功。也就是说,这个策略会一直尝试着将这个新到来的任务添加到线程池中,直到添加成功。
(2). AbortPolicy :对拒绝任务抛弃处理,并且抛出异常。
(3). DiscardPolicy :对拒绝任务直接无声抛弃,没有异常信息。
(4). DiscardOldestPolicy :对拒绝任务不抛弃,而是抛弃队列里面等待最久的一个线程,然后把拒绝任务加到队列。
在项目中最好使用CallerRunsPolicy ,否则其他策略都有抛弃的可能。就拿发送push来说,有些用户可能收不到push,尤其是一些领导收不到,这会很惨。这次线上问题的原因就是,原来代码使用的DiscardOldestPolicy 策略。
使用线程池的好处:
线程的创建与消亡会浪费很多时间,使用线程池可以减少这种时间消耗。
使用java提供的Executor还可以实现任务的提交与执行解耦。
线上问题的工程有一个生产线程池,一个消费线程池。生产线程池中的线程在执行过程中只是进行的简单调用,而消费线程池中的线程在执行过程中却要RPC调用,RPC调用相对于简单调用会消耗大量时间,因此消费线程池要设置的大一些。
线程池中线程的数量与CPU的核心数有很大关系。查看服务器CPU的核心数可以使用cat /proc/cpuinfo来查看。
线程池中线程的数量与等待队列的大小关系:等待队列比较消耗内存,线程消耗CPU,这个要衡量好。
下面有一个例子:
package org.fan.learn.thread.share; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class TestRejectedPolicy { public static void main(String[] args) throws InterruptedException { // ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, // new ArrayBlockingQueue<Runnable>(1));//设置线程池只启动一个线程 阻塞队列一个元素 // pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy()); ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(1), new ThreadPoolExecutor.DiscardOldestPolicy()); //设置策略为挤掉最旧的 for (int i = 0; i < 10; i++) { final int j = i; pool.submit(new Runnable() { public void run() { System.out.println("线程: "+j + " " + Thread.currentThread().getName()+" 开始执行"); try { // Thread.sleep(1000L); TimeUnit.SECONDS.sleep(1); } catch (Exception e) { e.printStackTrace(); } System.out.println("线程: "+j + " " + Thread.currentThread().getName()+" 执行完毕"); } }); } Thread.sleep(5000L); pool.shutdown(); System.out.println("关闭后线程终止了吗?" + pool.isTerminated()); } }
执行结果如下:
线程: 0 pool-1-thread-1 开始执行
线程: 0 pool-1-thread-1 执行完毕
线程: 9 pool-1-thread-1 开始执行
线程: 9 pool-1-thread-1 执行完毕
关闭后线程终止了吗?false
这里故意让线程开始执行之后sleep 1秒,这样新的线程被添加到线程池时都会讲等待队列中的旧的线程抛弃,导致只有第一个和最后一个能够执行。
相关文章推荐
- java对世界各个时区(TimeZone)的通用转换处理方法(转载)
- java-注解annotation
- java-模拟tomcat服务器
- java-用HttpURLConnection发送Http请求.
- java-WEB中的监听器Lisener
- Android IPC进程间通讯机制
- Android Native 绘图方法
- Android java 与 javascript互访(相互调用)的方法例子
- Python3写爬虫(四)多线程实现数据爬取
- 介绍一款信息管理系统的开源框架---jeecg
- 聚类算法之kmeans算法java版本
- java实现 PageRank算法
- PropertyChangeListener简单理解
- c++11 + SDL2 + ffmpeg +OpenAL + java = Android播放器
- 插入排序
- 冒泡排序
- 堆排序
- 快速排序