Java线程池
2017-06-07 00:00
190 查看
摘要: 本文主要从源码的方式介绍一下线程池ThreadPoolExecutor,梳理了核心实现和流程,并且介绍了工厂类Executors构造几个ThreadPoolExecutor的不同和注意事项。
先来看一下线程池的5中状态:
RUNNING,SHUTDOWN,STOP,TIDYING,TERMINATED,注意这是线程池的状态不要和线程的状态弄混淆了。
RUNNING状态就是能够接受新的任务和处理队列中的任务
RUNNING状态可以通过显示调用shutdownNow()或者在finalize中隐式调用shutdownNow()方法到SHUTDOWN
SHUTDOWN
SHUTDOWN状态是不接受新的任务,但是会继续处理队列中的任务。RUNNING或者SHUTDOWN状态的时候可以通过调用shutdownNow()直接到STOP状态。当任务队列和线程池都为空的时候就到TIDYING状态。
STOP
STOP状态是及不接受新的任务,也不处理队列中的任务,并且会中断正在处理的任务。当线程池为空的时候就转换到TIDYING状态。
TIDYING
TIDYING状态是所有的任务都被终止了,工作线程为0,并且正在执行terminated()钩子方法的时候是TIDYING状态。
TERMINATED
TERMINATED状态是terminated()方法已经完成的时候。
知道了上面部分的知识,就可以来看一下部分的ThreadPoolExecutor代码了:
最开始我看上面的代码是被弄得一愣一愣的哦,什么鬼,和一般的位移操作怎么不一样。后来仔细的阅读了一下文档才发现问题。ctl这个变量是packing了2个东西,一个是线程池状态(runState),一个是工作线程的数量(workCount)。大神的思维跳跃太快,不看一下文档完全跟不上啊。下面来简单分析一下啊。
首先上面已经总结了线程池的状态有5种,4<5<8至少需要3位才能表示,所以就用3位表示吧。下面的代码就好理解了:
就是线程池的状态用高3位表示。在看一下:
CAPACITY的二进制表示为00011111 11111111 11111111 11111111
现在下面的代码也容易理解了:
runStateOf和workerCountOf的参数都是ctl就是状态和数量组合的整形量。先看runStateOf方法。
CAPACITY是:
00011111 11111111 11111111 11111111
~CAPACITY是:
11100000 00000000 00000000 00000000
和~CAPACITY &就是把高3位取出来嘛。
同理和CAPACITY &就是把第29为取出来。
ctlOf方法就是使用|把runState和workCount组合起来。
workQueue是用于持有任务的队列
HashSet<Worker> workers
workers是包含线程池中所有工作线程worker的集合
volatile int corePoolSize
核心线程池大小,保持存活的工作线程的最小数目,当小于corePoolSize时,会直接启动新的一个线程来处理任务,而不管线程池中是否有空闲线程
volatile int maximumPoolSize
线程池最大大小,也就是线程池中线程的最大数量
volatile long keepAliveTime
空闲线程等待工作的超时时间,即空闲线程存活时间
ThreadFactory threadFactory
线程工厂,通过Runnable创建Thread的工厂类
RejectedExecutionHandler handler
拒绝策略,加入线程池失败的时候的拒绝策略
在执行一个任务的时候主要分3步:
如果工作线程数量(workCount)还没有达到corePoolSize,那么就启动一个先的线程来执行新提交的任务。
否则,检查线程池的状态和把任务加入到任务队列中,如果线程池还在运行并且成功把任务加入到了任务队列中,再一次检查线程池是否还在运行。
如果不能把任务加入到队列中,那么有2中情况,一是:线程池关闭了,二是:任务队列满了。直接执行拒绝策略方法reject(command)
addWorker方法比较复杂,其实就做了一件事情,就是检查线程池是否关闭,线程数量有没有超过最大的容量,有没有超过corePoolSize,maximumPoolSize等相关条件,如果所用条件检查通过就创建一个Worker,并且启动这个Worker。
我们再一次通过ThreadPoolExecutor的execute简化流程来过一下各个参数的含义。在线程池(ThreadPoolExecutor)执行一个任务(Runnable)的时候,会先检查线程池的状态是否关闭,运行的线程的个数(workCount)是否小于corePoolSize,如果小于使用threadFactory来新创建一个线程来执行任务。如果大于corePoolSize则将任务(Runnable)加入到任务队列中,如果加入阻塞队列失败,则重新尝试把任务使用addWork来执行,主要是因为如果运行线程数小于maximumPoolSize,任务还是有机会被执行。
newFixedThreadPool是创建固定的线程来执行任务,从创建的的参数可以看到corePoolSize和maximumPoolSize一样大,并且使用的是一个无界阻塞队列LinkedBlockingQueue。新的任务如果超过了nThreads,那么就会被加入到无界阻塞队列中,因为是无界的,所以一定会被存在队列中,如果出现异常情况,因为corePoolSize和maximumPoolSize是一样大的,所以也没有机会开先的线程执行。
这个工厂方法和上一个一样,只是指定了创建新线程的工厂类。
newSingleThreadExecutor是newFixedThreadPool一个特例,只有一个线程来执行任务。
这个方法和上一个方法一样,不过指定了创建线程时候的工厂类。
再看newCachedThreadPool方法之前,我们先看一下SynchronousQueue。SynchronousQueue也是阻塞队列,不过队列没有任何内部容量,所以SynchronousQueue队列是获取数据和添加数据必须是配对的,前面的源码我们看到ThreadPoolExecutor使用的是offer的非阻塞方法,所以每一个任务都会创建一个新的线程来执行。
现在来看newCachedThreadPool方法,corePoolSize是0,maximumPoolSize是最大整数,线程的存活时间是60秒。也就是说使用newCachedThreadPool构造的线程池是这样一个线程池:每一个任务来要么加入SynchronousQueue队列中,要么创建一个线程来执行这个任务。没有任务的线程会存活60秒。
这个方法和上一个方法一样,只是指定了创建线程的工厂类。
Worker是ThreadPoolExecutor的成员内部类,Worker继承了AbstractQueuedSynchronizer自带锁特效,实现了Runnable接口。我们看到在Worker关联了一个Thread,这个就是执行Worker的Thread,在Worker的构造函数中我们看到创建一个Worker的时候就会创建一个Thread来执行Worker自身。Worker的run方法是调用的外部类ThreadPoolExecutor的runWorker方法。下面就来看一下ThreadPoolExecutor的runWorker方法。
runWorker就是线程复用的关键,runWorker是只在Worker的run方法中调用的,前面已经说了Worker关联了一个Thread来执行Worker自身,runWorker就是先执行Worker本身的任务firstTask,然后就不断的从任务队列中获取任务来执行,看while (task != null || (task = getTask()) != null)这条语句,getTask()是从任务队列中获取任务。
注意:把线程(Thread)和任务(Runnable)分开
线程池执行任务的流程是抽象到了ThreadPoolExecutor的execute方法中,主要可以抽象为3步:
(1) 如果运行线程(workCount)小于corePoolSize,则创建新Worker来执行任务(一个Worker一个线程)
(2) 如果运行线程(workCount)大于corePoolSize,则加入任务队列
(3) 如果添加到任务队列失败,则还是尝试添加一个Worker,如果运行线程(workCount)小于maximumPoolSize则有机会创建一个新的Worker来执行任务,否则失败
(4) 如果第(3)步失败,则执行拒绝任务策略
下面是一个简化版的流程图
简介
说Java的线程池,我们先不上Executors这个工厂类啊,ThreadPoolExecutor这个才是主食,Executors只是饭后甜点。先来看一下线程池的5中状态:
RUNNING,SHUTDOWN,STOP,TIDYING,TERMINATED,注意这是线程池的状态不要和线程的状态弄混淆了。
线程池状态
RUNNINGRUNNING状态就是能够接受新的任务和处理队列中的任务
RUNNING状态可以通过显示调用shutdownNow()或者在finalize中隐式调用shutdownNow()方法到SHUTDOWN
SHUTDOWN
SHUTDOWN状态是不接受新的任务,但是会继续处理队列中的任务。RUNNING或者SHUTDOWN状态的时候可以通过调用shutdownNow()直接到STOP状态。当任务队列和线程池都为空的时候就到TIDYING状态。
STOP
STOP状态是及不接受新的任务,也不处理队列中的任务,并且会中断正在处理的任务。当线程池为空的时候就转换到TIDYING状态。
TIDYING
TIDYING状态是所有的任务都被终止了,工作线程为0,并且正在执行terminated()钩子方法的时候是TIDYING状态。
TERMINATED
TERMINATED状态是terminated()方法已经完成的时候。
知道了上面部分的知识,就可以来看一下部分的ThreadPoolExecutor代码了:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; // Packing and unpacking ctl private static int runStateOf(int c) { return c & ~CAPACITY; } private static int workerCountOf(int c) { return c & CAPACITY; } private static int ctlOf(int rs, int wc) { return rs | wc; }
最开始我看上面的代码是被弄得一愣一愣的哦,什么鬼,和一般的位移操作怎么不一样。后来仔细的阅读了一下文档才发现问题。ctl这个变量是packing了2个东西,一个是线程池状态(runState),一个是工作线程的数量(workCount)。大神的思维跳跃太快,不看一下文档完全跟不上啊。下面来简单分析一下啊。
首先上面已经总结了线程池的状态有5种,4<5<8至少需要3位才能表示,所以就用3位表示吧。下面的代码就好理解了:
private static final int COUNT_BITS = Integer.SIZE - 3; private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS;
就是线程池的状态用高3位表示。在看一下:
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
CAPACITY的二进制表示为00011111 11111111 11111111 11111111
现在下面的代码也容易理解了:
private static int runStateOf(int c) { return c & ~CAPACITY; } private static int workerCountOf(int c) { return c & CAPACITY; } private static int ctlOf(int rs, int wc) { return rs | wc; }
runStateOf和workerCountOf的参数都是ctl就是状态和数量组合的整形量。先看runStateOf方法。
CAPACITY是:
00011111 11111111 11111111 11111111
~CAPACITY是:
11100000 00000000 00000000 00000000
和~CAPACITY &就是把高3位取出来嘛。
同理和CAPACITY &就是把第29为取出来。
ctlOf方法就是使用|把runState和workCount组合起来。
线程池中几个重要的成员
BlockingQueue<Runnable> workQueueworkQueue是用于持有任务的队列
HashSet<Worker> workers
workers是包含线程池中所有工作线程worker的集合
volatile int corePoolSize
核心线程池大小,保持存活的工作线程的最小数目,当小于corePoolSize时,会直接启动新的一个线程来处理任务,而不管线程池中是否有空闲线程
volatile int maximumPoolSize
线程池最大大小,也就是线程池中线程的最大数量
volatile long keepAliveTime
空闲线程等待工作的超时时间,即空闲线程存活时间
ThreadFactory threadFactory
线程工厂,通过Runnable创建Thread的工厂类
RejectedExecutionHandler handler
拒绝策略,加入线程池失败的时候的拒绝策略
execute方法
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); //重新检查,线程池状态可能被改变 if (! isRunning(recheck) && remove(command)) reject(command); //保证至少有一个Worker可以执行任务 else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
在执行一个任务的时候主要分3步:
如果工作线程数量(workCount)还没有达到corePoolSize,那么就启动一个先的线程来执行新提交的任务。
否则,检查线程池的状态和把任务加入到任务队列中,如果线程池还在运行并且成功把任务加入到了任务队列中,再一次检查线程池是否还在运行。
如果不能把任务加入到队列中,那么有2中情况,一是:线程池关闭了,二是:任务队列满了。直接执行拒绝策略方法reject(command)
addWorker
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { final ReentrantLock mainLock = this.mainLock; w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { mainLock.lock(); try { int c = ctl.get(); int rs = runStateOf(c); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
addWorker方法比较复杂,其实就做了一件事情,就是检查线程池是否关闭,线程数量有没有超过最大的容量,有没有超过corePoolSize,maximumPoolSize等相关条件,如果所用条件检查通过就创建一个Worker,并且启动这个Worker。
Executors
JUC的Executors是一个工厂类,这里主要看一下构造ThreadPoolExecutor的几个工厂方法,先看一下完整的ThreadPoolExecutor的构造方法:public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
我们再一次通过ThreadPoolExecutor的execute简化流程来过一下各个参数的含义。在线程池(ThreadPoolExecutor)执行一个任务(Runnable)的时候,会先检查线程池的状态是否关闭,运行的线程的个数(workCount)是否小于corePoolSize,如果小于使用threadFactory来新创建一个线程来执行任务。如果大于corePoolSize则将任务(Runnable)加入到任务队列中,如果加入阻塞队列失败,则重新尝试把任务使用addWork来执行,主要是因为如果运行线程数小于maximumPoolSize,任务还是有机会被执行。
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
newFixedThreadPool是创建固定的线程来执行任务,从创建的的参数可以看到corePoolSize和maximumPoolSize一样大,并且使用的是一个无界阻塞队列LinkedBlockingQueue。新的任务如果超过了nThreads,那么就会被加入到无界阻塞队列中,因为是无界的,所以一定会被存在队列中,如果出现异常情况,因为corePoolSize和maximumPoolSize是一样大的,所以也没有机会开先的线程执行。
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory); }
这个工厂方法和上一个一样,只是指定了创建新线程的工厂类。
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
newSingleThreadExecutor是newFixedThreadPool一个特例,只有一个线程来执行任务。
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory)); }
这个方法和上一个方法一样,不过指定了创建线程时候的工厂类。
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
再看newCachedThreadPool方法之前,我们先看一下SynchronousQueue。SynchronousQueue也是阻塞队列,不过队列没有任何内部容量,所以SynchronousQueue队列是获取数据和添加数据必须是配对的,前面的源码我们看到ThreadPoolExecutor使用的是offer的非阻塞方法,所以每一个任务都会创建一个新的线程来执行。
现在来看newCachedThreadPool方法,corePoolSize是0,maximumPoolSize是最大整数,线程的存活时间是60秒。也就是说使用newCachedThreadPool构造的线程池是这样一个线程池:每一个任务来要么加入SynchronousQueue队列中,要么创建一个线程来执行这个任务。没有任务的线程会存活60秒。
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), threadFactory); }
这个方法和上一个方法一样,只是指定了创建线程的工厂类。
线程池的池体现在什么地方
我们先看Worker类:private final class Worker extends AbstractQueuedSynchronizer implements Runnable { private static final long serialVersionUID = 6138294804551838833L; final Thread thread; Runnable firstTask; volatile long completedTasks; Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } public void run() { runWorker(this); } protected boolean isHeldExclusively() { return getState() != 0; } protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }
Worker是ThreadPoolExecutor的成员内部类,Worker继承了AbstractQueuedSynchronizer自带锁特效,实现了Runnable接口。我们看到在Worker关联了一个Thread,这个就是执行Worker的Thread,在Worker的构造函数中我们看到创建一个Worker的时候就会创建一个Thread来执行Worker自身。Worker的run方法是调用的外部类ThreadPoolExecutor的runWorker方法。下面就来看一下ThreadPoolExecutor的runWorker方法。
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); //如果线程池停止了,或者当前线程被中断并且线程停止了并且Worker的线程没有被中断,就中断Worker的线程 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null;//执行完将任务置空,下一次从任务队列中获取任务 w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
runWorker就是线程复用的关键,runWorker是只在Worker的run方法中调用的,前面已经说了Worker关联了一个Thread来执行Worker自身,runWorker就是先执行Worker本身的任务firstTask,然后就不断的从任务队列中获取任务来执行,看while (task != null || (task = getTask()) != null)这条语句,getTask()是从任务队列中获取任务。
注意:把线程(Thread)和任务(Runnable)分开
总结
Worker实现了Runnable接口,并且组合了一个Thread,Thread是在Worker创建的时候使用ThreadPoolExecutor的线程工厂创建的,它就是为了执行Worker,Worker的run方法调用的是ThreadPoolExecutor的runWorker方法,这个方法就是先执行完Worker任务,然后不断从任务队列中获取任务执行,实现线程复用。线程池执行任务的流程是抽象到了ThreadPoolExecutor的execute方法中,主要可以抽象为3步:
(1) 如果运行线程(workCount)小于corePoolSize,则创建新Worker来执行任务(一个Worker一个线程)
(2) 如果运行线程(workCount)大于corePoolSize,则加入任务队列
(3) 如果添加到任务队列失败,则还是尝试添加一个Worker,如果运行线程(workCount)小于maximumPoolSize则有机会创建一个新的Worker来执行任务,否则失败
(4) 如果第(3)步失败,则执行拒绝任务策略
下面是一个简化版的流程图
相关文章推荐
- JAVA线程池的简单实现及优先级设置
- java线程池的例子
- JAVA线程池原理以及几种线程池类型介绍
- java线程池
- 自己动手实现JAVA线程池
- Java线程池的原理
- java线程池中的shutdown()与shutdownNow()
- Java线程池
- JAVA线程池例子
- Java线程池的原理及几类线程池的介绍
- Java线程池使用说明
- Java线程池使用说明 (转)
- java线程池和队列分析
- Java线程池使用说明
- java线程池的使用
- java线程池
- java线程池的原理学习
- java线程池学习(四) —— Executors类
- Java线程池使用说明
- java线程池(获取亚马逊商品页面数据)