您的位置:首页 > 其它

如果线程池内线程执行抛出异常了怎么办?

2020-04-22 12:43 120 查看

线程池正常运行的情况下,如果忽然某条线程执行任务时,抛出了异常,这个时候线程池会怎么处理呢?
准备测试代码(测试代码需要jdk1.8以上)

public class ThreadThrowsExceptionTest {
public static void main(String[] args) {
ExecutorService executorService = new ThreadPoolExecutor(5, 5,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(), new DefaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
executorService.submit(() -> {
System.out.println("线程" + Thread.currentThread().getName() + "准备抛空指针异常了");
throw new NullPointerException("线程抛出空指针啦");
});
IntStream.range(0, 4).forEach(index -> executorService.submit(() -> System.out.println("线程" + Thread.currentThread().getName() + "执行了任务" + index)));

}

//这里借用了Executors中DefaultThreadFactory,进行了简单修改,方便测试
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;

DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
System.out.println("创建线程了" + t.getName());
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}

}

这里的DefaultThreadFactory借用了Executors中DefaultThreadFactory的源码,进行了简单修改,方便我们调试时查看创建的线程信息。例子中提交了4个正常任务,1个抛异常的任务。执行情况如下

创建线程了pool-1-thread-1
线程pool-1-thread-1准备抛空指针异常了
创建线程了pool-1-thread-2
创建线程了pool-1-thread-3
创建线程了pool-1-thread-4
线程pool-1-thread-2执行了任务0
线程pool-1-thread-3执行了任务1
创建线程了pool-1-thread-5
线程pool-1-thread-4执行了任务2
线程pool-1-thread-5执行了任务3

线程池创建了5条工作线程,正常执行了5个任务,其中一个抛出异常也没有影响其他工作线程,似乎线程池没有进行任何处理。
由于Future获取异常的时机是在调用get方法时,这里需要我们稍稍改下代码

public class ThreadThrowsExceptionTest {
public static void main(String[] args) {
ExecutorService executorService = new ThreadPoolExecutor(5, 5,
60L, TimeUnit.MINUTES,
new LinkedBlockingQueue<Runnable>(), new DefaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
//这里改为调用execute
executorService.execute(() -> {
System.out.println("线程" + Thread.currentThread().getName() + "准备抛空指针异常了");
throw new NullPointerException("线程抛出空指针啦");
});
IntStream.range(0, 4).forEach(index -> executorService.submit(() -> System.out.println("线程" + Thread.currentThread().getName() + "执行了任务" + index)));

}

...

}

执行结果如下

创建线程了pool-1-thread-1
线程pool-1-thread-1准备抛空指针异常了
创建线程了pool-1-thread-2
Exception in thread "pool-1-thread-1" java.lang.NullPointerException: 线程抛出空指针啦
at com.example.homework.thread.ThreadThrowsExceptionTest.lambda$main$0(ThreadThrowsExceptionTest.java:20)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
创建线程了pool-1-thread-3
创建线程了pool-1-thread-4
创建线程了pool-1-thread-5
线程pool-1-thread-4执行了任务1
线程pool-1-thread-3执行了任务0
创建线程了pool-1-thread-6
线程pool-1-thread-5执行了任务2
线程pool-1-thread-6执行了任务3

这里出现了pool-1-thread-6!看来线程池在线程执行异常后新创建了一条线程。
那么具体原因需要跟踪源码看一下

public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
//这里调用的是execute方法
execute(ftask);
return ftask;
}

由于submit方法中也是调用execute方法,所以我们只需要关注execute方法实现,execute方法细节如下

public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();

int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
//添加线程的主要方法
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
private boolean addWorker(Runnable firstTask, boolean core) {
...
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//这里通过Worker包装task
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());

if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
//线程开始执行
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}

这里没有对执行的情况做处理,那就只有继续跟踪Worker的代码

private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
...
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}

/** Delegates main run loop to outer runWorker  */
public void run() {
//执行线程的关键方法
runWorker(this);
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted.  This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}

主要执行的细节方法在runWorker中,而对于发生异常的处理有两个地方afterExecute,processWorkerExit。afterExecute是留给子类实现的,processWorkerExit的源码如下

/**
* Performs cleanup and bookkeeping for a dying worker. Called
* only from worker threads. Unless completedAbruptly is set,
* assumes that workerCount has already been adjusted to account
* for exit.  This method removes thread from worker set, and
* possibly terminates the pool or replaces the worker if either
* it exited due to user task exception or if fewer than
* corePoolSize workers are running or queue is non-empty but
* there are no workers.
*
* @param w the worker
* @param completedAbruptly if the worker died due to user exception
*/
private void processWorkerExit(Worker w, boolean completedAbruptly) {
//如果发生异常,completedAbruptly值为初始化值true,这里会减少目前的工作线程数,目的是为了排除当前发生异常的线程
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();

final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}

tryTerminate();

int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
//这里会新增一个工作线程
addWorker(null, false);
}
}

至此终于线程池的发生异常的处理过程大致就清楚了,从此例可以看出,ThreadPoolExecutor默认使用的是懒加载的方式,没有一开始就初始化指定核心线程数量的线程。而Worker同时也是装饰模式的一种应用,给Tread增加了一些额外的处理逻辑,使其使用起来更加灵活。

  • 点赞
  • 收藏
  • 分享
  • 文章举报
George.Lee 发布了5 篇原创文章 · 获赞 0 · 访问量 453 私信 关注
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: