ThreadPoolExecutor核心实现原理和源码解析<二>
2017-08-27 14:44
931 查看
Worker类实现了Runnable,同时也扩展了AbstractQueuedSynchronizer,说明Worker本身不只是一个可执行的任务,还可以实现锁的功能。其主要作用是执行队列中的任务,并负责管理工作线程和维护一些统计指标,如已完成的任务数量等等;同时Worker通过扩展AbstractQueuedSynchronizer来简化任务执行时获取锁与释放锁的操作。Worker中的锁主要是为了防止中断在运行任务中的工作线程,中断仅用于唤醒在等待从workQueue中获取任务的线程。
如何防止被中断?
worker实现了一个简单的不可重入互斥锁,工作线程执行任务时,首先会进行加锁,如果主线程想要中断当前工作线程,需要先获取锁,否则无法中断。当工作线程执行完任务则会释放锁,并调用getTask从workQueue获取任务继续执行。由此可知,只有在等待从workQueue中获取任务(调用getTask期间)时才能中断。工作线程接收到中断信息,并不会立即就会停止,而是会检查workQueue是否为空,不为空则还是会继续获取任务执行,只有队列为空才会被停止。因此中断是为了停止空闲线程,也就是那些从任务队列获取任务被阻塞(任务队列为空)的线程。后续会详细分析整个过程。
为什么Worker被设计为不可重入?
这就需要知道那些操作可能会发生中断工作线程的操作。目前主要有以下几个:
setCorePoolSize();
setMaximumPoolSize();
setKeppAliveTime();
allowCoreThreadTimeOut();
shutdown();
tryTerminate();
如果锁可以重入,调用诸如setCorePoolSize等线程池控制方法时可以再次获取锁,那么可能会导致调用线程池控制方法期间中断正在运行的工作线程。jdk不希望在调用像setCorePoolSize这样的池控制方法时重新获取锁。
Worker源码如下:
runWorker会不断从工作队列表中取任务并执行;同时runWorker也会管理线程的中断状态,源码如下:
主要步骤:
1 从初始任务开始执行,如果firstTask 为null,只要线程池在运行,调用getTask从队列中取任务来执行。如果getTask返回null,则worker可能由于线程池状态调整或参数动态调整导致退出。若外部代码中抛出异常导致worker退出,completedAbruptly将为true,则在processWorkerExit将创建新的worker替代。
2 执行任务前,对worker加锁,已防止在任务运行时,线程池中其他操作中断当前worker。调用clearInterruptsForTaskRun管理线程中断状态,首先看看源码:
这个方法调用非常重要,当线程池状态小于STOP,调用Thread.interrupted(),如果getTask期间设置了worker的中断状态,则返回true,同时Thread.interrupted()将清除中断状态,即再次调用将返回false;再次检查线程池状态,如果状态大于或等于STOP,则需要调用Thread.currentThread().interrupt()恢复线程的中断状态。因此,该方法有两个作用:
<一>:当线程池仍然在运行时,若其他操作中断了worker,则该操作将清除中断状态
<二>:清除中断状态后,再次检查线程池状态,如果状态大于或等于STOP,此时需要恢复线程的中断状态,这样在下次调用getTask将返回null,worker将正常退出。
3 每个任务执行前,调用beforeExecute,beforeExecute可能抛出异常,该情况下抛出的异常会导致任务未执行worker就死亡,没有使用catch处理,会向上抛跳出循环,且completedAbruptly==true。
4 beforeExecute正常完成则开始运行任务,并收集其抛出的任何异常以发送到afterExecute,这里将分别处理分别处理RuntimeException,Error和任意Throwables,由于不能在Runnable.run内重新抛出Throwables,因此将Throwable包装为Error(到线程的UncaughtExceptionHandler中处理)向上抛。任何向上抛的异常都将导致线程死亡,completedAbruptly仍然为true。
5 任务执行完成后,调用afterExecute,该方法同样可能抛出异常,并导致线程死亡。
getTask方法流程图如下:
源码如下:
任务队列为空时,getTask()会根据当前线程池的配置执行阻塞或定时等待任务,当发生以下条件时,将返回null:
1 工作线程的数量超过maximumPoolSize
2 线程池已经停止
3 线程池调用了shutdown且任务队列为空
4 工作线程等待一个任务超时,且allowCoreThreadTimeOut || workerCount > corePoolSize返回true。
processWorkerExit方法流程图如下:
下面看看源码:
processWorkerExit只会在工作线程中被调用,主要用于清理和记录一个即将死亡的线程,该方法可能会终止线程池。这里不再详细tryTerminate和addWorker的实现,关于tryTerminate和addWorker的分析参见ThreadPoolExecutor核心实现原理和源码解析<一>
欢迎指出本文有误的地方,转载请注明原文出处。
如何防止被中断?
worker实现了一个简单的不可重入互斥锁,工作线程执行任务时,首先会进行加锁,如果主线程想要中断当前工作线程,需要先获取锁,否则无法中断。当工作线程执行完任务则会释放锁,并调用getTask从workQueue获取任务继续执行。由此可知,只有在等待从workQueue中获取任务(调用getTask期间)时才能中断。工作线程接收到中断信息,并不会立即就会停止,而是会检查workQueue是否为空,不为空则还是会继续获取任务执行,只有队列为空才会被停止。因此中断是为了停止空闲线程,也就是那些从任务队列获取任务被阻塞(任务队列为空)的线程。后续会详细分析整个过程。
为什么Worker被设计为不可重入?
这就需要知道那些操作可能会发生中断工作线程的操作。目前主要有以下几个:
setCorePoolSize();
setMaximumPoolSize();
setKeppAliveTime();
allowCoreThreadTimeOut();
shutdown();
tryTerminate();
如果锁可以重入,调用诸如setCorePoolSize等线程池控制方法时可以再次获取锁,那么可能会导致调用线程池控制方法期间中断正在运行的工作线程。jdk不希望在调用像setCorePoolSize这样的池控制方法时重新获取锁。
Worker源码如下:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { /** * 该类实际绝不会被序列化,提供serialVersionUID主要为了屏蔽javac warning */ private static final long serialVersionUID = 6138294804551838833L; /** 运行在Worker对象中的线程 */ final Thread thread; /** 要运行的初始任务,可能为null */ Runnable firstTask; /** 每个线程的任务计数器,使用volatile保证可见性 */ volatile long completedTasks; /** * 使用指定的初始任务和ThreadFactory中的线程对象创建一个Worker */ Worker(Runnable firstTask) { this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } /** 将主运行循环委托给外部的runWorker */ public void run() { runWorker(this); } // Lock methods // // The value 0 represents the unlocked state. // The value 1 represents the locked state. protected boolean isHeldExclusively() { return getState() == 1; } protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } }
核心函数 runWorker
runWorker流程图:runWorker会不断从工作队列表中取任务并执行;同时runWorker也会管理线程的中断状态,源码如下:
final void runWorker(Worker w) { Runnable task = w.firstTask; w.firstTask = null; boolean completedAbruptly = true;//是否“突然完成”,非正常完成 try { while (task != null || (task = getTask()) != null) { w.lock(); clearInterruptsForTaskRun(); try { beforeExecute(w.thread, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
主要步骤:
1 从初始任务开始执行,如果firstTask 为null,只要线程池在运行,调用getTask从队列中取任务来执行。如果getTask返回null,则worker可能由于线程池状态调整或参数动态调整导致退出。若外部代码中抛出异常导致worker退出,completedAbruptly将为true,则在processWorkerExit将创建新的worker替代。
2 执行任务前,对worker加锁,已防止在任务运行时,线程池中其他操作中断当前worker。调用clearInterruptsForTaskRun管理线程中断状态,首先看看源码:
private void clearInterruptsForTaskRun() { if (runStateLessThan(ctl.get(), STOP) && Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)) Thread.currentThread().interrupt(); }
这个方法调用非常重要,当线程池状态小于STOP,调用Thread.interrupted(),如果getTask期间设置了worker的中断状态,则返回true,同时Thread.interrupted()将清除中断状态,即再次调用将返回false;再次检查线程池状态,如果状态大于或等于STOP,则需要调用Thread.currentThread().interrupt()恢复线程的中断状态。因此,该方法有两个作用:
<一>:当线程池仍然在运行时,若其他操作中断了worker,则该操作将清除中断状态
<二>:清除中断状态后,再次检查线程池状态,如果状态大于或等于STOP,此时需要恢复线程的中断状态,这样在下次调用getTask将返回null,worker将正常退出。
3 每个任务执行前,调用beforeExecute,beforeExecute可能抛出异常,该情况下抛出的异常会导致任务未执行worker就死亡,没有使用catch处理,会向上抛跳出循环,且completedAbruptly==true。
4 beforeExecute正常完成则开始运行任务,并收集其抛出的任何异常以发送到afterExecute,这里将分别处理分别处理RuntimeException,Error和任意Throwables,由于不能在Runnable.run内重新抛出Throwables,因此将Throwable包装为Error(到线程的UncaughtExceptionHandler中处理)向上抛。任何向上抛的异常都将导致线程死亡,completedAbruptly仍然为true。
5 任务执行完成后,调用afterExecute,该方法同样可能抛出异常,并导致线程死亡。
获取任务
runWorker运行期间,将不断调用getTask()从任务队列中取任务来执行。getTask方法流程图如下:
源码如下:
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? /** * 外层循环 * 用于检查线程池状态和工作队列是否为空 */ retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 调用了shutdownNow()或调用了shutdown()且workQueue为空,返回true if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } boolean timed; // Are workers subject to culling? /** * 内层循环 * 用于检测工作线程数量和获取task的超时状态 */ for (;;) { int wc = workerCountOf(c); timed = allowCoreThreadTimeOut || wc > corePoolSize; if (wc <= maximumPoolSize && ! (timedOut && timed)) break; if (compareAndDecrementWorkerCount(c)) return null; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
任务队列为空时,getTask()会根据当前线程池的配置执行阻塞或定时等待任务,当发生以下条件时,将返回null:
1 工作线程的数量超过maximumPoolSize
2 线程池已经停止
3 线程池调用了shutdown且任务队列为空
4 工作线程等待一个任务超时,且allowCoreThreadTimeOut || workerCount > corePoolSize返回true。
工作线程退出
runWorker中,当getTask返回null或抛出异常,将进入processWorkerExit处理工作线程的退出。processWorkerExit方法流程图如下:
下面看看源码:
private void processWorkerExit(Worker w, boolean completedAbruptly) { /** * 如果是突然终止,工作线程数减1 * 如果不是突然终止,在getTask()中已经减1 */ if (completedAbruptly) decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock();//锁定线程池 try { completedTaskCount += w.completedTasks;//汇总完成的任务数量 workers.remove(w);//移除工作线程 } finally { mainLock.unlock(); } tryTerminate();//尝试终止线程池 int c = ctl.get(); //状态是running、shutdown,即tryTerminate()没有成功终止线程池 if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; //任务队列中仍然有任务未执行,需至少保证有一个工作线程 if (min == 0 && ! workQueue.isEmpty()) min = 1; /** * allowCoreThreadTimeOut为false则需要保证线程池中至少有corePoolSize数量的工作线程 */ if (workerCountOf(c) >= min) return; } //添加一个没有firstTask的工作线程 addWorker(null, false); } }
processWorkerExit只会在工作线程中被调用,主要用于清理和记录一个即将死亡的线程,该方法可能会终止线程池。这里不再详细tryTerminate和addWorker的实现,关于tryTerminate和addWorker的分析参见ThreadPoolExecutor核心实现原理和源码解析<一>
欢迎指出本文有误的地方,转载请注明原文出处。
相关文章推荐
- ThreadPoolExecutor核心实现原理和源码解析<二>
- ThreadPoolExecutor核心实现原理和源码解析<一>
- ThreadPoolExecutor核心实现原理和源码解析<一>
- Volley<二> 实现原理深度解析
- AFNetworking2.0源码解析<二>
- AFNetworking2.0源码解析<二>
- Box2d源码学习<二>内存管理之SOA的实现
- AFNetworking2.0源码解析<二>
- JSPatch实现原理详解<二>
- Spring AOP的实现原理之<aop:aspectj-autoproxy />的解析
- dex2jar源码解析----解析dex文件<二>
- 高效实现Map的原子更新操作(Hystrix的InternMap<K, V>源码解析)
- AFNetworking2.0源码解析<二>
- spring 实现IOC过程源码解析<一>
- Spring AOP的实现原理之<aop:aspectj-autoproxy />的解析
- AFNetworking (3.1.0) 源码解析 <二>
- SpringMVC 源码深度解析<context:component-scan>(扫描和注冊的注解Bean)
- 循环队列及C语言实现<二>
- Java HashMap 核心源码解读<摘录>