您的位置:首页 > 产品设计 > UI/UE

jetty源码分析:QueuedThreadPool

2017-02-19 21:30 399 查看

概括

我们从几个问题入手,如果搞清楚这几个问题,也就理解了这个线程池的原理了

线程池中的线程是否固定的?

答: 不是的,线程池中的线程数量随着请求量的变化而动态变化,不过一直保持在_minThreads到_maxThreads之间。

线程什么时候新增?什么时候会退出?

答: 当请求过来,但是又没有空闲的线程,那么就会新增,当然这个也收_maxThreads限制。退出有两种情况:一是线程被中断了,另外一种设置了超时退出时间并且时间真的到了。

重要的成员

线程池常规成员:

private int _idleTimeout;                                //线程空闲多久后退出
private int _maxThreads;                                 //最大线程数
private int _minThreads;                                 //最小线程数
private final BlockingQueue<Runnable> _jobs;                                      //任务队列,提交的任务都先放到这里来
private final ConcurrentHashSet<Thread> _threads=new ConcurrentHashSet<>();       //线程池中所有线程


构造函数

线程池启动

线程池通过doStart函数启动

protected void doStart() throws Exception
{
super.doStart();
_threadsStarted.set(0);

startThreads(_minThreads);
}


代码比较简单,先调用父类的doStart,然后把线程数量设置为0,同时通过startThreads函数启动了_minThreads个线程,我们来看下这个线程启动函数

private boolean startThreads(int threadsToStart)
{
while (threadsToStart > 0 && isRunning())
{
int threads = _threadsStarted.get();
if (threads >= _maxThreads)
return false;

if (!_threadsStarted.compareAndSet(threads, threads + 1))
continue;

boolean started = false;
try
{
Thread thread = newThread(_runnable);
thread.setDaemon(isDaemon());
thread.setPriority(getThreadsPriority());
thread.setName(_name + "-" + thread.getId());
_threads.add(thread);

thread.start();
started = true;
--threadsToStart;   //启动一个,任务少了一个
}
finally
{
if (!started)
_threadsStarted.decrementAndGet();
}
}
return true;
}


这个函数的功能是批量启动线程,所以里面有个循环的,判断创建的线程数量是否到达了目标数(当然线程池一定是要启动着的)。 具体创建一个线程流程如下:

1. 需要判断当前的线程数量是否已经超过最大值,如果是就直接返回失败,否则总线程数+1

2 已启动线程+1,因为这里的实现中,每个线程都可以再创建线程的,所以这里需要原子操作

3 通过newThread新建一个线程,并启动,然后放到_threads。如果创建成功,需要创建线程-1,如果创建失败,那么总线程数量减-1

上面创建一个线程时候,我们它设置需要干的活,就是_runnable,我们进去看下它在干什么?

private Runnable _runnable = new Runnable()
{
@Override
public void run()
{
boolean shrink = false;
boolean ignore = false;
try
{
Runnable job = _jobs.poll();

if (job != null && _threadsIdle.get() == 0)    //队列中有任务,但是空闲的线程为0,那么需要启动一个线程
{
startThreads(1);
}

loop: while (isRunning())
{
// Job loop
while (job != null && isRunning())
{
if (LOG.isDebugEnabled())
LOG.debug("run {}",job);
runJob(job);
if (LOG.isDebugEnabled())
LOG.debug("ran {}",job);
if (Thread.interrupted())
{
ignore=true;
break loop;
}
job = _jobs.poll();   //继续拿一个任务,进行执行
}

// Idle loop
try
{
_threadsIdle.incrementAndGet();

while (isRunning() && job == null)
{
if (_idleTimeout <= 0)      //如果没有设置超时退出时间
job = _jobs.take();     //阻塞线程等待有新的任务
else      //判断是否需要退出线程
{
// maybe we should shrink?
final int size = _threadsStarted.get();
if (size > _minThreads)
{
long last = _lastShrink.get();
long now = System.nanoTime();
if (last == 0 || (now - last) > TimeUnit.MILLISECONDS.toNanos(_idleTimeout))
{
if (_lastShrink.compareAndSet(last, now) && _threadsStarted.compareAndSet(size, size - 1))
{
shrink=true;
break loop;
}
}
}
job = idleJobPoll();    //如果这个线程不退出,那么继续尝试获取新任务
}
}
}
finally
{
if (_threadsIdle.decrementAndGet() == 0)   //空闲的线程为0了,那么创建一个
{
startThreads(1);
}
}
}
}
catch (InterruptedException e)
{
ignore=true;
LOG.ignore(e);
}
catch (Throwable e)
{
LOG.warn(e);
}
finally
{
if (!shrink && isRunning())
{
if (!ignore)
LOG.warn("Unexpected thread death: {} in {}",this,QueuedThreadPool.this);
// This is an unexpected thread death!
if (_threadsStarted.decrementAndGet()<getMaxThreads())
startThreads(1);
}
_threads.remove(Thread.currentThread());
}
}
};


这个代码有将近100行,但是其实内容也不多,可以分为三块:

1 首先从任务队列中拿到一个任务,如果有需要执行的任务,并且空闲的线程为0,通过startThreads启动一个线程(当然启动线程可能失败情况)

2 任务循环:如果1中获取了任务,那么通过runjob把任务启动起来,运行后通过interrupted()查看当前线程是否被interrupted,如果是,那么就退出线程。否则从队列中继续获取一个任务出来。

3 空闲处理:首先进入到这里,就是一个任务执行完成了,那么对应的我们把空闲的线程数量+1,然后进行一个循环,但是有条件:线程池服务还在且任务队列中没有任务了,执行下面的逻辑:首先看是否失职超时退出,如果没有设置这个,也就是线程是永远不能退出的,那我们就让线程阻塞在队列中,一直等待到有新的任务出现;否则的话,就可能需要把当前的线程给退出了,我们先看当前总的线程数是否大于_minThreads(否则太少我们也不会让当前线程退出的),如果是的且好久没有线程退出了( 当前时间距离上次有线程退出已经超过阈值了),那么就需要把当前线程退出并且是否当前的时间为最后退出时间。 最后线程不满足退出条件,那么再尝试去获取任务

4. 正常情况,如果2和3中的线程不退出,那么一直循环执行新的任务(没有任务时候就阻塞在任务队列上)。但是如果2和3中退出来了,如果是2导致的(被中断的),那么看下线程数是否太少了,如果是,继续启动一个线程。最后不管是什么原因导致的,当前线程是要退出的,所有把线程从线程队列中(_threads)删除。

任务接受

当一个任务提交到线程池的时候,调用了execute函数,我们从这个入口进行分析

@Override
public void execute(Runnable job)
{
if (LOG.isDebugEnabled())
LOG.debug("queue {}",job);
if (!isRunning() || !_jobs.offer(job))
{
LOG.warn("{} rejected {}", this, job);
throw new RejectedExecutionException(job.toString());
}
else
{
// Make sure there is at least one thread executing the job.
if (getThreads() == 0)
startThreads(1);
}
}


代码很简单,首先看线程池是否在运行,没有运行就直接抛出异常,否则就往任务队列中添加一个任务,同样添加失败也会抛异常(任务队列满了),然后看下当前的线程池中的线程是不是空的,如果空的,通过startThreads启动一个线程(这个上面已经介绍过了,直接看上面就可以了)
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: