Java线程池的那点事
2016-02-18 17:11
489 查看
本文为作者(石岩)原创,转载请指明出处:
http://mp.blog.csdn.net/mdeditor/50687412
大家开发程序时,基本上都会使用线程去处理一些任务,我相信大家最通常的做法都是直接创建一个Thread然后在run方法里面去处理自己的逻辑,这种做法当然可以,但是大家考虑一下,如果你有很多任务需要处理怎么做呢,难道你每个任务都去创建一个Thread吗,这样思考一下,我们会想到能不能找到一个装载线程的容器,然后通过控制这个容器,就可以管理容器里面的线程,那么这个容器我们就叫做线程池。
本篇文章主要讲解以下4个方面
1.核心API介绍
2.ThreadPoolExecutor类的使用
3.线程池的实现原理
4.使用实例
5.如何合理配置线程池的大小
public void execute(Runnable runnable);
该接口声明了execute方法,将Runnable对象传入,启动该方法相当于启动了该线程
2.ExecutorService接口
该接口是Executor的子接口,提供了submit方法,该方法返回Future对象,Future可用于控制线程的执行与取消
3.ScheduledExecutorService接口
该接口是ExecutorService的子接口,提供了线程启动时机的控制
4.Callable接口
public Object call() throws Exception;
该接口类似Runnable接口,只是有返回值和抛出的异常,该方法返回的值就是ExecutorService接口调用submit(Callable call)方法返回的Future调用get方法返回的值。
5.Future接口
Future接口用于控制线程的执行。调用cancell(boolean bool)方法将会取消正在运行的进程。
6.Executors
这是一个线程池的工厂类,该类提供了许多生成Executor,ExecutorService,ScheduleExecutorService的工厂方法。
Executor就相当于一个线程池,提供对线程的管理功能,包括启动线程,取消线程执行,加入线 程,移除线程,以及协调线程之间的工作等功能,这些随着Executor级别(即子接口,实现类)越高而功能越强。
构造方法:
下面我们看一下前3个构造方法的源码:
通过源码发现,前3个构造方法都是调用的第4个构造方法进行初始化的。
下面看一下构造方法中,各个参数的意义。
corePoolSize:池中所保存的线程数,包括空闲线程,在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建corePoolSize个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中。
maximumPoolSize:池中允许的最大线程数
keepAliveTime:当线程数大于核心(corePoolSize)时,此为终止前多余的空闲线程等待新任务的最长时间。
unit:keepAliveTime 参数的时间单位。(下方7种取值)
TimeUnit.DAYS; //天
TimeUnit.HOURS; //小时
TimeUnit.MINUTES; //分钟
TimeUnit.SECONDS; //秒
TimeUnit.MILLISECONDS; //毫秒
TimeUnit.MICROSECONDS; //微妙
TimeUnit.NANOSECONDS; //纳秒
workQueue:执行前用于保持任务的队列。此队列仅保持由execute方法提交的Runnable任务。这个参数的选择也很重要,会对线程池的运行过程产生重大影响,一般来说,这里的阻塞队列有以下几种选择:
ArrayBlockingQueue 无界队列
LinkedBlockingQueue 有界队列
SynchronousQueue 工作队列的默认选项是SynchronousQueue
线程池的排队策略与BlockingQueue有关。
handler:表示当拒绝处理任务时的策略,有以下四种取值:
ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务
方法介绍:
execute()方法实际上是Executor中声明的方法,在ThreadPoolExecutor进行了具体的实现,这个方法是ThreadPoolExecutor的核心方法,通过这个方法可以向线程池提交一个任务,交由线程池去执行。
submit()方法是在ExecutorService中声明的方法,在AbstractExecutorService就已经有了具体的实现,在ThreadPoolExecutor中并没有对其进行重写,这个方法也是用来向线程池提交任务的,但是它和execute()方法不同,它能够返回任务执行的结果,去看submit()方法的实现,会发现它实际上还是调用的execute()方法,只不过它利用了Future来获取任务执行结果
启动核心线程,使其处于等待工作的空闲状态。仅当执行新任务时,此操作才重写默认的启动核心线程策略。如果已启动所有核心线程,此方法将返回false。
启动所有核心线程,使其处于等待工作的空闲状态。仅当执行新任务时,此操作才重写默认的启动核心线程策略。
如果允许核心线程超时,value为true,否则为false
线程池的状态
任务的执行
线程池中的线程初始化
任务缓存队列及排队策略
任务拒绝策略
线程池的关闭
线程池容量的动态调整
1.线程池的状态
在ThreadPoolExecutor中定义了一个volatile变量,另外定义了几个static final变量表示线程池的各个状态:
volatile int runState;
static final int RUNNING = 0;
static final int SHUTDOWN = 1;
static final int STOP = 2;
static final int TERMINATED = 3;
runState表示当前线程池的状态,它是一个volatile变量用来保证线程之间的可见性;
当创建线程池后,初始时,线程池处于RUNNING状态;
如果调用了shutdown()方法,则线程池处于SHUTDOWN状态,此时线程池不能够接受新的任务,它会等待所有任务执行完毕;
如果调用了shutdownNow()方法,则线程池处于STOP状态,此时线程池不能接受新的任务,并且会去尝试终止正在执行的任务;
当线程池处于SHUTDOWN或STOP状态,并且所有工作线程已经销毁,任务缓存队列已经清空或执行结束后,线程池被设置为TERMINATED状态。
2.任务的执行
在了解将任务提交给线程池到任务执行完毕整个过程之前,我们先来看一下ThreadPoolExecutor类中其他的一些比较重要成员变量:
private final BlockingQueue workQueue;//任务缓存队列,用来存放等待执行的任务
private final ReentrantLock mainLock = new ReentrantLock(); //线程池的主要状态锁,对线程池状态的改变都要使用这个锁
private final HashSet workers = new HashSet(); //用来存放工作集
private volatile boolean allowCoreThreadTimeOut;//是否允许为核心线程设置存活时间
private volatile int corePoolSize; //核心池的大小(即线程池中的线程数目大于这个参数时,提交的任务会被放进任务缓存队列)
private volatile int maximumPoolSize; //线程池最大能容忍的线程数
private volatile int poolSize; //线程池中当前的线程数
private volatile RejectedExecutionHandler handler; //任务拒绝策略
private volatile ThreadFactory threadFactory; //线程工厂,用来创建线程
private int largestPoolSize; //用来记录线程池中曾经出现过的最大线程数
private long completedTaskCount; //用来记录已经执行完毕的任务个数
在ThreadPoolExecutor类中,最核心的任务提交方法是execute()方法,虽然通过submit也可以提交任务,但是实际上submit方法里面最终调用的还是execute()方法,所以我们只需要研究execute()方法的实现原理即可:
在addThread方法中,首先用提交的任务创建了一个Worker对象,然后调用线程工厂threadFactory创建了一个新的线程t,然后将线程t的引用赋值给了Worker对象的成员变量thread,接着通过workers.add(w)将Worker对象添加到工作集当中。
它实际上实现了Runnable接口,因此上面的Thread t = threadFactory.newThread(w);效果跟下面这句的效果基本一样:
Thread t = new Thread(w);
相当于传进去了一个Runnable任务,在线程t中执行这个Runnable。
既然Worker实现了Runnable接口,那么自然最核心的方法便是run()方法了:
从run方法的实现可以看出,它首先执行的是通过构造器传进来的任务firstTask,在调用runTask()执行完firstTask之后,在while循环里面不断通过getTask()去取新的任务来执行,那么去哪里取呢?自然是从任务缓存队列里面去取,getTask是ThreadPoolExecutor类中的方法,并不是Worker类中的方法,下面是getTask方法的实现:
在getTask中,先判断当前线程池状态,如果runState大于SHUTDOWN(即为STOP或者TERMINATED),则直接返回null。
如果runState为SHUTDOWN或者RUNNING,则从任务缓存队列取任务。
如果当前线程池的线程数大于核心池大小corePoolSize或者允许为核心池中的线程设置空闲存活时间,则调用poll(time,timeUnit)来取任务,这个方法会等待一定的时间,如果取不到任务就返回null。
然后判断取到的任务r是否为null,为null则通过调用workerCanExit()方法来判断当前worker是否可以退出,我们看一下workerCanExit()的实现:在getTask中,先判断当前线程池状态,如果runState大于SHUTDOWN(即为STOP或者TERMINATED),则直接返回null。
如果runState为SHUTDOWN或者RUNNING,则从任务缓存队列取任务。
如果当前线程池的线程数大于核心池大小corePoolSize或者允许为核心池中的线程设置空闲存活时间,则调用poll(time,timeUnit)来取任务,这个方法会等待一定的时间,如果取不到任务就返回null。
然后判断取到的任务r是否为null,为null则通过调用workerCanExit()方法来判断当前worker是否可以退出,我们看一下workerCanExit()的实现:
也就是说如果线程池处于STOP状态、或者任务队列已为空或者允许为核心池线程设置空闲存活时间并且线程数大于1时,允许worker退出。如果允许worker退出,则调用interruptIdleWorkers()中断处于空闲状态的worker,我们看一下interruptIdleWorkers()的实现:
从实现可以看出,它实际上调用的是worker的interruptIfIdle()方法,在worker的interruptIfIdle()方法中
这里有一个非常巧妙的设计方式,假如我们来设计线程池,可能会有一个任务分派线程,当发现有线程空闲时,就从任务缓存队列中取一个任务交给空闲线程执行。但是在这里,并没有采用这样的方式,因为这样会要额外地对任务分派线程进行管理,无形地会增加难度和复杂度,这里直接让执行完任务的线程去任务缓存队列里面取任务来执行。
我们再看addIfUnderMaximumPoolSize方法的实现,这个方法的实现思想和addIfUnderCorePoolSize方法的实现思想非常相似,唯一的区别在于addIfUnderMaximumPoolSize方法是在线程池中的线程数达到了核心池大小并且往任务队列中添加任务失败的情况下执行的:
看到没有,其实它和addIfUnderCorePoolSize方法的实现基本一模一样,只是if语句判断条件中的poolSize < maximumPoolSize不同而已。
总结:
1.要清楚corePoolSize和maximumPoolSize的含义
2.要知道Worker的作用
3.要知道任务提交给线程池之后的处理策略,这里总结一下主要有4点:
如果当前线程池中的线程数目小于corePoolSize,则每来一个任务,就会创建一个线程去执行这个任务;
如果当前线程池中的线程数目>=corePoolSize,则每来一个任务,会尝试将其添加到任务缓存队列当中,若添加成功,则该任务会等待空闲线程将其取出去执行;若添加失败(一般来说是任务缓存队列已满),则会尝试创建新的线程去执行这个任务;
如果当前线程池中的线程数目达到maximumPoolSize,则会采取任务拒绝策略进行处理;
如果线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止,直至线程池中的线程数目不大于corePoolSize;如果允许为核心池中的线程设置存活时间,那么核心池中的线程空闲时间超过keepAliveTime,线程也会被终止。
3.线程池中的线程初始化
默认情况下,创建线程池之后,线程池中是没有线程的,需要提交任务之后才会创建线程。
在实际中如果需要线程池创建之后立即创建线程,可以通过以下两个方法办到:
prestartCoreThread():初始化一个核心线程;
prestartAllCoreThreads():初始化所有核心线程
注意上面传进去的参数是null,根据前面的分析可知如果传进去的参数为null,则最后执行线程会阻塞在getTask方法中的r = workQueue.take();
即等待任务队列中有任务。
4.任务缓存队列及排队策略
在前面我们多次提到了任务缓存队列,即workQueue,它用来存放等待执行的任务。
workQueue的类型为BlockingQueue,通常可以取下面三种类型:
ArrayBlockingQueue:基于数组的先进先出队列,此队列创建时必须指定大小;
LinkedBlockingQueue:基于链表的先进先出队列,如果创建时没有指定此队列大小,则默认为Integer.MAX_VALUE;
synchronousQueue:这个队列比较特殊,它不会保存提交的任务,而是将直接新建一个线程来执行新来的任务。
5.任务拒绝策略
当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略,通常有以下四种策略:
ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务
6.线程池的关闭
ThreadPoolExecutor提供了两个方法,用于线程池的关闭,分别是shutdown()和shutdownNow(),其中:
shutdown():不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,但再也不会接受新的任务
shutdownNow():立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务
7.线程池容量的动态调整
ThreadPoolExecutor提供了动态调整线程池容量大小的方法:setCorePoolSize()和setMaximumPoolSize()
setCorePoolSize:设置核心池大小
setMaximumPoolSize:设置线程池最大能创建的线程数目大小
运行结果如下:
从执行结果可以看出,当线程池中线程的数目大于5时,便将任务放入任务缓存队列里面,当任务缓存队列满了之后,便创建新的线程。如果上面程序中,将for循环中改成执行20个任务,就会抛出任务拒绝异常了。
在java规范中,不提倡我们直接使用ThreadPoolExecutor,而是使用Executors类中提供的几个静态方法来创建线程池:
Executors.newCachedThreadPool();//创建一个可缓存线程池(缓冲池容量大小为Integer.MAX_VALUE),如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
Executors.newFixedThreadPool(int); //创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
Executors.newScheduledThreadPool(int); // 创建一个定长线程池,支持定时及周期性任务执行。
Executors.newSingleThreadExecutor(); //创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
4种方法的构造代码如下:
从它们的具体实现来看,它们实际上也是调用了ThreadPoolExecutor,只不过参数都已配置好了。
newCachedThreadPool将corePoolSize设置为0,将maximumPoolSize设置为Integer.MAX_VALUE,使用的SynchronousQueue,也就是说来了任务就创建线程运行,当线程空闲超过60秒,就销毁线程。
newFixedThreadPool创建的线程池corePoolSize和maximumPoolSize值是相等的,它使用的LinkedBlockingQueue;
newScheduledThreadPool创建的线程池corePoolSize为用户设置的值,maximumPoolSize值是Integer.MAX_VALUE;
newSingleThreadExecutor创建的线程池corePoolSize和maximumPoolSize都为1,它使用的是LinkedBlockingQueue。
newCachedThreadPool实例
线程池为无限大,当执行第二个任务时第一个任务已经完成,会复用执行第一个任务的线程,而不用每次新建线程。
newFixedThreadPool实例
因为线程池大小为3,每个任务输出index后sleep 2秒,所以每两秒打印3个数字。
定长线程池的大小最好根据系统资源进行设置。如Runtime.getRuntime().availableProcessors()
newScheduledThreadPool实例
延迟执行示例代码:
表示延迟3秒执行。
定期执行示例代码:
表示延迟1秒后每3秒执行一次。
newSingleThreadExecutor实例
结果依次输出,相当于顺序执行各个任务。
对于包含 I/O 操作或者其它阻塞操作的任务,由于线程并不会一直执行,因此线程池的规模应该更大。参考值为2*N
通过以上的学习,相信大家对线程池有了一个深刻的理解,正所谓万事开头难,实践出真知,实践是检验真理的唯一标准,我相信大家只要好好阅读本篇文章,自己思考动手操作几个示例,那么java的线程池你一定会用的得心用手。
http://mp.blog.csdn.net/mdeditor/50687412
大家开发程序时,基本上都会使用线程去处理一些任务,我相信大家最通常的做法都是直接创建一个Thread然后在run方法里面去处理自己的逻辑,这种做法当然可以,但是大家考虑一下,如果你有很多任务需要处理怎么做呢,难道你每个任务都去创建一个Thread吗,这样思考一下,我们会想到能不能找到一个装载线程的容器,然后通过控制这个容器,就可以管理容器里面的线程,那么这个容器我们就叫做线程池。
本篇文章主要讲解以下4个方面
1.核心API介绍
2.ThreadPoolExecutor类的使用
3.线程池的实现原理
4.使用实例
5.如何合理配置线程池的大小
核心API介绍
1.Executor接口public void execute(Runnable runnable);
该接口声明了execute方法,将Runnable对象传入,启动该方法相当于启动了该线程
2.ExecutorService接口
该接口是Executor的子接口,提供了submit方法,该方法返回Future对象,Future可用于控制线程的执行与取消
3.ScheduledExecutorService接口
该接口是ExecutorService的子接口,提供了线程启动时机的控制
4.Callable接口
public Object call() throws Exception;
该接口类似Runnable接口,只是有返回值和抛出的异常,该方法返回的值就是ExecutorService接口调用submit(Callable call)方法返回的Future调用get方法返回的值。
5.Future接口
Future接口用于控制线程的执行。调用cancell(boolean bool)方法将会取消正在运行的进程。
6.Executors
这是一个线程池的工厂类,该类提供了许多生成Executor,ExecutorService,ScheduleExecutorService的工厂方法。
Executor就相当于一个线程池,提供对线程的管理功能,包括启动线程,取消线程执行,加入线 程,移除线程,以及协调线程之间的工作等功能,这些随着Executor级别(即子接口,实现类)越高而功能越强。
ThreadPoolExecutor类的使用
java.uitl.concurrent.ThreadPoolExecutor类是线程池中最核心的一个类,因此如果要透彻地了解Java中的线程池,必须先了解这个类。构造方法:
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
下面我们看一下前3个构造方法的源码:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); }
通过源码发现,前3个构造方法都是调用的第4个构造方法进行初始化的。
下面看一下构造方法中,各个参数的意义。
corePoolSize:池中所保存的线程数,包括空闲线程,在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建corePoolSize个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中。
maximumPoolSize:池中允许的最大线程数
keepAliveTime:当线程数大于核心(corePoolSize)时,此为终止前多余的空闲线程等待新任务的最长时间。
unit:keepAliveTime 参数的时间单位。(下方7种取值)
TimeUnit.DAYS; //天
TimeUnit.HOURS; //小时
TimeUnit.MINUTES; //分钟
TimeUnit.SECONDS; //秒
TimeUnit.MILLISECONDS; //毫秒
TimeUnit.MICROSECONDS; //微妙
TimeUnit.NANOSECONDS; //纳秒
workQueue:执行前用于保持任务的队列。此队列仅保持由execute方法提交的Runnable任务。这个参数的选择也很重要,会对线程池的运行过程产生重大影响,一般来说,这里的阻塞队列有以下几种选择:
ArrayBlockingQueue 无界队列
LinkedBlockingQueue 有界队列
SynchronousQueue 工作队列的默认选项是SynchronousQueue
线程池的排队策略与BlockingQueue有关。
handler:表示当拒绝处理任务时的策略,有以下四种取值:
ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务
方法介绍:
public void execute(Runnable command)
execute()方法实际上是Executor中声明的方法,在ThreadPoolExecutor进行了具体的实现,这个方法是ThreadPoolExecutor的核心方法,通过这个方法可以向线程池提交一个任务,交由线程池去执行。
public Future<T> submit(Runnable task)
submit()方法是在ExecutorService中声明的方法,在AbstractExecutorService就已经有了具体的实现,在ThreadPoolExecutor中并没有对其进行重写,这个方法也是用来向线程池提交任务的,但是它和execute()方法不同,它能够返回任务执行的结果,去看submit()方法的实现,会发现它实际上还是调用的execute()方法,只不过它利用了Future来获取任务执行结果
public boolean prestartCoreThread()
启动核心线程,使其处于等待工作的空闲状态。仅当执行新任务时,此操作才重写默认的启动核心线程策略。如果已启动所有核心线程,此方法将返回false。
public int prestartAllCoreThreads()
启动所有核心线程,使其处于等待工作的空闲状态。仅当执行新任务时,此操作才重写默认的启动核心线程策略。
public void allowCoreThreadTimeOut(boolean value)
如果允许核心线程超时,value为true,否则为false
线程池的实现原理
上一部分简单介绍了ThreadPoolExecutor类的使用,下面讲解线程池实现的原理主要从以下几个方面:线程池的状态
任务的执行
线程池中的线程初始化
任务缓存队列及排队策略
任务拒绝策略
线程池的关闭
线程池容量的动态调整
1.线程池的状态
在ThreadPoolExecutor中定义了一个volatile变量,另外定义了几个static final变量表示线程池的各个状态:
volatile int runState;
static final int RUNNING = 0;
static final int SHUTDOWN = 1;
static final int STOP = 2;
static final int TERMINATED = 3;
runState表示当前线程池的状态,它是一个volatile变量用来保证线程之间的可见性;
当创建线程池后,初始时,线程池处于RUNNING状态;
如果调用了shutdown()方法,则线程池处于SHUTDOWN状态,此时线程池不能够接受新的任务,它会等待所有任务执行完毕;
如果调用了shutdownNow()方法,则线程池处于STOP状态,此时线程池不能接受新的任务,并且会去尝试终止正在执行的任务;
当线程池处于SHUTDOWN或STOP状态,并且所有工作线程已经销毁,任务缓存队列已经清空或执行结束后,线程池被设置为TERMINATED状态。
2.任务的执行
在了解将任务提交给线程池到任务执行完毕整个过程之前,我们先来看一下ThreadPoolExecutor类中其他的一些比较重要成员变量:
private final BlockingQueue workQueue;//任务缓存队列,用来存放等待执行的任务
private final ReentrantLock mainLock = new ReentrantLock(); //线程池的主要状态锁,对线程池状态的改变都要使用这个锁
private final HashSet workers = new HashSet(); //用来存放工作集
private volatile boolean allowCoreThreadTimeOut;//是否允许为核心线程设置存活时间
private volatile int corePoolSize; //核心池的大小(即线程池中的线程数目大于这个参数时,提交的任务会被放进任务缓存队列)
private volatile int maximumPoolSize; //线程池最大能容忍的线程数
private volatile int poolSize; //线程池中当前的线程数
private volatile RejectedExecutionHandler handler; //任务拒绝策略
private volatile ThreadFactory threadFactory; //线程工厂,用来创建线程
private int largestPoolSize; //用来记录线程池中曾经出现过的最大线程数
private long completedTaskCount; //用来记录已经执行完毕的任务个数
在ThreadPoolExecutor类中,最核心的任务提交方法是execute()方法,虽然通过submit也可以提交任务,但是实际上submit方法里面最终调用的还是execute()方法,所以我们只需要研究execute()方法的实现原理即可:
//执行的核心方法 public void execute(Runnable command) { //判断提交的任务command是否为null,若是null,则抛出空指针异常; if (command == null) throw new NullPointerException(); if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) { if (runState == RUNNING && workQueue.offer(command)) { if (runState != RUNNING || poolSize == 0) ensureQueuedTaskHandled(command); } else if (!addIfUnderMaximumPoolSize(command)) reject(command); // is shutdown or saturated } }
//如果当前线程池里的线程数量低于核心数量执行此方法 private boolean addIfUnderCorePoolSize(Runnable firstTask) { Thread t = null; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (poolSize < corePoolSize && runState == RUNNING) t = addThread(firstTask); //创建线程去执行firstTask任务 } finally { mainLock.unlock(); } if (t == null) return false; t.start(); return true; }
private Thread addThread(Runnable firstTask) { Worker w = new Worker(firstTask); Thread t = threadFactory.newThread(w); //创建一个线程,执行任务 if (t != null) { w.thread = t; //将创建的线程的引用赋值为w的成员变量 workers.add(w); int nt = ++poolSize; //当前线程数加1 if (nt > largestPoolSize) largestPoolSize = nt; } return t; }
在addThread方法中,首先用提交的任务创建了一个Worker对象,然后调用线程工厂threadFactory创建了一个新的线程t,然后将线程t的引用赋值给了Worker对象的成员变量thread,接着通过workers.add(w)将Worker对象添加到工作集当中。
private final class Worker implements Runnable { private final ReentrantLock runLock = new ReentrantLock(); private Runnable firstTask; volatile long completedTasks; Thread thread; Worker(Runnable firstTask) { this.firstTask = firstTask; } boolean isActive() { return runLock.isLocked(); } void interruptIfIdle() { final ReentrantLock runLock = this.runLock; if (runLock.tryLock()) { try { if (thread != Thread.currentThread()) thread.interrupt(); } finally { runLock.unlock(); } } } void interruptNow() { thread.interrupt(); } private void runTask(Runnable task) { final ReentrantLock runLock = this.runLock; runLock.lock(); try { if (runState < STOP && Thread.interrupted() && runState >= STOP) boolean ran = false; beforeExecute(thread, task); //beforeExecute方法是ThreadPoolExecutor类的一个方法,没有具体实现,用户可以根据 //自己需要重载这个方法和后面的afterExecute方法来进行一些统计信息,比如某个任务的执行时间等 try { task.run(); ran = true; afterExecute(task, null); ++completedTasks; } catch (RuntimeException ex) { if (!ran) afterExecute(task, ex); throw ex; } } finally { runLock.unlock(); } } public void run() { try { Runnable task = firstTask; firstTask = null; while (task != null || (task = getTask()) != null) { runTask(task); task = null; } } finally { workerDone(this); //当任务队列中没有任务时,进行清理工作 } } }
它实际上实现了Runnable接口,因此上面的Thread t = threadFactory.newThread(w);效果跟下面这句的效果基本一样:
Thread t = new Thread(w);
相当于传进去了一个Runnable任务,在线程t中执行这个Runnable。
既然Worker实现了Runnable接口,那么自然最核心的方法便是run()方法了:
public void run() { try { Runnable task = firstTask; firstTask = null; while (task != null || (task = getTask()) != null) { runTask(task); task = null; } } finally { workerDone(this); } }
从run方法的实现可以看出,它首先执行的是通过构造器传进来的任务firstTask,在调用runTask()执行完firstTask之后,在while循环里面不断通过getTask()去取新的任务来执行,那么去哪里取呢?自然是从任务缓存队列里面去取,getTask是ThreadPoolExecutor类中的方法,并不是Worker类中的方法,下面是getTask方法的实现:
Runnable getTask() { for (;;) { try { int state = runState; if (state > SHUTDOWN) return null; Runnable r; if (state == SHUTDOWN) // Help drain queue r = workQueue.poll(); else if (poolSize > corePoolSize || allowCoreThreadTimeOut) //如果线程数大于核心池大小或者允许为核心池线程设置空闲时间, //则通过poll取任务,若等待一定的时间取不到任务,则返回null r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS); else r = workQueue.take(); if (r != null) return r; if (workerCanExit()) { //如果没取到任务,即r为null,则判断当前的worker是否可以退出 if (runState >= SHUTDOWN) // Wake up others interruptIdleWorkers(); //中断处于空闲状态的worker return null; } // Else retry } catch (InterruptedException ie) { // On interruption, re-check runState } } }
在getTask中,先判断当前线程池状态,如果runState大于SHUTDOWN(即为STOP或者TERMINATED),则直接返回null。
如果runState为SHUTDOWN或者RUNNING,则从任务缓存队列取任务。
如果当前线程池的线程数大于核心池大小corePoolSize或者允许为核心池中的线程设置空闲存活时间,则调用poll(time,timeUnit)来取任务,这个方法会等待一定的时间,如果取不到任务就返回null。
然后判断取到的任务r是否为null,为null则通过调用workerCanExit()方法来判断当前worker是否可以退出,我们看一下workerCanExit()的实现:在getTask中,先判断当前线程池状态,如果runState大于SHUTDOWN(即为STOP或者TERMINATED),则直接返回null。
如果runState为SHUTDOWN或者RUNNING,则从任务缓存队列取任务。
如果当前线程池的线程数大于核心池大小corePoolSize或者允许为核心池中的线程设置空闲存活时间,则调用poll(time,timeUnit)来取任务,这个方法会等待一定的时间,如果取不到任务就返回null。
然后判断取到的任务r是否为null,为null则通过调用workerCanExit()方法来判断当前worker是否可以退出,我们看一下workerCanExit()的实现:
private boolean workerCanExit() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); boolean canExit; //如果runState大于等于STOP,或者任务缓存队列为空了 //或者 允许为核心池线程设置空闲存活时间并且线程池中的线程数目大于1 try { canExit = runState >= STOP || workQueue.isEmpty() || (allowCoreThreadTimeOut && poolSize > Math.max(1, corePoolSize)); } finally { mainLock.unlock(); } return canExit; }
也就是说如果线程池处于STOP状态、或者任务队列已为空或者允许为核心池线程设置空闲存活时间并且线程数大于1时,允许worker退出。如果允许worker退出,则调用interruptIdleWorkers()中断处于空闲状态的worker,我们看一下interruptIdleWorkers()的实现:
void interruptIdleWorkers() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) //实际上调用的是worker的interruptIfIdle()方法 w.interruptIfIdle(); } finally { mainLock.unlock(); } }
从实现可以看出,它实际上调用的是worker的interruptIfIdle()方法,在worker的interruptIfIdle()方法中
void interruptIfIdle() { final ReentrantLock runLock = this.runLock; if (runLock.tryLock()) { //注意这里,是调用tryLock()来获取锁的,因为如果当前worker正在执行任务,锁已经被获取了,是无法获取到锁的 //如果成功获取了锁,说明当前worker处于空闲状态 try { if (thread != Thread.currentThread()) thread.interrupt(); } finally { runLock.unlock(); } } }
这里有一个非常巧妙的设计方式,假如我们来设计线程池,可能会有一个任务分派线程,当发现有线程空闲时,就从任务缓存队列中取一个任务交给空闲线程执行。但是在这里,并没有采用这样的方式,因为这样会要额外地对任务分派线程进行管理,无形地会增加难度和复杂度,这里直接让执行完任务的线程去任务缓存队列里面取任务来执行。
我们再看addIfUnderMaximumPoolSize方法的实现,这个方法的实现思想和addIfUnderCorePoolSize方法的实现思想非常相似,唯一的区别在于addIfUnderMaximumPoolSize方法是在线程池中的线程数达到了核心池大小并且往任务队列中添加任务失败的情况下执行的:
private boolean addIfUnderMaximumPoolSize(Runnable firstTask) { Thread t = null; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (poolSize < maximumPoolSize && runState == RUNNING) t = addThread(firstTask); } finally { mainLock.unlock(); } if (t == null) return false; t.start(); return true; }
看到没有,其实它和addIfUnderCorePoolSize方法的实现基本一模一样,只是if语句判断条件中的poolSize < maximumPoolSize不同而已。
总结:
1.要清楚corePoolSize和maximumPoolSize的含义
2.要知道Worker的作用
3.要知道任务提交给线程池之后的处理策略,这里总结一下主要有4点:
如果当前线程池中的线程数目小于corePoolSize,则每来一个任务,就会创建一个线程去执行这个任务;
如果当前线程池中的线程数目>=corePoolSize,则每来一个任务,会尝试将其添加到任务缓存队列当中,若添加成功,则该任务会等待空闲线程将其取出去执行;若添加失败(一般来说是任务缓存队列已满),则会尝试创建新的线程去执行这个任务;
如果当前线程池中的线程数目达到maximumPoolSize,则会采取任务拒绝策略进行处理;
如果线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止,直至线程池中的线程数目不大于corePoolSize;如果允许为核心池中的线程设置存活时间,那么核心池中的线程空闲时间超过keepAliveTime,线程也会被终止。
3.线程池中的线程初始化
默认情况下,创建线程池之后,线程池中是没有线程的,需要提交任务之后才会创建线程。
在实际中如果需要线程池创建之后立即创建线程,可以通过以下两个方法办到:
prestartCoreThread():初始化一个核心线程;
prestartAllCoreThreads():初始化所有核心线程
public boolean prestartCoreThread() { return addIfUnderCorePoolSize(null); //注意传进去的参数是null } public int prestartAllCoreThreads() { int n = 0; while (addIfUnderCorePoolSize(null))//注意传进去的参数是null ++n; return n; }
注意上面传进去的参数是null,根据前面的分析可知如果传进去的参数为null,则最后执行线程会阻塞在getTask方法中的r = workQueue.take();
即等待任务队列中有任务。
4.任务缓存队列及排队策略
在前面我们多次提到了任务缓存队列,即workQueue,它用来存放等待执行的任务。
workQueue的类型为BlockingQueue,通常可以取下面三种类型:
ArrayBlockingQueue:基于数组的先进先出队列,此队列创建时必须指定大小;
LinkedBlockingQueue:基于链表的先进先出队列,如果创建时没有指定此队列大小,则默认为Integer.MAX_VALUE;
synchronousQueue:这个队列比较特殊,它不会保存提交的任务,而是将直接新建一个线程来执行新来的任务。
5.任务拒绝策略
当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略,通常有以下四种策略:
ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务
6.线程池的关闭
ThreadPoolExecutor提供了两个方法,用于线程池的关闭,分别是shutdown()和shutdownNow(),其中:
shutdown():不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,但再也不会接受新的任务
shutdownNow():立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务
7.线程池容量的动态调整
ThreadPoolExecutor提供了动态调整线程池容量大小的方法:setCorePoolSize()和setMaximumPoolSize()
setCorePoolSize:设置核心池大小
setMaximumPoolSize:设置线程池最大能创建的线程数目大小
使用实例
public class Test2 { public static void main(String[] args) { ThreadPoolExecutor executor=new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5)); for(int i=0;i<15;i++){ MyTask task=new MyTask(i); executor.execute(task); System.out.println("线程池中线程数量:"+executor.getPoolSize()+",队列中等待执行的任务数目:"+executor.getQueue().size()+",已执行完的任务数目:"+executor.getCompletedTaskCount()); } executor.shutdown(); try { Thread.sleep(1000); System.out.println("执行完毕的任务数量:"+executor.getCompletedTaskCount()); } catch (InterruptedException e) { e.printStackTrace(); } } } class MyTask implements Runnable{ private int taskNum; public MyTask(int num) { this.taskNum = num; } @Override public void run() { System.out.println("正在执行task"+taskNum); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("task"+taskNum+"执行完毕"); } }
运行结果如下:
正在执行task0 线程池中线程数量:1,队列中等待执行的任务数目:0,已执行完的任务数目:0 线程池中线程数量:2,队列中等待执行的任务数目:0,已执行完的任务数目:0 正在执行task1 线程池中线程数量:3,队列中等待执行的任务数目:0,已执行完的任务数目:0 正在执行task2 线程池中线程数量:4,队列中等待执行的任务数目:0,已执行完的任务数目:0 正在执行task3 线程池中线程数量:5,队列中等待执行的任务数目:0,已执行完的任务数目:0 线程池中线程数量:5,队列中等待执行的任务数目:1,已执行完的任务数目:0 正在执行task4 线程池中线程数量:5,队列中等待执行的任务数目:2,已执行完的任务数目:0 线程池中线程数量:5,队列中等待执行的任务数目:3,已执行完的任务数目:0 线程池中线程数量:5,队列中等待执行的任务数目:4,已执行完的任务数目:0 线程池中线程数量:5,队列中等待执行的任务数目:5,已执行完的任务数目:0 线程池中线程数量:6,队列中等待执行的任务数目:5,已执行完的任务数目:0 正在执行task10 线程池中线程数量:7,队列中等待执行的任务数目:5,已执行完的任务数目:0 正在执行task11 线程池中线程数量:8,队列中等待执行的任务数目:5,已执行完的任务数目:0 正在执行task12 线程池中线程数量:9,队列中等待执行的任务数目:5,已执行完的任务数目:0 正在执行task13 线程池中线程数量:10,队列中等待执行的任务数目:5,已执行完的任务数目:0 正在执行task14 task0执行完毕 正在执行task5 task1执行完毕 正在执行task6 task3执行完毕 task2执行完毕 task4执行完毕 正在执行task7 正在执行task8 正在执行task9 task11执行完毕 task10执行完毕 task12执行完毕 task14执行完毕 task13执行完毕 task5执行完毕 task6执行完毕 task7执行完毕 task8执行完毕 task9执行完毕 执行完毕的任务数量:15
从执行结果可以看出,当线程池中线程的数目大于5时,便将任务放入任务缓存队列里面,当任务缓存队列满了之后,便创建新的线程。如果上面程序中,将for循环中改成执行20个任务,就会抛出任务拒绝异常了。
在java规范中,不提倡我们直接使用ThreadPoolExecutor,而是使用Executors类中提供的几个静态方法来创建线程池:
Executors.newCachedThreadPool();//创建一个可缓存线程池(缓冲池容量大小为Integer.MAX_VALUE),如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
Executors.newFixedThreadPool(int); //创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
Executors.newScheduledThreadPool(int); // 创建一个定长线程池,支持定时及周期性任务执行。
Executors.newSingleThreadExecutor(); //创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
4种方法的构造代码如下:
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS, new DelayedWorkQueue()); } public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
从它们的具体实现来看,它们实际上也是调用了ThreadPoolExecutor,只不过参数都已配置好了。
newCachedThreadPool将corePoolSize设置为0,将maximumPoolSize设置为Integer.MAX_VALUE,使用的SynchronousQueue,也就是说来了任务就创建线程运行,当线程空闲超过60秒,就销毁线程。
newFixedThreadPool创建的线程池corePoolSize和maximumPoolSize值是相等的,它使用的LinkedBlockingQueue;
newScheduledThreadPool创建的线程池corePoolSize为用户设置的值,maximumPoolSize值是Integer.MAX_VALUE;
newSingleThreadExecutor创建的线程池corePoolSize和maximumPoolSize都为1,它使用的是LinkedBlockingQueue。
newCachedThreadPool实例
public class NewCachedThreadPoolTest { public static void main(String[] args) { ExecutorService newCachedThreadPool = Executors.newCachedThreadPool(); for(int i=0;i<10;i++){ final int index=i; try { Thread.sleep(i*1000); } catch (InterruptedException e) { e.printStackTrace(); } newCachedThreadPool.execute(new Runnable() { @Override public void run() { System.out.println(index); } }); } } }
线程池为无限大,当执行第二个任务时第一个任务已经完成,会复用执行第一个任务的线程,而不用每次新建线程。
newFixedThreadPool实例
public class FixedThreadPoolTest { public static void main(String[] args) { ExecutorService executor = Executors.newFixedThreadPool(3); for(int i=0;i<10;i++){ final int index=i; executor.execute(new Runnable() { @Override public void run() { try { Thread.sleep(2000); System.out.println(index+""); } catch (InterruptedException e) { e.printStackTrace(); } } }); } } }
因为线程池大小为3,每个任务输出index后sleep 2秒,所以每两秒打印3个数字。
定长线程池的大小最好根据系统资源进行设置。如Runtime.getRuntime().availableProcessors()
newScheduledThreadPool实例
延迟执行示例代码:
public class NewScheduledThreadPoolTest { public static void main(String[] args) { ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5); scheduledExecutorService.schedule(new Runnable() { @Override public void run() { System.out.println("delay 3 seconds"); } }, 3, TimeUnit.SECONDS); } }
表示延迟3秒执行。
定期执行示例代码:
public class NewScheduledThreadPoolTest { public static void main(String[] args) { ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5); scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { System.out.println("我叫岩仔"); } }, 1, 3, TimeUnit.SECONDS); } }
表示延迟1秒后每3秒执行一次。
newSingleThreadExecutor实例
public class NewSingleThreadExecutor { public static void main(String[] args) { ExecutorService executor = Executors.newSingleThreadExecutor(); for(int i=0;i<10;i++){ final int index=i; executor.execute(new Runnable() { @Override public void run() { try { Thread.sleep(2000); System.out.println(index); } catch (InterruptedException e) { e.printStackTrace(); } } }); } } }
结果依次输出,相当于顺序执行各个任务。
如何合理配置线程池的大小
对于计算密集的任务,在拥有N个处理器的系统上,当线程池大小为 N+1 时,通常能实现最优的利用率。(即使当计算密集型的线程偶尔由于页缺失故障或者其它原因而暂停时,这个“额外”的线程也能确保 CPU 的时钟周期不会被浪费。)对于包含 I/O 操作或者其它阻塞操作的任务,由于线程并不会一直执行,因此线程池的规模应该更大。参考值为2*N
通过以上的学习,相信大家对线程池有了一个深刻的理解,正所谓万事开头难,实践出真知,实践是检验真理的唯一标准,我相信大家只要好好阅读本篇文章,自己思考动手操作几个示例,那么java的线程池你一定会用的得心用手。
相关文章推荐
- Java核心技术学习笔记之二:Java运算符
- Java代码编译和执行的整个过程
- Java核心技术学习笔记之一:Java的基本程序设计结构
- 【实战Java高并发程序设计 5】让普通变量也享受原子操作
- Struts2-3.struts.xml的action可以简写
- Struts2 OGNL的一个例子
- Spring入门
- Java -Xms -Xmx -Xss -XX:MaxNewSize -XX:MaxPermSize含义记录
- 【练习】String类的trim方法
- The Java™ Tutorials — Concurrency :Executors
- eclipse git 整合
- JAVA线程池ThreadPoolExecutor的简单使用
- java读取本地properties文件
- java去除字符串中的空格、回车、换行符、制表符
- springmvc 带查询条件的分页,form的控制范围,怎么包裹条件提交给后台
- Java IO最详解
- JavaScriptCore详解
- struts1重定向 浅谈ActionForward的三种重定向
- Java垃圾回收机制
- 【实战Java高并发程序设计5】让普通变量也享受原子操作