Java 线程池源码
2016-05-13 11:58
423 查看
ThreadPoolExecutor参数
corePoolSize核心线程数,核心线程会一直存活,即使没有任务需要处理。当线程数小于核心线程数时,即使现有的线程空闲,线程池也会优先创建新线程来处理任务,而不是直接交给现有的线程处理。
核心线程在allowCoreThreadTimeout被设置为true时会超时退出,默认情况下不会退出。
maxPoolSize
当线程数大于或等于核心线程,且任务队列已满时,线程池会创建新的线程,直到线程数量达到maxPoolSize。如果线程数已等于maxPoolSize,且任务队列已满,则已超出线程池的处理能力,线程池会拒绝处理任务而抛出异常。
keepAliveTime
当线程空闲时间达到keepAliveTime,该线程会退出,直到线程数量等于corePoolSize。如果allowCoreThreadTimeout设置为true,则所有线程均会退出直到线程数量为0。
allowCoreThreadTimeout
是否允许核心线程空闲退出,默认值为false。
queueCapacity
任务队列容量。从maxPoolSize的描述上可以看出,任务队列的容量会影响到线程的变化,因此任务队列的长度也需要恰当的设置。
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class ExecutorDemo { public static void main(String[] args) { ExecutorService executors = Executors.newCachedThreadPool(); executors.execute(new Runnable() { @Override public void run() { System.out.println("hello"); } }); } }
线程池添加线程:
当线程数小于核心线程数时,创建线程。当线程数大于等于核心线程数,且任务队列未满时,将任务放入任务队列。
当线程数大于等于核心线程数,且任务队列已满
若线程数小于最大线程数,创建线程
若线程数等于最大线程数,抛出异常,拒绝任务
Executor
public interface Executor { void execute(Runnable command); }
ExecutorService
public interface ExecutorService extends Executor { void shutdown(); List<Runnable> shutdownNow(); boolean isShutdown(); boolean isTerminated(); boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; <T> Future<T> submit(Callable<T> task); <T> Future<T> submit(Runnable task, T result); Future<?> submit(Runnable task); <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException; <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException; <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException; <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
Executors
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
SynchronousQueue
/** * Creates a <tt>SynchronousQueue</tt> with nonfair access policy. * 不公平锁 */ public SynchronousQueue() { this(false); }
ThreadPoolExecutor
ctl:是一个原子整数包装两个概念字段workerCount、指示的有效数量的线程runState,指示是否运行,关闭等。public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); //能否创建核心线程 if (workerCountOf(c) < corePoolSize) { //创建核心线程并且开启线程任务 if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); //任务队列还未满,进入任务等待队列 else if (workerCountOf(recheck) == 0) addWorker(null, false); } //任务队列已满,创建普通用户线程 else if (!addWorker(command, false)) reject(command); } private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); //是否使用核心线程,还是普通的用户线程 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; //cas失败重试:将ctl的标志重置 if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { final ReentrantLock mainLock = this.mainLock; w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { //线程池的主锁 mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int c = ctl.get(); int rs = runStateOf(c); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); //加入hashset数组中 workers.add(w); int s = workers.size(); //重新计算大小 if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { //添加失败 if (! workerStarted) addWorkerFailed(w); } return workerStarted; } private void addWorkerFailed(Worker w) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (w != null) workers.remove(w); //修改ctl decrementWorkerCount(); tryTerminate(); } finally { mainLock.unlock(); } }
execute执行时: 1.调用workCountOf()方法获取当前线程池中线程数量,判断当前线程数量是否超过了核心线程数,若未超过,则直接添加一个核心线程,并用此线程完成当前提交的任务。 2.若当前线程数已经超过核心线程数且ctl状态等于RUNNING且工作成功进入工作等待队列,则我们进一步复查ctl,若ctl状态依旧为RUNNING,且调用workCountOf()方法检查发现此时的工作线程数为0时,将添加一个工作线程;若此时已经不为RUNNING,则尝试移除任务,并调用拒绝任务方法:reject(command)。(这里之所以需要复查ctl状态是由于在执行workQueue.offer(command)方法时,ctl状态随时可能由于调用shutDown方法或者shutDownNow方法而发生变化)。 3.如果上述两种情况都不吻合,即此时已经有超过核心线程数的的线程在工作,且任务队列也已堆满,则尝试增加一个工作线程(如果此时线程数达到限定最大线程数,则会失败),若失败则调用拒绝任务方法reject(command). 在execute的方法中添加工作线程的所调用的法为addWorker(Runnable runnable,Boolean core).该方法接受两个参数,runnable对象为这个新建线程的第一个工作任务,可以为空。core指代新建的任务是否是核心线程。
参考
1 当前所说的计算机 “有几个CPU”准确的说法是“CPU有几个核心”,任务管理器里面看到的是CPU的线程。即CPU物理上的个数为一个,但是有多个核心。 2 双核处理器即是基于单个半导体的一个处理器上拥有两个一样功能的处理器核心。换句话说,将两个物理处理器核心整合入一个核中。 3 核心与线程的关系: 核相当于公路 双核是2条3米的路 4核是4条3米的路 双核四线程是两条路中间又划了2根线分成4条1.5的路 当然 对于1.5米的小车来说 一次并排可以跑4辆(原来不划分3米的2条路也只能开2个小车) 但是3米的大车也只是跑2辆 但是4核就可以跑4辆大车 就是说4线程在处理小的并行数据时候多线程起作用 大的并行数据它的处理能力比不上真的4核
相关文章推荐
- Java 内存分配全面浅析
- struts2动态方法调用
- 【JAVA】Java语言入门
- Java类集框架
- Struts2核心工作原理解析
- eclipse中安装maven
- C3P0配置实战
- Java程序优化细节
- 【第九章】 Spring的事务 之 9.3 编程式事务 ——跟我学spring3
- JAVA 设置环境变量批处理
- java复合语句与条件语句
- java笔记(2)
- java中的批处理
- eclipse插件安装
- java读取文件方法总结
- JAVA WEB进阶一
- SpringMVC 使用poi导出excel简单小例子
- JAVA开发基本设置
- SpringMVC通过注解@Value获取properties配置
- 验证码的生成与验证,控制层工具