如果线程池内线程执行抛出异常了怎么办?
2020-04-22 12:43
120 查看
线程池正常运行的情况下,如果忽然某条线程执行任务时,抛出了异常,这个时候线程池会怎么处理呢?
准备测试代码(测试代码需要jdk1.8以上)
public class ThreadThrowsExceptionTest { public static void main(String[] args) { ExecutorService executorService = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), new DefaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); executorService.submit(() -> { System.out.println("线程" + Thread.currentThread().getName() + "准备抛空指针异常了"); throw new NullPointerException("线程抛出空指针啦"); }); IntStream.range(0, 4).forEach(index -> executorService.submit(() -> System.out.println("线程" + Thread.currentThread().getName() + "执行了任务" + index))); } //这里借用了Executors中DefaultThreadFactory,进行了简单修改,方便测试 static class DefaultThreadFactory implements ThreadFactory { private static final AtomicInteger poolNumber = new AtomicInteger(1); private final ThreadGroup group; private final AtomicInteger threadNumber = new AtomicInteger(1); private final String namePrefix; DefaultThreadFactory() { SecurityManager s = System.getSecurityManager(); group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-"; } public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); System.out.println("创建线程了" + t.getName()); if (t.isDaemon()) t.setDaemon(false); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; } } }
这里的DefaultThreadFactory借用了Executors中DefaultThreadFactory的源码,进行了简单修改,方便我们调试时查看创建的线程信息。例子中提交了4个正常任务,1个抛异常的任务。执行情况如下
创建线程了pool-1-thread-1 线程pool-1-thread-1准备抛空指针异常了 创建线程了pool-1-thread-2 创建线程了pool-1-thread-3 创建线程了pool-1-thread-4 线程pool-1-thread-2执行了任务0 线程pool-1-thread-3执行了任务1 创建线程了pool-1-thread-5 线程pool-1-thread-4执行了任务2 线程pool-1-thread-5执行了任务3
线程池创建了5条工作线程,正常执行了5个任务,其中一个抛出异常也没有影响其他工作线程,似乎线程池没有进行任何处理。
由于Future获取异常的时机是在调用get方法时,这里需要我们稍稍改下代码
public class ThreadThrowsExceptionTest { public static void main(String[] args) { ExecutorService executorService = new ThreadPoolExecutor(5, 5, 60L, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(), new DefaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); //这里改为调用execute executorService.execute(() -> { System.out.println("线程" + Thread.currentThread().getName() + "准备抛空指针异常了"); throw new NullPointerException("线程抛出空指针啦"); }); IntStream.range(0, 4).forEach(index -> executorService.submit(() -> System.out.println("线程" + Thread.currentThread().getName() + "执行了任务" + index))); } ... }
执行结果如下
创建线程了pool-1-thread-1 线程pool-1-thread-1准备抛空指针异常了 创建线程了pool-1-thread-2 Exception in thread "pool-1-thread-1" java.lang.NullPointerException: 线程抛出空指针啦 at com.example.homework.thread.ThreadThrowsExceptionTest.lambda$main$0(ThreadThrowsExceptionTest.java:20) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) 创建线程了pool-1-thread-3 创建线程了pool-1-thread-4 创建线程了pool-1-thread-5 线程pool-1-thread-4执行了任务1 线程pool-1-thread-3执行了任务0 创建线程了pool-1-thread-6 线程pool-1-thread-5执行了任务2 线程pool-1-thread-6执行了任务3
这里出现了pool-1-thread-6!看来线程池在线程执行异常后新创建了一条线程。
那么具体原因需要跟踪源码看一下
public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); //这里调用的是execute方法 execute(ftask); return ftask; }
由于submit方法中也是调用execute方法,所以我们只需要关注execute方法实现,execute方法细节如下
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { //添加线程的主要方法 if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
private boolean addWorker(Runnable firstTask, boolean core) { ... boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { //这里通过Worker包装task w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { //线程开始执行 t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
这里没有对执行的情况做处理,那就只有继续跟踪Worker的代码
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { ... Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } /** Delegates main run loop to outer runWorker */ public void run() { //执行线程的关键方法 runWorker(this); }
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
主要执行的细节方法在runWorker中,而对于发生异常的处理有两个地方afterExecute,processWorkerExit。afterExecute是留给子类实现的,processWorkerExit的源码如下
/** * Performs cleanup and bookkeeping for a dying worker. Called * only from worker threads. Unless completedAbruptly is set, * assumes that workerCount has already been adjusted to account * for exit. This method removes thread from worker set, and * possibly terminates the pool or replaces the worker if either * it exited due to user task exception or if fewer than * corePoolSize workers are running or queue is non-empty but * there are no workers. * * @param w the worker * @param completedAbruptly if the worker died due to user exception */ private void processWorkerExit(Worker w, boolean completedAbruptly) { //如果发生异常,completedAbruptly值为初始化值true,这里会减少目前的工作线程数,目的是为了排除当前发生异常的线程 if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; workers.remove(w); } finally { mainLock.unlock(); } tryTerminate(); int c = ctl.get(); if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } //这里会新增一个工作线程 addWorker(null, false); } }
至此终于线程池的发生异常的处理过程大致就清楚了,从此例可以看出,ThreadPoolExecutor默认使用的是懒加载的方式,没有一开始就初始化指定核心线程数量的线程。而Worker同时也是装饰模式的一种应用,给Tread增加了一些额外的处理逻辑,使其使用起来更加灵活。
- 点赞
- 收藏
- 分享
- 文章举报
相关文章推荐
- 求解,多线程时,线程池中一个线程内部代码抛出异常,那么这个线程能正常结束么?
- c# Task编程一个task抛出异常后怎么取消其他线程
- 如果同步块内的线程抛出异常会发生什么?
- 线程池中捕获线程执行异常
- 如果同步块内的线程抛出异常会发生什么?
- 多线程执行时,如果一个逻辑需要等若干个线程执行完成后再执行,怎么实现?
- 如果同步块内的线程抛出异常会发生什么?
- 【捕获Java线程池执行任务抛出的异常】
- 如果同步块内的线程抛出异常会发生什么?
- 捕获Java线程池执行任务抛出的异常
- 如果同步块内的线程抛出异常会发生什么?
- 主线程捕捉线程池中线程抛出的异常
- 如果同步块内的线程抛出异常会发生什么?
- 如果同步块内的线程抛出异常会发生什么?
- 在工作线程中调用UpdateData函数怎么抛出异常呢
- 如果同步块内的线程抛出异常会发生什么?
- 如果同步块内的线程抛出异常会发生什么?
- 捕获Java线程池执行任务抛出的异常
- 如果catch没有异常抛出怎么办
- 捕获Java线程池执行任务抛出的异常