Java线程池ThreadPoolExecutor类使用详解
一、Executors创建线程池
二、ThreadPoolExecutor类
三、ThreadPoolExecutor类扩展
一、Executors创建线程池
Java中创建线程池很简单,只需要调用Executors中相应的便捷方法即可,如Executors.newFixedThreadPool()、Executors.newSingleThreadExecutor()、Executors.newCachedThreadPool()等方法。这些方法虽然便捷,但是也有其局限性,如:OOM,线程耗尽。
小程序使用这些快捷方法没什么问题,对于服务端需要长期运行的程序,创建线程池应该直接使用ThreadPoolExecutor进行创建。上述便捷方法的创建也是通过ThreadPoolExecutor实现的。
二、ThreadPoolExecutor类
1、线程池工作顺序
线程池的工作顺序为:corePoolSize -> 任务队列 -> maximumPoolSize -> 拒绝策略
2、ThreadPoolExecutor构造函数
Executors中创建线程池的快捷方法,实际上是调用了ThreadPoolExecutor的构造方法,定时任务线程池便捷方法Executors.newScheduledThreadPool()内部使用的是ScheduledThreadPoolExecutor。ThreadPoolExecutor构造函数参数列表如下:
public ThreadPoolExecutor(int corePoolSize, //线程池核心线程数量 int maximumPoolSize, //线程池最大线程数量 long keepAliveTime, //超过corePooleSize的空闲线程的存活时长 TimeUnit unit, //空闲线程存活时长单位 BlockingQueue<Runnable> workQueue, //任务的排队队列 ThreadFactory threadFactory, //新线程的线程工厂 RejectedExecutionHandler handler) //拒绝策略
比较容易出问题的参数有corePoolSize、maximumPoolSize、workQueue以及handler:
- corePoolSize和maximumPoolSize设置不当会影响效率,甚至耗尽线程
- workQueue设置不当容易导致OOM
- handler设置不当会导致提交任务时抛出异常
3、workQueue任务队列
任务队列一般分为直接提交队列、有界任务队列、无解任务队列、优先任务队列。
- 直接任务队列:设置为SynchronousQueue队列。SynchronousQueue是一个特殊的BlockingQueue,它没有容量,每执行一个插入操作就会阻塞,需要再执行一个删除操作才会被唤醒;反之,每一个删除操作也要等待对应的插入操作。
public class SynchronousQueueTest { private static ExecutorService pool; public static void main(String[] args) { //核心线程数设为1,最大线程数设为2,任务队列为SynchronousQueue,拒绝策略为AbortPolicy,直接抛出异常 pool 1062 = new ThreadPoolExecutor(1, 2, 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); for(int i = 0;i < 3;i++){ pool.execute(new ThreadTask()); } } } class ThreadTask implements Runnable{ @Override public void run() { System.out.println(Thread.currentThread().getName()); } }
执行结果如下:
pool-1-thread-1 pool-1-thread-2 Exception in thread "main" java.util.concurrent.RejectedExecutionException:
Task com.aisino.threadPool.ThreadTask@2f0e140b rejected from java.util.concurrent.ThreadPoolExecutor@7440e464[Running,
pool size = 2, active threads = 0, queued tasks = 0, completed tasks = 2] at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063) at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830) at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379) at com.aisino.threadPool.SynchronousQueueTest.main(SynchronousQueueTest.java:18)
由执行结果可知,当任务队列为SynchronousQueue,创建的线程数大于maximumPoolSize时,直接执行拒绝策略抛出异常。
使用SynchronousQueue队列时,提交的任务不会被保存,总是会马上提交执行。如果用于执行任务的线程数小于maximumPoolSize,则尝试创建新的线程,如果达到maximumPoolSize设置的最大值,则根据设置的handler执行对应的拒绝策略。因此使用SynchronousQueue队列时,任务不会被缓存起来,而是马上执行,在这种情况下,需要对程序的并发量有个准确的评估,才能设置合适的maximumPoolSize数量,否则很容易执行拒绝策略。
- 有界任务队列:可使用ArrayBlockingQueue实现,如下所示:
pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(10),
Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
使用ArrayBlockingQueue有界任务队列时,如果有新的任务需要执行,线程池会创建新的线程,直到创建的线程数量达到corePoolSize,之后新的任务会被加入到等待队列中 576 。若等待队列已满,即超过ArrayBlockingQueue初始化的容量,则继续创建线程,直到线程数量达到maximumPoolSize设置的最大线程数量,若大于maximumPoolSize,则执行拒绝策略。在这种情况下,线程数量的上限与有界任务队列的状态有直接关系,如果有界任务队列初始量较大或者没有达到超负荷的状态,线程数将一直维持在corePoolSize及以下;反之,当任务队列已满时,则会以maximumPoolSize为最大线程数上限。
- 无界的任务队列:可使用LinkedBlockingQueue实现,如下所示:
pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
使用无界任务队列,线程池的任务队列可以无限制的添加新的任务,而线程池创建的最大线程数量就是corePoolSize。在这种情况下maximumPoolSize参数是无效的,哪怕任务队列中缓存了很多未执行的任务,当线程池的线程数达到corePoolSize后,线程数也不会再增加了。若后续 aec 有新的任务加入,则直接进入队列等待。使用这种任务队列模式时,要注意任务提交与处理之间的协调控制,不然会出现队列中的任务由于无法及时处理导致的一直增长,直到最后资源耗尽的问题。
- 优先任务队列:通过PriorityBlockingQueue实现,如下所示:
public class PriorityBlockingQueueTest { private static ExecutorService pool; public static void main(String[] args) { //使用优先任务队列 pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new PriorityBlockingQueue<Runnable>(), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); for(int i = 0;i < 10;i++){ pool.execute(new PriorityThreadTask(i)); } } } class PriorityThreadTask implements Runnable, Comparable<PriorityThreadTask>{ private int priority; public int getPriority(){ return priority; } public void setPriority(int priority){ this.priority = priority; } public PriorityThreadTask(){} public PriorityThreadTask(int priority){ this.priority = priority; } @Override public void run() { try{ //让线程阻塞,使后续任务进入缓存队列 Thread.sleep(1000); System.out.println("priority:" + this.priority + ", ThreadName:" + Thread.currentThread().getName()); }catch(InterruptedException e){ e.printStackTrace(); } } //当前对象和其他对象作比较,当前优先级大就返回-1,当前优先级小就返回1,值越小优先级越高 @Override public int compareTo(PriorityThreadTask o) { return this.priority > o.priority ? -1 : 1; } }
执行结果如下:
priority:0, ThreadName:pool-1-thread-1 priority:19, ThreadName:pool-1-thread-1 priority:18, ThreadName:pool-1-thread-1 priority:17, ThreadName:pool-1-thread-1 priority:16, ThreadName:pool-1-thread-1 priority:15, ThreadName:pool-1-thread-1 priority:14, ThreadName:pool-1-thread-1 priority:13, ThreadName:pool-1-thread-1 priority:12, ThreadName:pool-1-thread-1 priority:11, ThreadName:pool-1-thread-1 priority:10, ThreadName:pool-1-thread-1 priority:9, ThreadName:pool-1-thread-1 priority:8, ThreadName:pool-1-thread-1 priority:7, ThreadName:pool-1-thread-1 priority:6, ThreadName:pool-1-thread-1 priority:5, ThreadName:pool-1-thread-1 priority:4, ThreadName:pool-1-thread-1 priority:3, ThreadName:pool-1-thread-1 priority:2, ThreadName:pool-1-thread-1 priority:1, ThreadName:pool-1-thread-1
由执行结果可看出,除了第一个任务直接创建线程执行外,其他的任务都被放入了优先任务队列PriorityBlockingQueue中,按优先级进行了重新排列执行,且线程池的线程数一直为corePoolSize,在本例中corePoolSize为1,即线程数一直为1。
PriorityBlockingQueue其实是一个特殊的无界队列,它其中无论 3126 添加了多少个任务,线程池创建的线程数量也不会超过corePoolSize。其他队列一般是按照先进先出的规则处理任务,而PriorityBlockingQueue队列可以自定义规则根据任务的优先级顺序先后执行。
4、handler拒绝策略
在创建线程池时,为防止资源被耗尽,任务队列都会选择创建有界任务队列。在创建有界任务队列模式下,当任务队列已满且线程池创建的线程数达到最大线程数时,需要指定ThreadPoolExecutor的RejectedExecutionHandler参数来处理线程池"超载"的情况。ThreadPoolExecutor自带的拒绝策略如下:
- AbortPolicy策略:该策略会直接会直接抛出异常,阻止系统正常工作
- DiscardPolicy策略:该策略会默默丢弃无法处理的任务,不予任何处理。使用此策略时,业务场景中需允许任务的丢失
- DiscardOldestPolicy策略:该策略会丢弃任务队列中最老的一个任务,即任务队列中最先被添加进去的、马上要被执行的任务,并尝试再次提交任务(重复此过程)
- CallerRunsPolicy策略:如果线程池的线程数量达到上限,该策略会把任务队列中的任务放在调用者线程当中运行
以上内置的拒绝策略均实现了RejectedExecutionHandler接口,也可自己扩展RejectedExecutionHandler接口,定义自己的拒绝策略。示例代码如下:
/** * 自定义拒绝策略 */ public class CustomRejectedExecutionHandlerTest { private static ExecutorService pool; public static void main(String[] args) { //自定义拒绝策略 pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(5), Executors.defaultThreadFactory(), new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.out.println(r.toString() + " 执行了拒绝策略"); } }); for (int i = 0; i < 10; i++) { pool.execute(new CustomRejectedExecutionHandlerThreadTask()); } } } class CustomRejectedExecutionHandlerThreadTask implements Runnable { @Override public void run() { try { //让线程阻塞,使后续任务机进入缓存队列 Thread.sleep(1000); System.out.println("线程名称:" + Thread.currentThread().getName()); } catch (InterruptedException e) { e.printStackTrace(); } } }
执行结果如下:
com.aisino.threadPool.CustomRejectedExecutionHandlerThreadTask@3cd1a2f1 执行了拒绝策略 com.aisino.threadPool.CustomRejectedExecutionHandlerThreadTask@2f0e140b 执行了拒绝策略 com.aisino.threadPool.CustomRejectedExecutionHandlerThreadTask@7440e464 执行了拒绝策略 线程名称:pool-1-thread-2 线程名称:pool-1-thread-1 线程名称:pool-1-thread-2 线程名称:pool-1-thread-1 线程名称:pool-1-thread-2 线程名称:pool-1-thread-1 线程名称:pool-1-thread-2
由执行结果可看出,由于任务添加了休眠阻塞,执行任务需要花费一定时间,导致有一定数量的任务被丢弃,从而执行自定义的拒绝策略。
5、ThreadFactory自定义线程创建
线程池中的线程是通过ThreadPoolExecutor中的线程工厂ThreadFactory创建的。可自定义ThreadFactory对线程池中的线程进行一些特殊的设置(命名、设置优先级等)。示例代码如下:
public class CustomThreadFactoryTest { private static ExecutorService pool; public static void main(String[] args) { //自定义线程工厂 pool = new ThreadPoolExecutor(2, 4, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(5), new ThreadFactory() { @Override public Thread newThread(Runnable r) { System.out.println("创建线程:" + r.hashCode()); //线程名称 Thread th = new Thread(r, "threadPool-" + r.hashCode()); return th; } }, new ThreadPoolExecutor.CallerRunsPolicy()); for(int i = 0;i < 10;i++){ pool.execute(new CustomThreadFactoryThreadTask()); } } } class CustomThreadFactoryThreadTask implements Runnable{ @Override public void run(){ //输出执行线程的名称 System.out.println("线程名称:" + Thread.currentThread().getName()); } }
执行结果如下:
创建线程:1259475182 创建线程:1300109446 创建线程:1020371697 线程名称:threadPool-1259475182 线程名称:threadPool-1300109446 线程名称:threadPool-1259475182 创建线程:789451787 线程名称:threadPool-1020371697 线程名称:threadPool-1259475182 线程名称:threadPool-1300109446 线程名称:threadPool-1259475182 线程名称:threa 3dcb dPool-1020371697 线程名称:threadPool-789451787 线程名称:threadPool-1300109446
由执行结果可看出,每个线程的创建都进行了记录输出与命名。
6、正确构造线程池
int poolSize = Runtime.getRuntime().availableProcessors() * 2; BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(512); RejectedExecutionHandler policy = new ThreadPoolExecutor.DiscardPolicy(); executorService = new ThreadPoolExecutor(poolSize, poolSize, 0, TimeUnit.SECONDS, queue, policy);
三、ThreadPoolExecutor类扩展
ThreadPoolExecutor类扩展主要是围绕beforeExecute()、afterExecute()和terminated()三个接口。
- beforeExecute:线程池中任务运行前执行
- afterExecute:线程池中任务运行完毕后执行
- terminated:线程池退出后执行
通过这三个接口可以监控每个任务的开始时间和结束时间,或者其他功能。示例代码如下:
public class ThreadPoolExecutorExtensionTest { private static ExecutorService pool; public static void main(String[] args) { //自定义线程,为线程重命名 pool = new ThreadPoolExecutor(1, 4, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(5), new ThreadFactory() { @Override public Thread newThread(Runnable r) { System.out.println("创建线程:" + r.hashCode()); Thread th = new Thread(r, "ThreadPool-" + r.hashCode()); return th; } }, new ThreadPoolExecutor.CallerRunsPolicy()){ protected void beforeExecute(Thread t,Runnable r) { System.out.println("准备执行的任务名称:"+ ((ThreadPoolExecutorExtensionThreadTask)r).getTaskName()); } protected void afterExecute(Runnable r,Throwable t) { System.out.println("执行完毕的任务名称:"+((ThreadPoolExecutorExtensionThreadTask)r).getTaskName()); } protected void terminated() { System.out.println("线程池退出"); } }; for(int i = 0;i < 10;i++){ pool.execute(new ThreadPoolExecutorExtensionThreadTask("Task-" + i)); } pool.shutdown(); } } class ThreadPoolExecutorExtensionThreadTask implements Runnable{ private String taskName; public String getTaskName(){ return taskName; } public void setTaskName(String taskName){ this.taskName = taskName; } public ThreadPoolExecutorExtensionThreadTask(){} public ThreadPoolExecutorExtensionThreadTask(String taskName){ this.taskName = taskName; } @Override public void run() { //输出任务名称以及对应的执行线程名称 System.out.println("任务名称:" + this.taskName + ", 执行线程名称:" + Thread.currentThread().getName()); } }
执行结果如下:
创建线程:1259475182 创建线程:1300109446 创建线程:1020371697 准备执行的任务名称:Task-0 创建线程:789451787 任务名称:Task-0, 执行线程名称:ThreadPool-1259475182 准备执行的任务名称:Task-6 任务名称:Task-9, 执行线程名称:main 执行完毕的任务名称:Task-0 准备执行的任务名称:Task-7 准备执行的任务名称:Task-8 任务名称:Task-6, 执行线程名称:ThreadPool-1300109446 任务名称:Task-8, 执行线程名称:ThreadPool-789451787 执行完毕的任务名称:Task-8 准备执行的任务名称:Task-1 任务名称:Task-7, 执行线程名称:ThreadPool-1020371697 执行完毕的任务名称:Task-7 任务名称:Task-1, 执行线程名称:ThreadPool-1259475182 执行完毕的任务名称:Task-1 准备执行的任务名称:Task-2 任务名称:Task-2, 执行线程名称:ThreadPool-789451787 执行完毕的任务名称:Task-2 执行完毕的任务名称:Task-6 准备执行的任务名称:Task-5 任务名称:Task-5, 执行线程名称:ThreadPool-789451787 执行完毕的任务名称:Task-5 准备执行的任务名称:Task-4 准备执行的任务名称:Task-3 任务名称:Task-4, 执行线程名称:ThreadPool-1259475182 执行完毕的任务名称:Task-4 任务名称:Task-3, 执行线程名称:ThreadPool-1020371697 执行完毕的任务名称:Task-3 线程池退出
由执行结果可看出,通过对beforeExecute()、afterExecute()和terminated()的实现,可以对线程池中线程的状态进行监控,在线程执行前后输出了相关的打印信息。另外,使用shutdown()方法可以比较安全的关闭线程池,当线程池调用该方法后,线程池将不再接受后续添加的任务。但是,线程池不会立刻退出,而是等到添加到线程池中的任务都处理完成,才会退出。
- Java中使用MySQL从安装、配置到实际程序测试详解
- Java中正则表达式使用方法详解
- java.util.ResourceBundle使用详解
- Java中使用MySQL从安装、配置到实际程序测试详解
- java.util.logging.Logger使用详解
- 如何在Oracle中使用Java存储过程 (详解)
- Java Graphics2D 使用详解
- 在Eclipse的Web项目中java里面使用ant进行数字签名步骤详解
- 使用Java操作文本文件的方法详解
- java.util.logging.Logger使用详解
- 高阶Java-Java注解 Java annotation 使用详解
- java减少内存使用详解
- Flex使用Blazeds与Java交互及自定义对象转换详解
- 使用Java操作文本文件的方法详解
- 技术转载:Jni学习三:jni使用java对象详解
- java.util.Date java.sql.Date java.sql.Timestamp 使用详解
- Java使用AES加密和解密的实例详解
- Java下使用Oracle存储过程(详解)第1/3页
- 如何在Oracle中使用Java存储过程 (详解)
- JAVA 中的IO流详解及其使用方法、例子