您的位置:首页 > 编程语言 > Java开发

我之见--java 多线程 线程池ThreadPoolExecutor源码分析

2014-08-24 00:28 495 查看
前面我们对线程池使用已经有了一个很深的了解,什么是线程池?线程池有什么作用?线程池就是管理一组同构工作线程的资源池,线程池作用主要有两个方面,线程的管理,重复利用线程,避免重复创建线程。

我们先来看一下线程池相关的类图:



我们可以叫Executor做执行器,主要是execute(Runnable command)方法执行任务。

ExecutorService我们可以叫管理器,主要管理的生命周期,是不关闭,是否可以提交任务。

AbstractExecutorService 主要是对提交Callable进行封装成Runable。

ThreadPoolExecutor才是线程池的真正实现,下面我们来看一下源码的实现。

首先,我们看到内部类Worker的实现,Worker也就是我们经常说到的工作线程,每个Worker代表一个线程(相当于Thread);

private final class Worker extends AbstractQueuedSynchronizer


Worker实现AQS我们几乎可以猜到它是线程安全的,而且会使用内部锁

public void lock()        { acquire(1); }
public boolean tryLock()  { return tryAcquire(1); }
public void unlock()      { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
果然和我们猜的一样。

Worker主要的成员变成 Runnable firstTask也是线程要执行的任务。

下面我们来看一下线程池的构造函数:

public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
在这里我只是大概说一下参数是什么意思!具体更详细可以参考

线程池使用

corePoolSize:工作线程数。

maximumPoolSize:支持最大线程数。

keepAliveTime和TimeUnit unit 线程空闲后设置多少时间开始回收线程。

workQueue:当达到工作线程数之后保存任务的队列。

threadFactory:产生线程的工厂。

handler:异常处理。

下面我们直接从执行方法execute (Runnable command) 开始分析:

int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}


ctl是当前工作线程数,如果当前线程数没有达到corePoolSize则添加工作线程;

我们接着看addWorker方法,

retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;

for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get();  // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
第一个for主要是检测是已经处于关闭状态。

第二个for 里面core如果为真:表示工作线程没有达到最大运行线程corePoolSize 假则是与maximumPoolSize比较,如果比较它们大,也就是说达到最大线程数,直接返回添加失败。如果没有则通过 break retry跳出两个for。如果能过cas检测发现有别的线程改变,则继续检测。

下面继续看addWorker下面的代码

Worker w = new Worker(firstTask);
Thread t = w.thread;

final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int c = ctl.get();
int rs = runStateOf(c);

if (t == null ||
(rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null))) {
decrementWorkerCount();
tryTerminate();
return false;
}

workers.add(w);

int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
} finally {
mainLock.unlock();
}

t.start();
// It is possible (but unlikely) for a thread to have been
// added to workers, but not yet started, during transition to
// STOP, which could result in a rare missed interrupt,
// because Thread.interrupt is not guaranteed to have any effect
// on a non-yet-started Thread (see Thread#interrupt).
if (runStateOf(ctl.get()) == STOP && ! t.isInterrupted())
t.interrupt();

return true;
1. Worker w = new Worker(firstTask);创建工作线程。

2. 获取锁,检测线程池是否已经关闭,如果没有则添加到工作队列中。

3. 工作线程开始执行t.start();

总结:到这里当线程没有达到工作线程数的时候,添加工作线程的过程我们分析完了。

我们接看下面的代码:

if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);//<span style="background-color: rgb(255, 255, 255);"><span style="color:#ff0000;"> 1</span></span>
else if (workerCountOf(recheck) == 0)
addWorker(null, false);// <span style="color:#ff0000;">2</span>
}
else if (!addWorker(command, false)) //<span style="color:#ff0000;">4</span>
reject(command);// <span style="color:#ff0000;">3</span>
isRunning(c)方法表示线程池是否处于关闭状态。

检测如果没有关闭,则添加到阻塞队列,然后再次检测如果没有最大线程数,则添加工作线程 (上面2),如果发现线程池此时进入关闭,则进入异常处理 (上面1) ,如果第一次检测没有关闭,同时添加到阻塞队列失败(原因可以是队列满了)。则尝试添加新的工作线程,注意这里的第一个参数是false也则是它比较的maximumPoolSize这个线程数,如果也没有办法添加则调用异常处理。

下面我们看一下异常处理:

handler.rejectedExecution(command, this);主要调用rejectedExecution这里有可能不会抛出异常,取决你的异常处理策略。有可能会尝试添加到队列 。

下面我们来看一下带返回值的添加过程会有一些不同。

if (task == null) throw new NullPointerException();

RunnableFuture<Void> ftask = newTaskFor(task, null);

execute(ftask);

return ftask;

RunnableFuture是实现Runable接口和Future接口的类,所以它也是一个Runable,这里添加的时候只是封装成RunnableFuture,然后execute和上面的流程一样了,同时返回Future,以便获取执行的返回值。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: