您的位置:首页 > 编程语言 > Java开发

深入学习JDK 线程池(之四)

2014-03-09 14:31 369 查看
 

一、ThreadPoolExecutor类分析

       此类承载了CacheThreadPool,FixedThreadPool,SingleThreadExecutor三种线程池的创建和功能实现的任务。

       1、内部类:Worker,CallerRunsPolicy,AbortPolicy,DiscardPolicy,DiscardOldestPolicy,其中Worker作为私有内部类,承担了比较多的工作:接管线程池接收的Runnable对象的生命周期管理,调用线程池的回调方法,完成与线程池的通信。后面四个内部类,我在分析流程中还没有遇到,但也不能武断地对其下结论,所以等以后有机会做更深入的研究,再分析其设计价值吧。

       2、成员变量:先介绍几个用到的,其他的成员变量等用到时再作介绍吧。

             int runState 提供线程生命周期管理的状态值;ReentrantLock mainLock 对象锁,作用范围很广,在创建和回收线程时,都有用到;corePoolSize,maximumPoolSize,BlockingQueue等在前文已经介绍过了;HashSet<Worker> workers 线程池管理对象,它的容量决定线程池的容量,使用它之前,都必须先获得mainLock对象锁;

       3、主要方法分析,针对execute()和submit()

       ThreadPoolExecutor类中只有execute方法的实现,submit()方法的实现全在抽象类AbstractExecutorService中,看抽象类源码可知:

public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Object> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}

public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}

public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}


里面用到的适配器RunnableFuture,将Callable接口适配成Runnable接口,然后全部调用execute(Runnable)方法。

所以各位看到这儿,应该明白了,ThreadPoolExecutor类的execute(Runnable),是线程池线程执行的核心方法!咱们只需要掌握这一个方法,剩下的三个,自然迎刃而解了。

 

二、CachedThreadPool线程池流程分析

       真是千呼万唤始出来,前面都是铺垫,现在终于进入正题了。

       先看看源码:

public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
if (runState == RUNNING && workQueue.offer(command)) {
if (runState != RUNNING || poolSize == 0)
ensureQueuedTaskHandled(command);
}
else if (!addIfUnderMaximumPoolSize(command))
reject(command); // is shutdown or saturated
}
}


很简短的一段代码,不到十行,却包含了三种线程池的流程操作分支,只能说JDK源码,真的很值得学习。

我们先回顾下CachedThreadPool的创建参数:corePoolSize为0,maximumPoolSize为Integer.MAX_VALUE,BlockingQueue为SynchronousQueue<Runnable>实例。

由源码可知

1、poolSize >= corePoolSize永远为true,“||”操作符又带短路功能,所以CachedThreadPool线程池永远都不会执行addIfUnderCorePoolSize(command)这个方法。

2、workQueue.offer(command)在第一个线程任务时,返回的是false。这个是SynchronousQueue容器的特性,原文的描述为:you cannot insert an element unless thread is trying to remove it。具体可看JDK API描述,返回true的情况待会再说。

3、线程池第一个接收的任务,经过层层判断,最后是进入到addIfUnderMaximumPoolSize(command)方法中,咱们来看下该方法的源码:

private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {
Thread t = null;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (poolSize < maximumPoolSize && runState == RUNNING)
t = addThread(firstTask);
} finally {
mainLock.unlock();
}
if (t == null)
return false;
t.start();
return true;
}

调用的addThread()方法咱们也顺便看一下:

private Thread addThread(Runnable firstTask) {
Worker w = new Worker(firstTask);
Thread t = threadFactory.newThread(w);
if (t != null) {
w.thread = t;
workers.add(w);
int nt = ++poolSize;
if (nt > largestPoolSize)
largestPoolSize = nt;
}
return t;
}


 

咱们可以看出几处技术处理:

1)、maximumPoolSize的值为Integer.MAX_VALUE,poolSize < maximumPoolSize在绝大多数情况下是为true的,极端情况在现实系统中估计很难碰到,我们可以近似地认为:该判断永远返回true。

2)、addThread(firstTask)操作是加锁的,有同步锁控制。

3)、看addThread(firstTask)方法内部实现我们可以知道:该操作让线程池的容量增加了1。

4)、addThread(firstTask)是用内部类Worker来接收线程对象,即线程对象是交予Worker来管理的,再由ThreadFactory接收Worker创建一个Thread对象。

5)、线程池扩容完成后,返回线程对象(其实是Worker实例对象),调用了t.start(),启动了线程运行。

 

分析到这里,我们就可以证实之前CachedThreadPool的一个特性:“在程序执行过程中通常会创建与所需数量相同的线程”(在前文中有提到)

 

现在我们来看内部类Worker的实现,上文讲到调用了t.start()方法,也是就调用了Worker的run()方法,先看源码:

/**
* Main run loop
*/
public void run() {
try {
Runnable task = firstTask;
firstTask = null;
while (task != null || (task = getTask()) != null) {
runTask(task);
task = null;
}
} finally {
workerDone(this);
}
}

注意此时firstTask是有Runnable对象的,它不为空(这点非常重要,涉及到回调函数的使用问题)

task != null 此时为true,同样的短路判断,整个while语句会进入代码行执行runTask(task)方法,看源码:

/**
* Runs a single task between before/after methods.
*/
private void runTask(Runnable task) {
final ReentrantLock runLock = this.runLock;
runLock.lock();
try {
/*
* Ensure that unless pool is stopping, this thread
* does not have its interrupt set. This requires a
* double-check of state in case the interrupt was
* cleared concurrently with a shutdownNow -- if so,
* the interrupt is re-enabled.
*/
if (runState < STOP &&
Thread.interrupted() &&
runState >= STOP)
thread.interrupt();
/*
* Track execution state to ensure that afterExecute
* is called only if task completed or threw
* exception. Otherwise, the caught runtime exception
* will have been thrown by afterExecute itself, in
* which case we don't want to call it again.
*/
boolean ran = false;
beforeExecute(thread, task);
try {
task.run();
ran = true;
afterExecute(task, null);
++completedTasks;
} catch (RuntimeException ex) {
if (!ran)
afterExecute(task, ex);
throw ex;
}
} finally {
runLock.unlock();
}
}


看runTask方法就会明白,它的目的就是要调用线程池接收的Runnable对象的run()方法,执行前后,都调用了回调方法beforeExecute,afterExecute,其实这两个回调函数都是空实现,嘿嘿,只是预留的接口而已。

      当这个方法执行完后,注意看run()方法的实现,task=null,在执行while里面的条件时,就会执行后半段条件,即进入回调函数getTask(),看源码:

Runnable getTask() {
for (;;) {
try {
int state = runState;
if (state > SHUTDOWN)
return null;
Runnable r;
if (state == SHUTDOWN)  // Help drain queue
r = workQueue.poll();
else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
else
r = workQueue.take();
if (r != null)
return r;
if (workerCanExit()) {
if (runState >= SHUTDOWN) // Wake up others
interruptIdleWorkers();
return null;
}
// Else retry
} catch (InterruptedException ie) {
// On interruption, re-check runState
}
}
}

 

咱们看出来的技术处理:

首先,这是个死循环;

其次,看这个判断可知:poolSize > corePoolSize || allowCoreThreadTimeOut,CachedThreadPool线程永远返回true,所以将进入

r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);代码行。

咱们查看SynchronousQueue的帮助文档可知:Retrieves and removes the head of this queue, waiting if necessary up to the specified wait time, for another thread to insert it.

它在等别的线程插入对象,有没有印象?

对,就是前面的workQueue.offer(command),也就是说,这是workQueue.offer(command)返回true的情况:前面线程池内有线程执行完毕,挂起等待了,后面又加了一个线程任务过来,就直接拿出来执行了,然后继续等待,如此往复。这个其实就是在循环地利用线程池里的资源。所以在这种情况下,线程池就不会调用addIfUnderMaximumPoolSize(command)方法了,而是通过workQueue容器,将接收的线程任务交给已经创建好的线程池对象(Worker对象)去执行了。
 最后,我们发现poll是带时间参数的,即keepAliveTime,创建CachedThreadPool时,keepAliveTime是60秒,就是说,线程池对象在poll等待时,只会等60秒,超过这个时间,都没有接收到别的线程任务的话,将会返回null。

 

因此,咱们再看Worker的run()方法,此时while条件已经为false了,跳出该循环,将执行finally里的语句了,即workerDone方法,这又是一个回调函数,有什么作用呢?

是用来回收线程的。

我们再回过头来看CachedThreadPool承诺的另一个特性:“在它回收旧线程时停止创建新线程”,行,我们来看看它是如何实现承诺的,看源码:

/**
* Performs bookkeeping for an exiting worker thread.
* @param w the worker
*/
void workerDone(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
if (--poolSize == 0)
tryTerminate();
} finally {
mainLock.unlock();
}
}


看它的技术处理:

1、workers调用remove方法,移除掉一个worker对象。

2、操作是加锁处理的。

3、并且跟创建线程是同一个锁对象。

 

这个就是原因:创建和销毁共用一个同步锁,一次只能让一个操作执行,它就是通过同步锁来实现“回收旧线程时停止创建新线程”这个特性的。

 

以上就是CachedThreadPool的源码分析了,有理解得不到位的地方,还请各位多多评点。

 

接下来的一个章节,将会为大家介绍FixedThreadPool的实现。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息