java.util.concurrent.ThreadPoolExecutor实现机制简介
2016-06-29 12:49
369 查看
java.util.concurrent.ThreadPoolExecutor实现机制简介
前言
最近面试的时候被问到『如何实现一个线程池』的问题。当时没回答上来。今天看了一下JDK源码,大概了解了一下java.util.concurrent.ThreadPoolExecutor的实现方式,整理如下。
思路
根据javadoc中关于ThreadPoolExecutor类的描述可知。ThreadPoolExecutor的实现主要依靠两个数据结构:线程池
任务队列
任务队列使用的数据结构比较容易想到,可以采用实现了java.util.concurrent.BlockingQueue接口的类。
线程池该怎么实现才能让线程池里的线程持续执行一个接一个的任务呢?
我们来看一下JDK里是怎么实现的吧。
线程池
public class ThreadPoolExecutor extends AbstractExecutorService { ... /** * Set containing all worker threads in pool. Accessed only when * holding mainLock. */ private final HashSet<Worker> workers = new HashSet<Worker>(); ... /** * The queue used for holding tasks and handing off to worker * threads. We do not require that workQueue.poll() returning * null necessarily means that workQueue.isEmpty(), so rely * solely on isEmpty to see if the queue is empty (which we must * do for example when deciding whether to transition from * SHUTDOWN to TIDYING). This accommodates special-purpose * queues such as DelayQueues for which poll() is allowed to * return null even if it may later return non-null when delays * expire. */ private final BlockingQueue<Runnable> workQueue; ... }
如代码中的注释所说,workers就是存放工作线程的线程池,就是一个简单的HashSet。那么,关键信息一定是藏在这个Worker类里了。
Worker类
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { ... /** Thread this worker is running in. Null if factory fails. */ final Thread thread; /** Initial task to run. Possibly null. */ Runnable firstTask; ... /** * Creates with given first task and thread from ThreadFactory. * @param firstTask the first task (null if none) */ Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } /** Delegates main run loop to outer runWorker */ public void run() { runWorker(this); } ... }
Worker是ThreadPoolExecutor的内部类,成员变量thread就是实际执行任务的线程。这个thread不直接执行用户提交的任务,它执行的任务就是它所在的Worker对象。
Worker对象的run()方法调用了ThreadPoolExecutor.runWorker(Worker w)方法。
runWorker(Worker w)
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
程序的大致逻辑就是在firstTask或getTask()返回方法不为空的情况下执行task.run()。这里的getTask()方法就是从用户任务队列workQueue获取任务的那个方法。
Worker类的执行逻辑大致就是这样了。那么ThreadPoolExecutor是如何新建和启动这些Worker类的呢?
来看一下我们提交任务时使用的ThreadPoolExecutor.execute(Runnable command)方法。
execute(Runnable command)
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
除去处理ThreadPoolExecutor对象状态的代码,最关键的两段代码就是workQueue.offer(command)和addWorker(command, true)。
workQueue.offer(command)是将任务加入队列。
新建和启动Worker对象的代码就是在addWorker(command, true)里了。
addWorker(Runnable firstTask, boolean core)
private boolean addWorker(Runnable firstTask, boolean core) { ... boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
这个方法里执行了三个我们关注的操作:
新建Worker对象——w = new Worker(firstTask);
将Worker对象加入workers集合——workers.add(w);
启动Worker对象里的thread——t.start();
总结
简单概括一下ThreadPoolExecutor的运行过程(不包括线程池大小控制、线程池关闭等逻辑):ThreadPoolExecutor.execute(Runnable command)提交任务
如果Worker数量未达到上限,新建一个Worker并将command作为Worker的firstTask
如果Worker数量已达到上限,则将command放入workQueue
每个启动了的worker先执行firstTask,然后继续从workQueue获取task来执行
相关文章推荐
- JDK动态代理VS CgLib
- Ubuntu 安装 JDK 问题
- C#多线程学习之(四)使用线程池进行多线程的自动管理
- c++线程池实现方法
- C语言实现支持动态拓展和销毁的线程池
- c++实现简单的线程池
- Java线程池的几种实现方法和区别介绍
- 深入java线程池的使用详解
- jdk与jre的区别 很形象,很清晰,通俗易懂
- jdk中String类设计成final的原由
- win7下安装 JDK 基本流程
- java中通用的线程池实例代码
- jdk环境变量配置
- win2003 jsp运行环境架设心得(jdk+tomcat)
- windows linux jdk安装配置方法
- Java编程之jdk1.4,jdk1.5和jdk1.6的区别分析(经典)
- Java编程中线程池的基本概念和使用
- 详解JDK 5 Annotation 注解之@Target的用法介绍
- C#线程处理系列之线程池中的I/O线程
- C#线程池操作方法