Java Executor并发框架(十)Executor框架线程池源码解析
2016-10-09 17:32
886 查看
一、介绍
Executors是 Executor、ExecutorService、ThreadFactory、Callable 类的工厂和工具方法。
二、源码解析
创建一个固定大小的线程池:通过重用共享无界队列里的线程来减少线程创建的开销。当所有的线程都在执行任务,新增的任务将会在队列中等待,直到一个线程空闲。由于在执行前失败导致的线程中断,如果需要继续执行接下去的任务,新的线程会取代它执行。线程池中的线程会一直存在,除非明确地shutdown 掉。
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
返回的是 ThreadPoolExecutor 类的对象,这个构造方法使用传入的参数和默认的线程工厂与拒绝执行的处理。
corePoolSize:线程池中的线程数量,除非设置了 allowCoreThreadTimeOut, 否则就算线程空闲还是在保存在线程池中
maximumPoolSize:线程池中允许存放最大的线程数量
keepAliveTime:当线程数大于 corePoolSize,如果 keepAliveTime 内空闲的线程未执行,线程将被终结
unit:keepAliveTime 的时间单位
workQueue:保存 execute() 提交的任务
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }可以看到固定大小线程池,corePoolSize 和 maximumPoolSize 传入的参数是一样的。
创建一个单个线程的线程池。任务会被保证顺序执行,因为只有一个工作线程。不像 newFixedThreadPool(1),这个不保证任务顺序执行。corePoolSize
和 maximumPoolSize 都是 1。
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
创建一个可按需自动扩容的线程池,但是会优先重用线程池中空闲可用的线程。这个类型的线程池将会大大提升执行许多短暂的异步任务的程序。如果线程池中线程都在使用,又有新任务到来,则新增一个线程到线程池。如果线程 60 秒内空闲,则将被终止移除线程池。corePoolSize
为 0,可知一旦线程 60s 空闲就会被移出线程池。
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
创建一个在一定延迟时间后调度命令的线程池,或者周期性执行的线程池。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); }
创建完线程池,然后就该执行任务了。看下内部类 DelegatedExecutorService 里的 execute 方法:可以看到任务执行策略(单线程串行、多线程并行)和任务的具体执行分离,是一个典型的命令模式。
static class DelegatedExecutorService extends AbstractExecutorService { private final ExecutorService e; DelegatedExecutorService(ExecutorService executor) { e = executor; } public void execute(Runnable command) { e.execute(command); } public void shutdown() { e.shutdown(); } public List<Runnable> shutdownNow() { return e.shutdownNow(); } public boolean isShutdown() { return e.isShutdown(); } public boolean isTerminated() { return e.isTerminated(); } public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { return e.awaitTermination(timeout, unit); } public Future<?> submit(Runnable task) { return e.submit(task); } public <T> Future<T> submit(Callable<T> task) { return e.submit(task); } public <T> Future<T> submit(Runnable task, T result) { return e.submit(task, result); } public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException { return e.invokeAll(tasks); } public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException { return e.invokeAll(tasks, timeout, unit); } public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException { return e.invokeAny(tasks); } public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { return e.invokeAny(tasks, timeout, unit); } }
创建线程池的时候,已经实例化了 ThreadPoolExecutor,所以上面 execute() 方法实际是调用 ThreadPoolExecutor 的 execute:给定的任务可能在未来的某个时刻执行。可能是新建一个线程执行,也可能是线程中原有的线程执行。如果任务不能执行,可能是这个 executor 已经被 shutdown 了,也可能是到达了线程池的执行阈值,任务被拒绝执行处理器处理中。
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
看到这,其实发现线程池的核心实现就是在 ThreadPoolExecutor 里面,所以先介绍下 ThreadPoolExecutor 类的作用:
上图可以看出 ThreadPoolExecutor 类的层次结构中的位置,是对抽象方法和接口的完整实现,即核心代码在这个类里。
一个 ExecutorService 执行每个任务可能用到线程池中的一个或多个线程,线程池由 Executors 工厂创建。
线程池解决了两个不同的问题:
执行大量异步的任务时,线程池减少线程的创建来减少开销,提升性能
提供了对资源的管理,包括当执行一系列任务时,线程的消耗。每个 ThreadPoolExecutor 也存储有些基本数据,诸如完成的任务数量
想要在更广阔的背景下使用的话,这个类提供了许多可调整的参数和扩展的 hook。不管怎么样,还是推荐使用 Executors 来创建线程池比较方便。
这个主要的线程池控制状态 ctl,使用 AtomicInteger 存储两个概念上的字段 workerCount( 线程池有效的线程数)
和 runState( 线程池是
running、shuting down 等等状态)
上述
execute() 方法有三个处理步骤:
如果正在 running 的线程 (即 worker 线程) 小于 corePoolSize ( workerCountOf(c) < corePoolSize ),尝试创建一个新线程并把传入的 command(任务) 作为它的第一个任务。调用 addWorker(command, true) 自动检查 runState 和 workCount,以便在不能增加线程的时候返回 false。添加成功直接 return,不能添加就要看下面的步骤
如果被成功放进队列(if (isRunning(c) && workQueue.offer(command)) workQueue 是一个任务排队的阻塞队列),然后还需要二次检查线程池是否 shut down(上次检查后到这次检查前死亡)。所以我们重新检查状态,如果线程池停止的话,回滚进队操作,或者如果没有工作的线程开启一个新线程(addWorker(null, false);)
如果不能让任务进入阻塞队列,然后尝试新增一个线程。如果新增线程失败,可能是线程池 shut down 或者线程池饱和(达到 maxPoolSize),所以接下来抛弃这个任务。
任务(Task)被包装在一个叫做 Worker 的内部类,Worker 继承 AQS 来实现任务增加/删除的同步控制,使用 HashSet 来保存 Worker 线程。如果 worker 线程大于 corePoolSize,则不创建 worker 线程,而是放入一个 BlockingQueue
排队。如果有界队列的 BlockingQueue 满了,则尝试增加线程到线程池,但是线程总数要小于 maxPollSize。
addWorker() 方法:检查线程池当前状态和线程数量的边界条件看是否可以增加 worker 线程。如果可以,工作线程计数响应调整,并且,如果可能的话,新的
worker 线程被创建,启动,跑它的第一个 task。如果线程池 stop 或者要被 shut down,此方法返回 false。如果线程工厂(thread factory)创建线程失败,此方法也会返回 false。如果线程创建失败,不管是 thread factory 返回 null,或者 exception(通常是 OOM),都会被回滚。
参数
firstTask: 新线程应该运行的第一个任务。如果 worker 线程小于 corePoolSize,worker 和初始化的第一次任务一起创建绕过排队这一过程,或者队列已满。初始化空闲线程通常是通过 prestartCoreThread 或者替换已经死亡的 worker 线程。
core:如果 true,则使用 corePoolSize 作为边界,否则使用 maximumPoolSize 作为边界。(这里使用 Boolean 而不是传入实际值,是因为传入值可能会在传入过程被改变,在方法中直接读取值更精确)。
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { final ReentrantLock mainLock = this.mainLock; w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int c = ctl.get(); int rs = runStateOf(c); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
参考文献:
1.JDK
源码解析 —— Executors ExecutorService ThreadPoolExecutor 线程池
相关文章推荐
- Java Executor并发框架(六)Executor框架线程池任务执行全过程(上)
- 【Java并发编程】之十九:并发新特性—Executor框架与线程池(含代码)
- Java并发---- Executor并发框架--线程池,ThreadToolExecutor初步理解
- Java并发---- Executor并发框架--线程池,ThreadToolExecutor初步理解
- Java Executor并发框架(七)Executor框架线程池任务执行全过程(下)
- 转:【Java并发编程】之十九:并发新特性—Executor框架与线程池(含代码)
- Java并发—— Executor框架及线程池
- 【Java并发编程】之十九:并发新特性—Executor框架与线程池
- 【Java 并发】Executor框架机制与线程池配置使用
- java并发之线程池Executor知识要点 及 核心源码浅析
- 【Java并发编程】:并发新特性—Executor框架与线程池(含代码)
- Java Executor并发框架(十三)Executor框架线程池关于异常的处理
- Java 并发编程,线程池(ThreadPoolExecutor)源码解析
- 【Java并发编程】之十九:并发新特性—Executor框架与线程池(含代码)
- Java Executor并发框架(九)Executor框架线程池ExecutorService.shutdown什么时候执行
- 【Java并发编程】之十九:并发新特性—Executor框架与线程池(含代码)
- Java Executor并发框架(八)Executor框架线程池ThreadPoolExecutor、ScheduledThreadPoolExecutor
- 【Java并发编程】之十九:并发新特性—Executor框架与线程池(含代码)
- Java并发(基础知识)—— Executor框架及线程池
- Executor框架是指java 5中引入的一系列并发库中与executor相关的一些功能类,其中包括线程池,Executor,Executors,ExecutorService,Completion