您的位置:首页 > 编程语言 > Java开发

ThreadPoolExecutor核心实现原理和源码解析<二>

2017-08-27 14:44 931 查看
Worker类实现了Runnable,同时也扩展了AbstractQueuedSynchronizer,说明Worker本身不只是一个可执行的任务,还可以实现锁的功能。其主要作用是执行队列中的任务,并负责管理工作线程和维护一些统计指标,如已完成的任务数量等等;同时Worker通过扩展AbstractQueuedSynchronizer来简化任务执行时获取锁与释放锁的操作。Worker中的锁主要是为了防止中断在运行任务中的工作线程,中断仅用于唤醒在等待从workQueue中获取任务的线程。

如何防止被中断?

worker实现了一个简单的不可重入互斥锁,工作线程执行任务时,首先会进行加锁,如果主线程想要中断当前工作线程,需要先获取锁,否则无法中断。当工作线程执行完任务则会释放锁,并调用getTask从workQueue获取任务继续执行。由此可知,只有在等待从workQueue中获取任务(调用getTask期间)时才能中断。工作线程接收到中断信息,并不会立即就会停止,而是会检查workQueue是否为空,不为空则还是会继续获取任务执行,只有队列为空才会被停止。因此中断是为了停止空闲线程,也就是那些从任务队列获取任务被阻塞(任务队列为空)的线程。后续会详细分析整个过程。

为什么Worker被设计为不可重入?

这就需要知道那些操作可能会发生中断工作线程的操作。目前主要有以下几个:

setCorePoolSize();

setMaximumPoolSize();

setKeppAliveTime();

allowCoreThreadTimeOut();

shutdown();

tryTerminate();

如果锁可以重入,调用诸如setCorePoolSize等线程池控制方法时可以再次获取锁,那么可能会导致调用线程池控制方法期间中断正在运行的工作线程。jdk不希望在调用像setCorePoolSize这样的池控制方法时重新获取锁。

Worker源码如下:

private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* 该类实际绝不会被序列化,提供serialVersionUID主要为了屏蔽javac warning
*/
private static final long serialVersionUID = 6138294804551838833L;

/** 运行在Worker对象中的线程 */
final Thread thread;
/** 要运行的初始任务,可能为null */
Runnable firstTask;
/** 每个线程的任务计数器,使用volatile保证可见性 */
volatile long completedTasks;

/**
* 使用指定的初始任务和ThreadFactory中的线程对象创建一个Worker
*/
Worker(Runnable firstTask) {
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}

/** 将主运行循环委托给外部的runWorker  */
public void run() {
runWorker(this);
}

// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.

protected boolean isHeldExclusively() {
return getState() == 1;
}

protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}

protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}

public void lock()        { acquire(1); }
public boolean tryLock()  { return tryAcquire(1); }
public void unlock()      { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
}


核心函数 runWorker

 runWorker流程图:



runWorker会不断从工作队列表中取任务并执行;同时runWorker也会管理线程的中断状态,源码如下:

final void runWorker(Worker w) {
Runnable task = w.firstTask;
w.firstTask = null;
boolean completedAbruptly = true;//是否“突然完成”,非正常完成
try {
while (task != null || (task = getTask()) != null) {
w.lock();
clearInterruptsForTaskRun();
try {
beforeExecute(w.thread, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}

主要步骤:

1 从初始任务开始执行,如果firstTask 为null,只要线程池在运行,调用getTask从队列中取任务来执行。如果getTask返回null,则worker可能由于线程池状态调整或参数动态调整导致退出。若外部代码中抛出异常导致worker退出,completedAbruptly将为true,则在processWorkerExit将创建新的worker替代。

2 执行任务前,对worker加锁,已防止在任务运行时,线程池中其他操作中断当前worker。调用clearInterruptsForTaskRun管理线程中断状态,首先看看源码:

private void clearInterruptsForTaskRun() {
if (runStateLessThan(ctl.get(), STOP) &&
Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))
Thread.currentThread().interrupt();
}

这个方法调用非常重要,当线程池状态小于STOP,调用Thread.interrupted(),如果getTask期间设置了worker的中断状态,则返回true,同时Thread.interrupted()将清除中断状态,即再次调用将返回false;再次检查线程池状态,如果状态大于或等于STOP,则需要调用Thread.currentThread().interrupt()恢复线程的中断状态。因此,该方法有两个作用:

<一>:当线程池仍然在运行时,若其他操作中断了worker,则该操作将清除中断状态

<二>:清除中断状态后,再次检查线程池状态,如果状态大于或等于STOP,此时需要恢复线程的中断状态,这样在下次调用getTask将返回null,worker将正常退出。

3 每个任务执行前,调用beforeExecute,beforeExecute可能抛出异常,该情况下抛出的异常会导致任务未执行worker就死亡,没有使用catch处理,会向上抛跳出循环,且completedAbruptly==true。

4 beforeExecute正常完成则开始运行任务,并收集其抛出的任何异常以发送到afterExecute,这里将分别处理分别处理RuntimeException,Error和任意Throwables,由于不能在Runnable.run内重新抛出Throwables,因此将Throwable包装为Error(到线程的UncaughtExceptionHandler中处理)向上抛。任何向上抛的异常都将导致线程死亡,completedAbruptly仍然为true。

5 任务执行完成后,调用afterExecute,该方法同样可能抛出异常,并导致线程死亡。

获取任务

runWorker运行期间,将不断调用getTask()从任务队列中取任务来执行。

getTask方法流程图如下:



源码如下:

private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
/**
* 外层循环
* 用于检查线程池状态和工作队列是否为空
*/
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// 调用了shutdownNow()或调用了shutdown()且workQueue为空,返回true
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}

boolean timed;      // Are workers subject to culling?
/**
* 内层循环
* 用于检测工作线程数量和获取task的超时状态
*/
for (;;) {
int wc = workerCountOf(c);
timed = allowCoreThreadTimeOut || wc > corePoolSize;

if (wc <= maximumPoolSize && ! (timedOut && timed))
break;
if (compareAndDecrementWorkerCount(c))
return null;
c = ctl.get();  // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}

try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}

任务队列为空时,getTask()会根据当前线程池的配置执行阻塞或定时等待任务,当发生以下条件时,将返回null:

1 工作线程的数量超过maximumPoolSize

2 线程池已经停止

3 线程池调用了shutdown且任务队列为空

4 工作线程等待一个任务超时,且allowCoreThreadTimeOut || workerCount > corePoolSize返回true。

工作线程退出

runWorker中,当getTask返回null或抛出异常,将进入processWorkerExit处理工作线程的退出。

processWorkerExit方法流程图如下:



下面看看源码:

private void processWorkerExit(Worker w, boolean completedAbruptly) {
/**
* 如果是突然终止,工作线程数减1
* 如果不是突然终止,在getTask()中已经减1
*/
if (completedAbruptly)
decrementWorkerCount();

final ReentrantLock mainLock = this.mainLock;
mainLock.lock();//锁定线程池
try {
completedTaskCount += w.completedTasks;//汇总完成的任务数量
workers.remove(w);//移除工作线程
} finally {
mainLock.unlock();
}

tryTerminate();//尝试终止线程池

int c = ctl.get();
//状态是running、shutdown,即tryTerminate()没有成功终止线程池
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
//任务队列中仍然有任务未执行,需至少保证有一个工作线程
if (min == 0 && ! workQueue.isEmpty())
min = 1;
/**
* allowCoreThreadTimeOut为false则需要保证线程池中至少有corePoolSize数量的工作线程
*/
if (workerCountOf(c) >= min)
return;
}
//添加一个没有firstTask的工作线程
addWorker(null, false);
}
}

processWorkerExit只会在工作线程中被调用,主要用于清理和记录一个即将死亡的线程,该方法可能会终止线程池。这里不再详细tryTerminate和addWorker的实现,关于tryTerminate和addWorker的分析参见ThreadPoolExecutor核心实现原理和源码解析<一>

欢迎指出本文有误的地方,转载请注明原文出处。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息