您的位置:首页 > 编程语言 > Java开发

Java线程池

2017-06-07 00:00 190 查看
摘要: 本文主要从源码的方式介绍一下线程池ThreadPoolExecutor,梳理了核心实现和流程,并且介绍了工厂类Executors构造几个ThreadPoolExecutor的不同和注意事项。

简介

说Java的线程池,我们先不上Executors这个工厂类啊,ThreadPoolExecutor这个才是主食,Executors只是饭后甜点。

先来看一下线程池的5中状态:
RUNNING,SHUTDOWN,STOP,TIDYING,TERMINATED,注意这是线程池的状态不要和线程的状态弄混淆了。

线程池状态

RUNNING
RUNNING状态就是能够接受新的任务和处理队列中的任务
RUNNING状态可以通过显示调用shutdownNow()或者在finalize中隐式调用shutdownNow()方法到SHUTDOWN

SHUTDOWN
SHUTDOWN状态是不接受新的任务,但是会继续处理队列中的任务。RUNNING或者SHUTDOWN状态的时候可以通过调用shutdownNow()直接到STOP状态。当任务队列和线程池都为空的时候就到TIDYING状态。

STOP
STOP状态是及不接受新的任务,也不处理队列中的任务,并且会中断正在处理的任务。当线程池为空的时候就转换到TIDYING状态。

TIDYING
TIDYING状态是所有的任务都被终止了,工作线程为0,并且正在执行terminated()钩子方法的时候是TIDYING状态。

TERMINATED
TERMINATED状态是terminated()方法已经完成的时候。

知道了上面部分的知识,就可以来看一下部分的ThreadPoolExecutor代码了:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

// Packing and unpacking ctl
private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

最开始我看上面的代码是被弄得一愣一愣的哦,什么鬼,和一般的位移操作怎么不一样。后来仔细的阅读了一下文档才发现问题。ctl这个变量是packing了2个东西,一个是线程池状态(runState),一个是工作线程的数量(workCount)。大神的思维跳跃太快,不看一下文档完全跟不上啊。下面来简单分析一下啊。

首先上面已经总结了线程池的状态有5种,4<5<8至少需要3位才能表示,所以就用3位表示吧。下面的代码就好理解了:

private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

就是线程池的状态用高3位表示。在看一下:

private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

CAPACITY的二进制表示为00011111 11111111 11111111 11111111
现在下面的代码也容易理解了:

private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

runStateOf和workerCountOf的参数都是ctl就是状态和数量组合的整形量。先看runStateOf方法。
CAPACITY是:
00011111 11111111 11111111 11111111
~CAPACITY是:
11100000 00000000 00000000 00000000
和~CAPACITY &就是把高3位取出来嘛。
同理和CAPACITY &就是把第29为取出来。
ctlOf方法就是使用|把runState和workCount组合起来。

线程池中几个重要的成员

BlockingQueue<Runnable> workQueue
workQueue是用于持有任务的队列

HashSet<Worker> workers
workers是包含线程池中所有工作线程worker的集合

volatile int corePoolSize
核心线程池大小,保持存活的工作线程的最小数目,当小于corePoolSize时,会直接启动新的一个线程来处理任务,而不管线程池中是否有空闲线程

volatile int maximumPoolSize
线程池最大大小,也就是线程池中线程的最大数量

volatile long keepAliveTime
空闲线程等待工作的超时时间,即空闲线程存活时间

ThreadFactory threadFactory
线程工厂,通过Runnable创建Thread的工厂类

RejectedExecutionHandler handler
拒绝策略,加入线程池失败的时候的拒绝策略

execute方法

public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//重新检查,线程池状态可能被改变
if (! isRunning(recheck) && remove(command))
reject(command);
//保证至少有一个Worker可以执行任务
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}

在执行一个任务的时候主要分3步:

如果工作线程数量(workCount)还没有达到corePoolSize,那么就启动一个先的线程来执行新提交的任务。

否则,检查线程池的状态和把任务加入到任务队列中,如果线程池还在运行并且成功把任务加入到了任务队列中,再一次检查线程池是否还在运行。

如果不能把任务加入到队列中,那么有2中情况,一是:线程池关闭了,二是:任务队列满了。直接执行拒绝策略方法reject(command)

addWorker

private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;

for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get();  // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
final ReentrantLock mainLock = this.mainLock;
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
mainLock.lock();
try {
int c = ctl.get();
int rs = runStateOf(c);

if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}

addWorker方法比较复杂,其实就做了一件事情,就是检查线程池是否关闭,线程数量有没有超过最大的容量,有没有超过corePoolSize,maximumPoolSize等相关条件,如果所用条件检查通过就创建一个Worker,并且启动这个Worker。

Executors

JUC的Executors是一个工厂类,这里主要看一下构造ThreadPoolExecutor的几个工厂方法,先看一下完整的ThreadPoolExecutor的构造方法:

public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

我们再一次通过ThreadPoolExecutor的execute简化流程来过一下各个参数的含义。在线程池(ThreadPoolExecutor)执行一个任务(Runnable)的时候,会先检查线程池的状态是否关闭,运行的线程的个数(workCount)是否小于corePoolSize,如果小于使用threadFactory来新创建一个线程来执行任务。如果大于corePoolSize则将任务(Runnable)加入到任务队列中,如果加入阻塞队列失败,则重新尝试把任务使用addWork来执行,主要是因为如果运行线程数小于maximumPoolSize,任务还是有机会被执行。

public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

newFixedThreadPool是创建固定的线程来执行任务,从创建的的参数可以看到corePoolSize和maximumPoolSize一样大,并且使用的是一个无界阻塞队列LinkedBlockingQueue。新的任务如果超过了nThreads,那么就会被加入到无界阻塞队列中,因为是无界的,所以一定会被存在队列中,如果出现异常情况,因为corePoolSize和maximumPoolSize是一样大的,所以也没有机会开先的线程执行。

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}

这个工厂方法和上一个一样,只是指定了创建新线程的工厂类。

public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

newSingleThreadExecutor是newFixedThreadPool一个特例,只有一个线程来执行任务。

public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}

这个方法和上一个方法一样,不过指定了创建线程时候的工厂类。

public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

再看newCachedThreadPool方法之前,我们先看一下SynchronousQueue。SynchronousQueue也是阻塞队列,不过队列没有任何内部容量,所以SynchronousQueue队列是获取数据和添加数据必须是配对的,前面的源码我们看到ThreadPoolExecutor使用的是offer的非阻塞方法,所以每一个任务都会创建一个新的线程来执行。

现在来看newCachedThreadPool方法,corePoolSize是0,maximumPoolSize是最大整数,线程的存活时间是60秒。也就是说使用newCachedThreadPool构造的线程池是这样一个线程池:每一个任务来要么加入SynchronousQueue队列中,要么创建一个线程来执行这个任务。没有任务的线程会存活60秒。

public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}

这个方法和上一个方法一样,只是指定了创建线程的工厂类。

线程池的池体现在什么地方

我们先看Worker类:

private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
private static final long serialVersionUID = 6138294804551838833L;

final Thread thread;
Runnable firstTask;
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}

protected boolean isHeldExclusively() {
return getState() != 0;
}

protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}

protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}

public void lock()        { acquire(1); }
public boolean tryLock()  { return tryAcquire(1); }
public void unlock()      { release(1); }
public boolean isLocked() { return isHeldExclusively(); }

void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}

Worker是ThreadPoolExecutor的成员内部类,Worker继承了AbstractQueuedSynchronizer自带锁特效,实现了Runnable接口。我们看到在Worker关联了一个Thread,这个就是执行Worker的Thread,在Worker的构造函数中我们看到创建一个Worker的时候就会创建一个Thread来执行Worker自身。Worker的run方法是调用的外部类ThreadPoolExecutor的runWorker方法。下面就来看一下ThreadPoolExecutor的runWorker方法。

final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
//如果线程池停止了,或者当前线程被中断并且线程停止了并且Worker的线程没有被中断,就中断Worker的线程
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;//执行完将任务置空,下一次从任务队列中获取任务
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}

runWorker就是线程复用的关键,runWorker是只在Worker的run方法中调用的,前面已经说了Worker关联了一个Thread来执行Worker自身,runWorker就是先执行Worker本身的任务firstTask,然后就不断的从任务队列中获取任务来执行,看while (task != null || (task = getTask()) != null)这条语句,getTask()是从任务队列中获取任务。

注意:把线程(Thread)和任务(Runnable)分开

总结

Worker实现了Runnable接口,并且组合了一个Thread,Thread是在Worker创建的时候使用ThreadPoolExecutor的线程工厂创建的,它就是为了执行Worker,Worker的run方法调用的是ThreadPoolExecutor的runWorker方法,这个方法就是先执行完Worker任务,然后不断从任务队列中获取任务执行,实现线程复用。

线程池执行任务的流程是抽象到了ThreadPoolExecutor的execute方法中,主要可以抽象为3步:
(1) 如果运行线程(workCount)小于corePoolSize,则创建新Worker来执行任务(一个Worker一个线程)
(2) 如果运行线程(workCount)大于corePoolSize,则加入任务队列
(3) 如果添加到任务队列失败,则还是尝试添加一个Worker,如果运行线程(workCount)小于maximumPoolSize则有机会创建一个新的Worker来执行任务,否则失败
(4) 如果第(3)步失败,则执行拒绝任务策略

下面是一个简化版的流程图

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息