您的位置:首页 > 编程语言 > Java开发

Java多线程 之 ThreadPoolExecutor(九)

2016-06-28 12:20 411 查看
最近在工作中遇到了ThreadPoolExecutor的使用,而且是由于它的配置不当导致了线上问题。下面对其进行简单介绍。

先看看ThreadPoolExecutor常用的构造方法:

public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}


其中:

(1)corePoolSize:线程池中核心线程的个数

(2)maximumPoolSize:线程池中最大的线程个数

(3)long keepAliveTime,TimeUnit unit:线程池中超过corePoolSize的那些线程,如果空闲,能够活在线程池中的最长时间

(4)workQueue:工作队列(感觉叫等待队列比较好),一般使用LinkedBlockingQueue。

(5)handler:处理策略

private static ThreadPoolExecutor threadPoolExecutor =
new ThreadPoolExecutor(25, 50, 5L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(50000),
new ThreadPoolExecutor.CallerRunsPolicy());


threadPoolExecutor.execute(runnableTask);

ThreadPoolExecutor是通过execute方法将任务添加到线程池中。在java中要注意任务与线程的区别。感觉有点像程序和进程的区别。任务是一个静态的概念,只是用来表示要做的事情,而线程是一个动态的概念,线程是用来驱动任务的,也就是说任务要附着在线程上去执行。

下面详细说明下这5个参数的含义,大体经过以下过程:

a.当线程池中的线程数量少于corePoolSize时,当新的任务被execute提交到线程池时,不管线程池中是否有空闲线程,即使有空闲线程也要创建新线程。

b.当线程池中的线程数到达corePoolSize的上限时,当新的任务被execute提交到线程池时,这个新提交的任务会被添加到等待队列中。

c.当等待队列已经满了,当新的任务被execute提交到线程池时,会创建新的线程,直到到达maximumPoolSize。

d.如果maximumPoolSize也满了,当新的任务被execute提交到线程池时,就会使用后面的处理策略handler。

先定义一个概念,拒绝任务。

所谓拒绝任务是指,当线程池的等待队列已满,并且已经到达maximumPoolSize时,被execute添加进来的任务。

常见的处理策略有下面4种:

(1). CallerRunsPolicy :这个策略重试添加当前的任务,他会自动重复调用 execute() 方法,直到成功。也就是说,这个策略会一直尝试着将这个新到来的任务添加到线程池中,直到添加成功。

(2). AbortPolicy :对拒绝任务抛弃处理,并且抛出异常。

(3). DiscardPolicy :对拒绝任务直接无声抛弃,没有异常信息。

(4). DiscardOldestPolicy :对拒绝任务不抛弃,而是抛弃队列里面等待最久的一个线程,然后把拒绝任务加到队列。

在项目中最好使用CallerRunsPolicy ,否则其他策略都有抛弃的可能。就拿发送push来说,有些用户可能收不到push,尤其是一些领导收不到,这会很惨。这次线上问题的原因就是,原来代码使用的DiscardOldestPolicy 策略。

使用线程池的好处:

线程的创建与消亡会浪费很多时间,使用线程池可以减少这种时间消耗。

使用java提供的Executor还可以实现任务的提交与执行解耦。

线上问题的工程有一个生产线程池,一个消费线程池。生产线程池中的线程在执行过程中只是进行的简单调用,而消费线程池中的线程在执行过程中却要RPC调用,RPC调用相对于简单调用会消耗大量时间,因此消费线程池要设置的大一些。

线程池中线程的数量与CPU的核心数有很大关系。查看服务器CPU的核心数可以使用cat /proc/cpuinfo来查看。

线程池中线程的数量与等待队列的大小关系:等待队列比较消耗内存,线程消耗CPU,这个要衡量好。

下面有一个例子:

package org.fan.learn.thread.share;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class TestRejectedPolicy {
public static void main(String[] args) throws InterruptedException {
//        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
//                new ArrayBlockingQueue<Runnable>(1));//设置线程池只启动一个线程 阻塞队列一个元素
//        pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(1),
new ThreadPoolExecutor.DiscardOldestPolicy());
//设置策略为挤掉最旧的
for (int i = 0; i < 10; i++) {
final int  j = i;
pool.submit(new Runnable() {
public void run() {
System.out.println("线程: "+j + " " + Thread.currentThread().getName()+"  开始执行");
try {
//                        Thread.sleep(1000L);
TimeUnit.SECONDS.sleep(1);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("线程: "+j + " " + Thread.currentThread().getName()+"  执行完毕");
}
});
}
Thread.sleep(5000L);
pool.shutdown();
System.out.println("关闭后线程终止了吗?" + pool.isTerminated());
}
}


执行结果如下:

线程: 0 pool-1-thread-1 开始执行

线程: 0 pool-1-thread-1 执行完毕

线程: 9 pool-1-thread-1 开始执行

线程: 9 pool-1-thread-1 执行完毕

关闭后线程终止了吗?false

这里故意让线程开始执行之后sleep 1秒,这样新的线程被添加到线程池时都会讲等待队列中的旧的线程抛弃,导致只有第一个和最后一个能够执行。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息