[Java]ThreadPoolExecutor源码分析
2016-05-24 15:41
423 查看
ThreadPoolExecutor 使用线程池来处理提交的线程任务,该类主要用来处理两大问题:1.提升大量异步任务的执行性能。2.同时也提供任务完成情况信息。
通常我们要使用ThreadPoolExecutor 可以通过其构造类方法,或者使用Executors工厂类的到ThreadPoolExecutor。
如果使用构造类方法,我们需要懂其中几个必要的构造参数:
corePoolSize:最小存活Worker数量,这里得说一个time out机制,每一个Worker都会有生存时间,过了生存时间就会被回收掉。一般情况下corePoolSize个Worker不会被回收,如果允许回收,就要设置allowCoreThreadTimeOut。
maximumPoolSize:线程池的极限大小
keepAliveTime:以nanoseconds表示的空闲Worker回收时间,allowCoreThreadTimeOut设置为true的情况下,包括空闲 core Thread也是被回收掉的
workQueue:这是用来保存待执行任务队列,并且会把任务给予Worker.
threadFactory:顾名思义,这是用来产生包裹Runnable对象的线程工厂。
handler:当提交任务数超过maximumPoolSize+workQueue之和时,任务会交给RejectedExecutionHandler来处理。
一般情况下我们还是使用Executors来产生ThreadPoolExecutor
一般我们添加任务的时候都是使用 public void execute(Runnable command)
其中注释也很明确的分为3步:
1.如果少于corePoolSize的Worker在执行任务,那么就创建任务
2.否则检查Executor是否在执行然后将任务加入workQueue,其中进行了双重检查,如果Executor此时不在Running状态,那么进行回滚。
3.如果不能加进workQueue那么新建线程,不能的话就拒绝任务。
可以看出addWorker是关键操作,那么addWorker到底做了些什么呢?
COUNT_BITS表示位移位数 29
runStateOf(int c) 返回前3位数的值,运行状态总共有5种:RUNNING,SHUTDOWN,STOP,TIDYING,TERMINATED.
RUNNING:(0b111.....):接受任务并且处理workQueue中的任务
SHUTDOWN(0b000.....):不接受任务但是处理workQueue中的任务
STOP(0b001......):不接受任务不处理workQueue中的任务,同时中断执行中的任务
TIDYING(0b010.....):所有任务终止,workCount=0,此时会调用terminated() hook
TERMINATED(0b011.....):terminated()结束。
addWorker中这一部分
并且rs状态不为SHUTDOWN或者firstTask不为null或者workQueue为空的情况下失败。
如果Worker数已达到上限则失败
否则增加Worker计数,也就是允许创建新的Worker来执行任务。
这里不得不提到一个关键的内部类Worker,Worker类如同名字一样是对执行线程任务的封装。
Worker实现了Runnable接口,也就是是实现了run()方法。
值得一提的是,通过BlockingQueue来控制addWorker().
通常我们要使用ThreadPoolExecutor 可以通过其构造类方法,或者使用Executors工厂类的到ThreadPoolExecutor。
如果使用构造类方法,我们需要懂其中几个必要的构造参数:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
corePoolSize:最小存活Worker数量,这里得说一个time out机制,每一个Worker都会有生存时间,过了生存时间就会被回收掉。一般情况下corePoolSize个Worker不会被回收,如果允许回收,就要设置allowCoreThreadTimeOut。
maximumPoolSize:线程池的极限大小
keepAliveTime:以nanoseconds表示的空闲Worker回收时间,allowCoreThreadTimeOut设置为true的情况下,包括空闲 core Thread也是被回收掉的
workQueue:这是用来保存待执行任务队列,并且会把任务给予Worker.
threadFactory:顾名思义,这是用来产生包裹Runnable对象的线程工厂。
handler:当提交任务数超过maximumPoolSize+workQueue之和时,任务会交给RejectedExecutionHandler来处理。
一般情况下我们还是使用Executors来产生ThreadPoolExecutor
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), threadFactory); }
一般我们添加任务的时候都是使用 public void execute(Runnable command)
/** * Executes the given task sometime in the future. The task * may execute in a new thread or in an existing pooled thread. * * If the task cannot be submitted for execution, either because this * executor has been shutdown or because its capacity has been reached, * the task is handled by the current {@code RejectedExecutionHandler}. * * @param command the task to execute * @throws RejectedExecutionException at discretion of * {@code RejectedExecutionHandler}, if the task * cannot be accepted for execution * @throws NullPointerException if {@code command} is null */ public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
其中注释也很明确的分为3步:
1.如果少于corePoolSize的Worker在执行任务,那么就创建任务
2.否则检查Executor是否在执行然后将任务加入workQueue,其中进行了双重检查,如果Executor此时不在Running状态,那么进行回滚。
3.如果不能加进workQueue那么新建线程,不能的话就拒绝任务。
可以看出addWorker是关键操作,那么addWorker到底做了些什么呢?
/** * Checks if a new worker can be added with respect to current * pool state and the given bound (either core or maximum). If so, * the worker count is adjusted accordingly, and, if possible, a * new worker is created and started, running firstTask as its * first task. This method returns false if the pool is stopped or * eligible to shut down. It also returns false if the thread * factory fails to create a thread when asked. If the thread * creation fails, either due to the thread factory returning * null, or due to an exception (typically OutOfMemoryError in * Thread.start()), we roll back cleanly. * * @param firstTask the task the new thread should run first (or * null if none). Workers are created with an initial first task * (in method execute()) to bypass queuing when there are fewer * than corePoolSize threads (in which case we always start one), * or when the queue is full (in which case we must bypass queue). * Initially idle threads are usually created via * prestartCoreThread or to replace other dying workers. * * @param core if true use corePoolSize as bound, else * maximumPoolSize. (A boolean indicator is used here rather than a * value to ensure reads of fresh values after checking other pool * state). * @return true if successful */ private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }这里需要明白一点即Executor的运行状态
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));这里用AtomicInteger来表征运行状态信息,Interger为32位数,其中前3位表示运行状态,后29表示所有的Worker数目
private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; // Packing and unpacking ctl private static int runStateOf(int c) { return c & ~CAPACITY; } private static int workerCountOf(int c) { return c & CAPACITY; } private static int ctlOf(int rs, int wc) { return rs | wc; }
COUNT_BITS表示位移位数 29
runStateOf(int c) 返回前3位数的值,运行状态总共有5种:RUNNING,SHUTDOWN,STOP,TIDYING,TERMINATED.
RUNNING:(0b111.....):接受任务并且处理workQueue中的任务
SHUTDOWN(0b000.....):不接受任务但是处理workQueue中的任务
STOP(0b001......):不接受任务不处理workQueue中的任务,同时中断执行中的任务
TIDYING(0b010.....):所有任务终止,workCount=0,此时会调用terminated() hook
TERMINATED(0b011.....):terminated()结束。
retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } }
addWorker中这一部分
if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null && ! workQueue.isEmpty())) return false;rs>=SHUTDOWN即非RUNNING状态,之前也看到了各种状态的二进制形式只有RUNNING<0;
并且rs状态不为SHUTDOWN或者firstTask不为null或者workQueue为空的情况下失败。
如果Worker数已达到上限则失败
否则增加Worker计数,也就是允许创建新的Worker来执行任务。
这里不得不提到一个关键的内部类Worker,Worker类如同名字一样是对执行线程任务的封装。
Worker实现了Runnable接口,也就是是实现了run()方法。
Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); }这里将任务传递给Worker,同时产生包裹线程。
值得一提的是,通过BlockingQueue来控制addWorker().
相关文章推荐
- Java基础第十一天_集合
- 【转】增加eclipse的运行内存 -- 不错!!
- [转]服务端工程师入门与进阶 Java 版
- java命令行编译和运行引用jar包的文件
- 在eclipse导入Java 的jar包的方法 JDBC
- 使用Java实现简单串口通信
- Spring属性编辑器
- JavaSE技术概述
- Struts2与easyui分页查询
- java对数据库中数据表的增删改查(基于MySQL数据库)
- 在Eclipse中导入jar文件
- Java 模板模式
- 海谈湖扯Java适配器模式
- java 接口练习题3
- java 之异步套接字编程实例(AIO)
- Spring 源码解析之HandlerAdapter源码解析(三)
- java接口练习题2
- Eclipse下导入外部jar包的3种方式
- 用java实现新浪爬虫,代码完整剖析(仅针对当前SinaSignOn有效)
- Spark Streaming源码解读之Executor容错安全性