我之见--java 多线程 线程池ThreadPoolExecutor源码分析
2014-08-24 00:28
495 查看
前面我们对线程池使用已经有了一个很深的了解,什么是线程池?线程池有什么作用?线程池就是管理一组同构工作线程的资源池,线程池作用主要有两个方面,线程的管理,重复利用线程,避免重复创建线程。
我们先来看一下线程池相关的类图:
我们可以叫Executor做执行器,主要是execute(Runnable command)方法执行任务。
ExecutorService我们可以叫管理器,主要管理的生命周期,是不关闭,是否可以提交任务。
AbstractExecutorService 主要是对提交Callable进行封装成Runable。
ThreadPoolExecutor才是线程池的真正实现,下面我们来看一下源码的实现。
首先,我们看到内部类Worker的实现,Worker也就是我们经常说到的工作线程,每个Worker代表一个线程(相当于Thread);
Worker实现AQS我们几乎可以猜到它是线程安全的,而且会使用内部锁
Worker主要的成员变成 Runnable firstTask也是线程要执行的任务。
下面我们来看一下线程池的构造函数:
线程池使用
corePoolSize:工作线程数。
maximumPoolSize:支持最大线程数。
keepAliveTime和TimeUnit unit 线程空闲后设置多少时间开始回收线程。
workQueue:当达到工作线程数之后保存任务的队列。
threadFactory:产生线程的工厂。
handler:异常处理。
下面我们直接从执行方法execute (Runnable command) 开始分析:
ctl是当前工作线程数,如果当前线程数没有达到corePoolSize则添加工作线程;
我们接着看addWorker方法,
第二个for 里面core如果为真:表示工作线程没有达到最大运行线程corePoolSize 假则是与maximumPoolSize比较,如果比较它们大,也就是说达到最大线程数,直接返回添加失败。如果没有则通过 break retry跳出两个for。如果能过cas检测发现有别的线程改变,则继续检测。
下面继续看addWorker下面的代码
2. 获取锁,检测线程池是否已经关闭,如果没有则添加到工作队列中。
3. 工作线程开始执行t.start();
总结:到这里当线程没有达到工作线程数的时候,添加工作线程的过程我们分析完了。
我们接看下面的代码:
检测如果没有关闭,则添加到阻塞队列,然后再次检测如果没有最大线程数,则添加工作线程 (上面2),如果发现线程池此时进入关闭,则进入异常处理 (上面1) ,如果第一次检测没有关闭,同时添加到阻塞队列失败(原因可以是队列满了)。则尝试添加新的工作线程,注意这里的第一个参数是false也则是它比较的maximumPoolSize这个线程数,如果也没有办法添加则调用异常处理。
下面我们看一下异常处理:
handler.rejectedExecution(command, this);主要调用rejectedExecution这里有可能不会抛出异常,取决你的异常处理策略。有可能会尝试添加到队列 。
下面我们来看一下带返回值的添加过程会有一些不同。
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
RunnableFuture是实现Runable接口和Future接口的类,所以它也是一个Runable,这里添加的时候只是封装成RunnableFuture,然后execute和上面的流程一样了,同时返回Future,以便获取执行的返回值。
我们先来看一下线程池相关的类图:
我们可以叫Executor做执行器,主要是execute(Runnable command)方法执行任务。
ExecutorService我们可以叫管理器,主要管理的生命周期,是不关闭,是否可以提交任务。
AbstractExecutorService 主要是对提交Callable进行封装成Runable。
ThreadPoolExecutor才是线程池的真正实现,下面我们来看一下源码的实现。
首先,我们看到内部类Worker的实现,Worker也就是我们经常说到的工作线程,每个Worker代表一个线程(相当于Thread);
private final class Worker extends AbstractQueuedSynchronizer
Worker实现AQS我们几乎可以猜到它是线程安全的,而且会使用内部锁
public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); }果然和我们猜的一样。
Worker主要的成员变成 Runnable firstTask也是线程要执行的任务。
下面我们来看一下线程池的构造函数:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }在这里我只是大概说一下参数是什么意思!具体更详细可以参考
线程池使用
corePoolSize:工作线程数。
maximumPoolSize:支持最大线程数。
keepAliveTime和TimeUnit unit 线程空闲后设置多少时间开始回收线程。
workQueue:当达到工作线程数之后保存任务的队列。
threadFactory:产生线程的工厂。
handler:异常处理。
下面我们直接从执行方法execute (Runnable command) 开始分析:
int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); }
ctl是当前工作线程数,如果当前线程数没有达到corePoolSize则添加工作线程;
我们接着看addWorker方法,
retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } }第一个for主要是检测是已经处于关闭状态。
第二个for 里面core如果为真:表示工作线程没有达到最大运行线程corePoolSize 假则是与maximumPoolSize比较,如果比较它们大,也就是说达到最大线程数,直接返回添加失败。如果没有则通过 break retry跳出两个for。如果能过cas检测发现有别的线程改变,则继续检测。
下面继续看addWorker下面的代码
Worker w = new Worker(firstTask); Thread t = w.thread; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int c = ctl.get(); int rs = runStateOf(c); if (t == null || (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null))) { decrementWorkerCount(); tryTerminate(); return false; } workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; } finally { mainLock.unlock(); } t.start(); // It is possible (but unlikely) for a thread to have been // added to workers, but not yet started, during transition to // STOP, which could result in a rare missed interrupt, // because Thread.interrupt is not guaranteed to have any effect // on a non-yet-started Thread (see Thread#interrupt). if (runStateOf(ctl.get()) == STOP && ! t.isInterrupted()) t.interrupt(); return true;1. Worker w = new Worker(firstTask);创建工作线程。
2. 获取锁,检测线程池是否已经关闭,如果没有则添加到工作队列中。
3. 工作线程开始执行t.start();
总结:到这里当线程没有达到工作线程数的时候,添加工作线程的过程我们分析完了。
我们接看下面的代码:
if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command);//<span style="background-color: rgb(255, 255, 255);"><span style="color:#ff0000;"> 1</span></span> else if (workerCountOf(recheck) == 0) addWorker(null, false);// <span style="color:#ff0000;">2</span> } else if (!addWorker(command, false)) //<span style="color:#ff0000;">4</span> reject(command);// <span style="color:#ff0000;">3</span>isRunning(c)方法表示线程池是否处于关闭状态。
检测如果没有关闭,则添加到阻塞队列,然后再次检测如果没有最大线程数,则添加工作线程 (上面2),如果发现线程池此时进入关闭,则进入异常处理 (上面1) ,如果第一次检测没有关闭,同时添加到阻塞队列失败(原因可以是队列满了)。则尝试添加新的工作线程,注意这里的第一个参数是false也则是它比较的maximumPoolSize这个线程数,如果也没有办法添加则调用异常处理。
下面我们看一下异常处理:
handler.rejectedExecution(command, this);主要调用rejectedExecution这里有可能不会抛出异常,取决你的异常处理策略。有可能会尝试添加到队列 。
下面我们来看一下带返回值的添加过程会有一些不同。
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
RunnableFuture是实现Runable接口和Future接口的类,所以它也是一个Runable,这里添加的时候只是封装成RunnableFuture,然后execute和上面的流程一样了,同时返回Future,以便获取执行的返回值。
相关文章推荐
- Java多线程-五中线程池分析以及AnsyncTask源码分析
- Java多线程之线程池深入分析
- 我之见--java多线程 ConcurrentHashMap 源码分析
- Java ThreadPoolExecutor线程池原理及源码分析
- Java多线程 -- JUC包源码分析1 -- CAS/乐观锁
- java学习-【转】JAVA 线程池源码分析
- Java多线程之线程池深入分析 (下)
- java线程池学习(五) —— ThreadPoolExecutor源码分析
- Java线程池源码分析(一)
- Java多线程总结(6)— 线程池的基本使用和执行流程分析
- Java 多线程:分析线程池的实现原理
- Java多线程之线程池深入分析(上)
- java线程池框架源码分析
- Java 多线程:分析线程池的实现原理
- Java多线程(十一)之线程池深入分析(上)
- Java多线程实现源码分析
- Java 多线程及线程池理论分析
- Java并发包源码学习之线程池(一)ThreadPoolExecutor源码分析
- Java 容器源码分析之HashMap多线程并发问题分析
- JAVA线程池(ThreadPoolExecutor)源码分析