您的位置:首页 > 其它

ThreadPoolExecutor核心实现原理和源码解析<一>

2017-05-02 09:37 806 查看
摘要: 本文将详细分析ThreadPoolExecutor的实现原理,并结合源码介绍ThreadPoolExecutor的重要操作,对理解ThreadPoolExecutor非常有帮助。本文中源码基于JDK1.7

前面的文章已经详细分析了Executor框架及其家族的各个成员,为介绍本文做了铺垫,因此分析ThreadPoolExecutor核心实现原理可谓千呼万唤使出来啊,直奔主题吧!

首先从ThreadPoolExecutor的构造方法开始。

构造方法

//...省略其他三个版本的构造方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

构造器中各个参数的含义:

corePoolSize:核心池的大小,在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到工作队列当中。只有当工作队列满了的情况下才会创建超出这个数量的线程。如果某个线程的空闲时间超过了活动时间,那么将标记为可回收,并且只有当线程池的当前大小超过corePoolSize时该线程才会被终止。用户可调用prestartAllCoreThreads()或者prestartCoreThread()方法预先创建线程,即在没有任务到来之前就创建corePoolSize个线程或者一个线程。

maximumPoolSize:线程池最大线程数,这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程;当池中线程数大于了这个值就会将新到的任务由一个丢弃处理机制来处理。

keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;

Unit:参数keepAliveTime的时间单位,有7种取值,在TimeUnit类中有7种静态属性。

workQueue:一个阻塞队列,用来存储等待执行的任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中。

threadFactory:线程工厂,主要用来创建线程;

handler:表示当拒绝处理任务时的策略,也就是参数maximumPoolSize达到后丢弃处理的方法。有以下四种取值:

ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务

用户也可以实现接口RejectedExecutionHandler定制自己的策略。

下面将深入剖析线程池的实现原理:

线程池状态

JDK1.7中使用原子变量ctl来控制线程池的状态,其中ctl包装了以下两个field:

workerCount:表示有效的线程数
runState:表示线程池的状态,是否运行,关闭等

由于workerCount和runState被保存在一个int中,因此workerCount限制为(2 ^ 29)-1(约5亿)线程。其使用shift / mask常数来计算workerCount和runState的值。源码如下:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

// Packing and unpacking ctl
private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

workerCount是已经启动但没有停止的worker线程数量。

runState用于控制线程池的生命周期状态,主要包含以下几个值:

RUNNING接收新任务,并且处理任务队列中的任务,当创建线程池后,初始时,线程池处于RUNNING状态
SHUTDOWN不接收新任务,但是处理任务队列的任务
STOP不接收新任务,不处理任务队列,同时中断所有进行中的工作线程
TIDYING所有任务已经被终止,工作线程数量为 0,到达该状态会执行terminated()
TERMINATEDterminated()已经完成
各状态的转换关系:

RUNNING -> SHUTDOWN:shutdown()被调用

(RUNNING or SHUTDOWN) -> STOP:shutdownNow()被调用

SHUTDOWN -> TIDYING:队列和池均为空

STOP -> TIDYING:池为空

TIDYING -> TERMINATED:钩子方法terminated()已经完成。



当线程池状态为TERMINATED时,调用awaitTermination()的线程将从等待中返回。

工作线程

考虑到将Worker实现分析加入本文将导致文章太长,不宜阅读,关于Worker的核心实现以及ThreadPoolExecutor的核心方法runWorker、getTask和processWorkerExit的功能分析和源码解读请参见ThreadPoolExecutor核心实现原理和源码解析<二>

添加工作线程

addWorker用于添加并启动工作线程,先看其流程图:



源码解析如下:

private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

/** 这里返回false有以下可能:
* 1 线程池状态大于SHUTDOWN
* 2 线程池状态为SHUTDOWN,但firstTask不为空,也就是说线程池已经SHUTDOWN,拒绝添加新任务
* 3 线程池状态为SHUTDOWN且firstTask为空,但workQueue为空,即无任务需要执行
*/
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;

for (;;) {
int wc = workerCountOf(c);
/** 返回false有以下可能:
* 1 工作线程数量超过最大容量
* 2 core为true,工作线程数量超过边界corePoolSize
* 3 core为false,工作线程数量超过边界maximumPoolSize
*/
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;//直接跳出最外层循环
c = ctl.get();  // Re-read ctl
if (runStateOf(c) != rs)//线程池状态发生改变则从最外层循环重新开始
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}

Worker w = new Worker(firstTask);
Thread t = w.thread;

final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 持有锁之后需要重新检查线程池状态,防止ThreadFactory返回失败或线程池在加锁之前被关闭
int c = ctl.get();
int rs = runStateOf(c);
/** 返回false有以下可能:
* 1 t为null,说明ThreadFactory创建线程失败,可能发生OutOfMemoryError
* 2 线程池状态大于SHUTDOWN
* 3 线程池状态为SHUTDOWN,但firstTask不为空
*/
if (t == null ||
(rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null))) {
decrementWorkerCount();
tryTerminate();
return false;
}

workers.add(w);

int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
} finally {
mainLock.unlock();
}

t.start();
// 在线程池变为stop期间,线程可能已经被添加到workers,但还未被启动(该现象不太可能发生,这可能
// 导致罕见的丢失中断,因为Thread.interrupt不能保证对非启动状态的线程有效
if (runStateOf(ctl.get()) == STOP && ! t.isInterrupted())
t.interrupt();

return true;
}

addWorker首先会检查当前线程池的状态和给定的边界是否可以创建一个新的worker,在此期间会对workers的数量进行适当调整;如果满足条件,将创建一个新的worker并启动,以参数中的firstTask作为worker的第一个任务。

任务的执行

ThreadPoolExecutor类中,最核心的任务提交方法是execute()方法,使用submit()提交任务最终调用的也是execute()方法,先看看流程图:



源码如下:

public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* 分3步处理:
*
* 1. 当前工作线程数 < corePoolSize,直接创建新的工作线程执行任务(调用addWorker)
*
* 2. 当前工作线程数 >=corePoolSize,线程池状态为RUNNING,且任务加入工作队列成功,
* 再次检查线程池当前状态是否处于RUNNING,如果不是,从队列移除任务,移除成功则拒绝任务
* 如果为RUNNING,判断当前工作线程数量是否为 0,如果为 0,就增加一个工作线程
*
* 3. 线程池状态不是RUNNING或任务入队失败,尝试开启普通线程执行任务,失败就拒绝该任务
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}

从上面的分析可以总结出线程池运行任务的四个阶段:

poolSize < corePoolSize
且队列为空,此时会新建线程来处理提交的任务

poolSize == corePoolSize
,提交的任务进入工作队列,工作线程从队列中获取任务执行,此时队列不为空且未满。

poolSize == corePoolSize
,并且工作队列已满,此时也会新建线程来处理提交的任务,但是
poolSize < maxPoolSize


poolSize == maxPoolSize
,并且队列已满,此时会触发拒绝策略。

当再次检查线程池当前状态不是RUNNING时,不仅从任务队列移除任务,同时会尝试终止线程池。

public boolean remove(Runnable task) {
boolean removed = workQueue.remove(task);
tryTerminate(); // In case SHUTDOWN and now empty
return removed;
}


终止线程池

tryTerminate在很多地方都有调用,那么这个方法作用是什么呢?

场景分析:当调用线程池的shutDown()方法后,会调用interruptIdleWorkers尝试中断工作线程,而工作线程只有在getTask()期间才会有被中断的机会。假设interruptIdleWorkers成功设置的多个线程的中断状态,若此时任务队列非空,由于线程池状态为SHUTDOWN,getTask()将会从任务队列成功获取到任务;在runWorker执行任务时,线程池状态为SHUTDOWN(小于STOP),那么当前工作线程的中断状态将会被清除。当中断状态被清除后,从工作队列取任务将不会响应中断,直到工作队列为空,此时之前被成功设置中断状态的工作线程都可能会阻塞在workQueue.take(),由于SHUTDOWN状态的线程池不会接收新任务,工作线程将一直阻塞下去,永不会退出。怎么办呢?tryTerminate这时将派上用场,Doug Lea大神巧妙的在所有可能导致线程池产终止的地方安插了tryTerminated()尝试线程池终止的逻辑,由tryTerminated来终止空闲的线程,直到无空闲线程,然后终止线程池。

下面看看tryTerminated的具体实现:

final void tryTerminate() {
for (;;) {
int c = ctl.get();
//由之前的状态转换可知,RUNNING不能直接跳到TERMINATED,因此返回
//状态已经为TERMINATED,无需再调用terminated(),返回
//状态为SHUTDOWN且队列不空,队列中的任务仍需要处理,不能调用terminated(),返回
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
if (workerCountOf(c) != 0) { // 符合终止条件
interruptIdleWorkers(ONLY_ONE);//一次仅中断一个线程
return;
}

final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
ctl.set(ctlOf(TERMINATED, 0));//将状态设为TERMINATED,且设置workerCount为0
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
//中断空闲线程
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}

由源码可知,以下情况将线程池变为TERMINATED终止状态:

1 线程池状态为SHUTDOWN,并且线程池中工作线程数量为0,工作队列为空

2 线程池状态为STOP,并且线程池中工作线程数量为0

关闭线程池

可使用shutdown()和shutdownNow()关闭线程池,但是效果和实现方法各不相同;同时也可调用awaitTermination(long timeout, TimeUnit unit)等待线程池终止。理解关闭线程池逻辑可能需要参照文章https://my.oschina.net/7001/blog/889770中介绍的runWorker和getTask()逻辑。

shutdown()

使用shutdown关闭线程池时,之前提交的任务都会被执行完成,但是拒绝接收新任务。shutdown不会等待之前提交的任务执行结束,该情况下可以使用awaitTermination()。源码如下:

public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();//权限校验
advanceRunState(SHUTDOWN);
interruptIdleWorkers();//中断所有空闲线程
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
//更新线程池状态为SHUTDOWN,使用自旋保证完成
private void advanceRunState(int targetState) {
for (;;) {
int c = ctl.get();
if (runStateAtLeast(c, targetState) ||
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
break;
}
}

本文不止一次提到过空闲线程,那么线程池中什么才是空闲线程?

空闲worker:正在从workQueue阻塞队列中获取任务的worker;

运行中worker:正在使用runWorker执行任务的worker。

阻塞在getTask()获取任务的worker在被中断后,会抛出InterruptedException,不再阻塞获取任务。继续进入自旋操作,此时线程池已经是shutdown状态,且workQueue.isEmpty(),getTask()返回null,进入worker退出逻辑。

shutdownNow()

shutdownNow表示立即关闭线程池,它会尝试停止所有活动的正在执行的任务,并停止处理任务队列中的任务,该方法将返回正在等待被执行的任务列表。shutdownNow尽力尝试停止运行中的任务,没有任何保证。取消任务是通过Thread.interrupt()发出中断信号来实现的。由runWorker源码可知,已经进入加锁区的任务并不会响应中断,因此只有工作线程执行完当前任务,进入getTask()才会感知线程池状态为STOP,开始处理退出逻辑。

shutdownNow对所有线程立即发出中断信号是为了阻止从任务队列取任务,让这些线程尽快进入退出逻辑;而那些正在执行runWorker加锁区中代码的线程,将在执行完当前任务后立即检测到线程池的状态,进入退出逻辑。源码如下:

public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);//将线程池状态修改为STOP
interruptWorkers();//中断所有工作线程
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
/**
* 使用drainTo方法一次性将工作队列中的任务加入taskList ,并从工作队列移除;
* 如果队列是DelayQueue或任何其他类型的队列,poll或drainTo可能无法删除某些元素,则会逐个删除它们
*/
private List<Runnable> drainQueue() {
BlockingQueue<Runnable> q = workQueue;
List<Runnable> taskList = new ArrayList<Runnable>();
q.drainTo(taskList);
if (!q.isEmpty()) {
for (Runnable r : q.toArray(new Runnable[0])) {
if (q.remove(r))
taskList.add(r);
}
}
return taskList;
}

下面看看interruptWorkers的具体实现:

private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
try {
w.thread.interrupt();
} catch (SecurityException ignore) {
}
}
} finally {
mainLock.unlock();
}
}

比较interruptIdleWorkers源码可知,interruptWorkers不需要等待持有Worker上的锁才中断线程,调用interruptWorkers会立即中断所有工作线程,interruptIdleWorkers则需要首先持有Worker上的锁才能进行中断。interruptWorkers目前只用在shutdownNow中。

awaitTermination

awaitTermination()会循环线程池是否terminated或是否已经超过超时时间,每次判断不满足条件则使用Condition对象termination阻塞指定时间。termination.awaitNanos() 是通过 LockSupport.parkNanos(this, nanosTimeout)实现的阻塞等待。调用shutdown之后,在以下情况发生之前,awaitTermination()都会被阻塞:

1 所有任务正常完成,线程池正常变为TERMINATED

2 任务仍未完成,到达超时时间

3 当前线程被中断

阻塞等待过程中发生以下具体情况会解除阻塞:

1 任务正常完成,线程池正常变为TERMINATED,此时会调用 termination.signalAll()唤醒所有阻塞等待的线程

2 到达超时时间,nanos <= 0条件满足,返回false

3 当前线程被中断,termination.awaitNanos()将从阻塞中唤醒,并向上抛出InterruptException异常。

源码如下:

public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);//将超时时限分片
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (;;) {
if (runStateAtLeast(ctl.get(), TERMINATED))//状态>=TERMINATED
return true;
if (nanos <= 0)//达到超时时间
return false;
nanos = termination.awaitNanos(nanos);
}
} finally {
mainLock.unlock();
}
}


常用的几种线程池

通常情况下,我们使用Executors的静态工厂方法来创建线程池,下面看创建几种常用线程池的方法:

/** newFixedThreadPool将创建一个固定长度的线程池,每当提交一个任务就创建一个线程,
* 直到达到线程池的最大数量,这时线程池的规模不再变化
* (如果某个线程由于发生未预期的Exception而终止,线程池将补充一个新线程)
*/
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
/** newCachedThreadPool将创建一个可缓存的线程池,如果线程池的当前规模超过了处理需求,
* 那么将回收空闲的线程,当需求增加时,则可以创建另外一个线程,线程池规模不存在任何限制
*/
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
/** newSingleThreadExecutor是一个单线程的Executor,只创建一个线程来执行任务,
* 如果线程异常结束,会创建新的线程来替代。
* newSingleThreadExecutor能确保依照任务在队列中的顺序来串行执行(如:FIFO,LIFO,优先级)
*/
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
/**
* newScheduledThreadPool创建一个固定长度的线程池,而且以延时或定时的方式来执行任务
*/
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}

欢迎指出本文有误的地方,若对您有帮助,点个赞支持一下呗!转载请注明原文出处

https://my.oschina.net/7001/blog/889931
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息