您的位置:首页 > 其它

ThreadPoolExecutor源码_线程池的corePoolSize、maximumPoolSize和poolSize

2018-01-12 19:21 471 查看
我们知道,受限于硬件、内存和性能,我们不可能无限制的创建任意数量的线程,因为每一台机器允许的最大线程是一个有界值。也就是说ThreadPoolExecutor管理的线程数量是有界的。线程池就是用这些有限个数的线程,去执行提交的任务。然而对于多用户、高并发的应用来说,提交的任务数量非常巨大,一定会比允许的最大线程数多很多。为了解决这个问题,必须要引入排队机制,或者是在内存中,或者是在硬盘等容量很大的存储介质中。J.U.C提供的ThreadPoolExecutor只支持任务在内存中排队,通过BlockingQueue暂存还没有来得及执行的任务。

 任务的管理是一件比较容易的事,复杂的是线程的管理,这会涉及线程数量、等待/唤醒、同步/锁、线程创建和死亡等问题。ThreadPoolExecutor与线程相关的几个成员变量是:keepAliveTime、allowCoreThreadTimeOut、poolSize、corePoolSize、maximumPoolSize,它们共同负责线程的创建和销毁。

corePoolSize:

线程池的基本大小,即在没有任务需要执行的时候线程池的大小,并且只有在工作队列满了的情况下才会创建超出这个数量的线程。这里需要注意的是:在刚刚创建ThreadPoolExecutor的时候,线程并不会立即启动,而是要等到有任务提交时才会启动,除非调用了prestartCoreThread/prestartAllCoreThreads事先启动核心线程。再考虑到keepAliveTime和allowCoreThreadTimeOut超时参数的影响,所以没有任务需要执行的时候,线程池的大小不一定是corePoolSize。

maximumPoolSize:

线程池中允许的最大线程数,线程池中的当前线程数目不会超过该值。如果队列中任务已满,并且当前线程个数小于maximumPoolSize,那么会创建新的线程来执行任务。这里值得一提的是largestPoolSize,该变量记录了线程池在整个生命周期中曾经出现的最大线程个数。为什么说是曾经呢?因为线程池创建之后,可以调用setMaximumPoolSize()改变运行的最大线程的数目。

poolSize:

线程池中当前线程的数量,当该值为0的时候,意味着没有任何线程,线程池会终止;同一时刻,poolSize不会超过maximumPoolSize。

现在我们通过ThreadPoolExecutor.execute()方法,看一下这3个属性的关系,以及线程池如何处理新提交的任务。以下源码基于JDK1.6.0_37版本。

[java] view
plain copy

public void execute(Runnable command) {  

    if (command == null)  

        throw new NullPointerException();  

    if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {  

        if (runState == RUNNING && workQueue.offer(command)) {  

            if (runState != RUNNING || poolSize == 0)  

                ensureQueuedTaskHandled(command);  

        }  

        else if (!addIfUnderMaximumPoolSize(command))  

            reject(command); // is shutdown or saturated  

    }  

}  

  

  

private boolean addIfUnderCorePoolSize(Runnable firstTask) {  

    Thread t = null;  

    final ReentrantLock mainLock = this.mainLock;  

    mainLock.lock();  

    try {  

        if (poolSize < corePoolSize && runState == RUNNING)  

            t = addThread(firstTask);  

    } finally {  

        mainLock.unlock();  

    }  

    if (t == null)  

        return false;  

    t.start();  

    return true;  

}  

  

private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {  

    Thread t = null;  

    final ReentrantLock mainLock = this.mainLock;  

    mainLock.lock();  

    try {  

        if (poolSize < maximumPoolSize && runState == RUNNING)  

            t = addThread(firstTask);  

    } finally {  

        mainLock.unlock();  

    }  

    if (t == null)  

        return false;  

    t.start();  

    return true;  

}  

新提交一个任务时的处理流程很明显:

1、如果线程池的当前大小还没有达到基本大小(poolSize < corePoolSize),那么就新增加一个线程处理新提交的任务;

2、如果当前大小已经达到了基本大小,就将新提交的任务提交到阻塞队列排队,等候处理workQueue.offer(command);

3、如果队列容量已达上限,并且当前大小poolSize没有达到maximumPoolSize,那么就新增线程来处理任务;

4、如果队列已满,并且当前线程数目也已经达到上限,那么意味着线程池的处理能力已经达到了极限,此时需要拒绝新增加的任务。至于如何拒绝处理新增的任务,取决于线程池的饱和策略RejectedExecutionHandler。

饱和策略RejectedExecutionHandler

接下来我们看下allowCoreThreadTimeOut和keepAliveTime属性的含义。在压力很大的情况下,线程池中的所有线程都在处理新提交的任务或者是在排队的任务,这个时候线程池处在忙碌状态。如果压力很小,那么可能很多线程池都处在空闲状态,这个时候为了节省系统资源,回收这些没有用的空闲线程,就必须提供一些超时机制,这也是线程池大小调节策略的一部分。通过corePoolSize和maximumPoolSize,控制如何新增线程;通过allowCoreThreadTimeOut和keepAliveTime,控制如何销毁线程。

allowCoreThreadTimeOut:

该属性用来控制是否允许核心线程超时退出。If
false,core threads stay alive even when idle.If true, core threads use keepAliveTime to time out waiting for work。如果线程池的大小已经达到了corePoolSize,不管有没有任务需要执行,线程池都会保证这些核心线程处于存活状态。可以知道:该属性只是用来控制核心线程的。

keepAliveTime:

如果一个线程处在空闲状态的时间超过了该属性值,就会因为超时而退出。举个例子,如果线程池的核心大小corePoolSize=5,而当前大小poolSize =8,那么超出核心大小的线程,会按照keepAliveTime的值判断是否会超时退出。如果线程池的核心大小corePoolSize=5,而当前大小poolSize
=5,那么线程池中所有线程都是核心线程,这个时候线程是否会退出,取决于allowCoreThreadTimeOut。

[java] view
plain copy

Runnable getTask() {  

        for (;;) {  

            try {  

                int state = runState;  

                if (state > SHUTDOWN)  

                    return null;  

                Runnable r;  

                if (state == SHUTDOWN)  // Help drain queue  

                    r = workQueue.poll();  

                else if (poolSize > corePoolSize || allowCoreThreadTimeOut)  

                    r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);  

                else  

                    r = workQueue.take();  

                if (r != null)  

                    return r;  

                if (workerCanExit()) {  

                    if (runState >= SHUTDOWN) // Wake up others  

                        interruptIdleWorkers();  

                    return null;  

                }  

                // Else retry  

            } catch (InterruptedException ie) {  

                // On interruption, re-check runState  

            }  

        }  

    }  

(poolSize > corePoolSize || allowCoreThreadTimeOut)这个条件,就是用来判断是否允许当前线程退出。workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);就是借助阻塞队列,让空闲线程等待keepAliveTime时间之后,恢复执行。这样空闲线程会由于超时而退出。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  线程池 源码 线程