您的位置:首页 > 编程语言 > Java开发

Java线程池(2)

2016-02-23 19:46 423 查看
在上篇博文中,我描述了Java线程池的大概执行流程。但是由于我们平时使用线程池的时候,一般都不会自己new ThreadPoolExecutor(),而是通过类似于 Executors.newCachedThreadPool()这样的方法来构建一个线程池。下面我们就来看下Executors.newCachedThreadPool()到底构建了一个什么样的线程池。

直接看代码:

public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}


从代码中可以看到,改方法构造了一个coreSize为0, maxSize为Integer.MAX_VALUE的线程池。上篇博文中,我简单描述了当往一个线程池中添加一个task时,线程池的处理过程大概如下:

(1) 如果当前线程池中活动线程的数目还未达到coreSize, 则新构建一个线程来处理这个task;

(2) 如果线程中活跃线程的数目已经达到了coreSize, 则将该task添加到线程池的队列当中;

(3) 如果添加失败(例如队列满了),但是活跃线程的数目还未达到maxSize,则新建一个线程来处理这个task;

(4) 如果队列满了,且活跃线程的数目已经达到了maxSize,则拒绝执行,抛异常。

从上叙的流程中,直觉来看,我们发现其实Executors.newCachedThreadPool()构建的这个线程池是可能是有问题的: 因为coreSize=0, 所以我哪怕提交一个任务给改线程池,它也应该会把他给放到队列里面缓存起来,对吧。这样的话,假如我只提交一个任务,然后后续就不提交了,那这个task岂不是永远都不会执行?

当然 Executors.newCachedThreadPool()构造的这个线程池是个应用非常广泛的线程池,不可能会存在这样简单的问题。那它为什么又不会出现上叙的那个问题的呢?

其实从new SynchronousQueue()这里面,我们就可以发现特殊的地方,因为一般的阻塞队列都会有个capacity存在,否则当生成者生产的数据往里面放时,永远不会阻塞,那它有什么资格叫阻塞队列?哈哈,这是玩笑话。

其实SynchronousQueue我们可以把他理解为capacity为0的队列,它有如下的特性:

void put(E o):向队列提交一个元素,阻塞直到其他线程take或者poll此元素.

boolean offer(E o):向队列中提交一个元素,如果此时有其他线程正在被take阻塞(即其他线程已准备接收)或 者”碰巧”有poll操作,那么将返回true,否则返回false.

E take():获取并删除一个元素,阻塞直到有其他线程offer/put.

E peek():总会返回null,硬编码.

更多的特性请参考这篇文章.

好了,了解了SynchronousQueue那么就比较好解释这个线程池是如何工作的了。还是结合着代码看:

public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
if (runState == RUNNING && workQueue.offer(command)) {
if (runState != RUNNING || poolSize == 0)
ensureQueuedTaskHandled(command);
}
else if (!addIfUnderMaximumPoolSize(command))
reject(command); // is shutdown or saturated
}
}

private boolean addIfUnderCorePoolSize(Runnable firstTask) {
Thread t = null;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (poolSize < corePoolSize && runState == RUNNING)
t = addThread(firstTask);
} finally {
mainLock.unlock();
}
if (t == null)
return false;
t.start();
return true;
}


首先,在初始化的时候,线程池中肯定是空的,此时,当提交一个task的时候,就会执行workQueue.offer(command),即尝试把任务添加到队列中,此时必然会返回false, 因为此时并没有线程从该队列中take()任务。所以此次程序会执行addIfUnderMaximumPoolSize(command),即新创建一个线程来处理该任务。由于线程池的maxSize为Integer.MAX_VALUE,几乎可以说是无穷大,所以向该线程池提交任务基本上不会失败。

那么是不是每次向这个线程池中提交任务它都重新创建一个线程呢,当然不是,否则就达不到我们使用线程池的目的了。看下面的代码:

if (poolSize > corePoolSize || allowCoreThreadTimeOut)
r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);


线程池中每个创建的线程在执行完自己手头上的task之后,都会再从队列中再poll()一个任务,如果此时一直没有新的task提交过来,那么take()方法会阻塞60s(默认), 即线程的存活时间。如果60s还没poll到,则线程进入销毁模式。如果在这60s poll()的过程中正好有个task提交,此时该线程就直接执行它。就达到了线程的复用。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  java 线程池 class