jetty源码分析:QueuedThreadPool
2017-02-19 21:30
399 查看
概括
我们从几个问题入手,如果搞清楚这几个问题,也就理解了这个线程池的原理了线程池中的线程是否固定的?
答: 不是的,线程池中的线程数量随着请求量的变化而动态变化,不过一直保持在_minThreads到_maxThreads之间。
线程什么时候新增?什么时候会退出?
答: 当请求过来,但是又没有空闲的线程,那么就会新增,当然这个也收_maxThreads限制。退出有两种情况:一是线程被中断了,另外一种设置了超时退出时间并且时间真的到了。
重要的成员
线程池常规成员:private int _idleTimeout; //线程空闲多久后退出 private int _maxThreads; //最大线程数 private int _minThreads; //最小线程数 private final BlockingQueue<Runnable> _jobs; //任务队列,提交的任务都先放到这里来 private final ConcurrentHashSet<Thread> _threads=new ConcurrentHashSet<>(); //线程池中所有线程
构造函数
线程池启动
线程池通过doStart函数启动protected void doStart() throws Exception { super.doStart(); _threadsStarted.set(0); startThreads(_minThreads); }
代码比较简单,先调用父类的doStart,然后把线程数量设置为0,同时通过startThreads函数启动了_minThreads个线程,我们来看下这个线程启动函数
private boolean startThreads(int threadsToStart) { while (threadsToStart > 0 && isRunning()) { int threads = _threadsStarted.get(); if (threads >= _maxThreads) return false; if (!_threadsStarted.compareAndSet(threads, threads + 1)) continue; boolean started = false; try { Thread thread = newThread(_runnable); thread.setDaemon(isDaemon()); thread.setPriority(getThreadsPriority()); thread.setName(_name + "-" + thread.getId()); _threads.add(thread); thread.start(); started = true; --threadsToStart; //启动一个,任务少了一个 } finally { if (!started) _threadsStarted.decrementAndGet(); } } return true; }
这个函数的功能是批量启动线程,所以里面有个循环的,判断创建的线程数量是否到达了目标数(当然线程池一定是要启动着的)。 具体创建一个线程流程如下:
1. 需要判断当前的线程数量是否已经超过最大值,如果是就直接返回失败,否则总线程数+1
2 已启动线程+1,因为这里的实现中,每个线程都可以再创建线程的,所以这里需要原子操作
3 通过newThread新建一个线程,并启动,然后放到_threads。如果创建成功,需要创建线程-1,如果创建失败,那么总线程数量减-1
上面创建一个线程时候,我们它设置需要干的活,就是_runnable,我们进去看下它在干什么?
private Runnable _runnable = new Runnable() { @Override public void run() { boolean shrink = false; boolean ignore = false; try { Runnable job = _jobs.poll(); if (job != null && _threadsIdle.get() == 0) //队列中有任务,但是空闲的线程为0,那么需要启动一个线程 { startThreads(1); } loop: while (isRunning()) { // Job loop while (job != null && isRunning()) { if (LOG.isDebugEnabled()) LOG.debug("run {}",job); runJob(job); if (LOG.isDebugEnabled()) LOG.debug("ran {}",job); if (Thread.interrupted()) { ignore=true; break loop; } job = _jobs.poll(); //继续拿一个任务,进行执行 } // Idle loop try { _threadsIdle.incrementAndGet(); while (isRunning() && job == null) { if (_idleTimeout <= 0) //如果没有设置超时退出时间 job = _jobs.take(); //阻塞线程等待有新的任务 else //判断是否需要退出线程 { // maybe we should shrink? final int size = _threadsStarted.get(); if (size > _minThreads) { long last = _lastShrink.get(); long now = System.nanoTime(); if (last == 0 || (now - last) > TimeUnit.MILLISECONDS.toNanos(_idleTimeout)) { if (_lastShrink.compareAndSet(last, now) && _threadsStarted.compareAndSet(size, size - 1)) { shrink=true; break loop; } } } job = idleJobPoll(); //如果这个线程不退出,那么继续尝试获取新任务 } } } finally { if (_threadsIdle.decrementAndGet() == 0) //空闲的线程为0了,那么创建一个 { startThreads(1); } } } } catch (InterruptedException e) { ignore=true; LOG.ignore(e); } catch (Throwable e) { LOG.warn(e); } finally { if (!shrink && isRunning()) { if (!ignore) LOG.warn("Unexpected thread death: {} in {}",this,QueuedThreadPool.this); // This is an unexpected thread death! if (_threadsStarted.decrementAndGet()<getMaxThreads()) startThreads(1); } _threads.remove(Thread.currentThread()); } } };
这个代码有将近100行,但是其实内容也不多,可以分为三块:
1 首先从任务队列中拿到一个任务,如果有需要执行的任务,并且空闲的线程为0,通过startThreads启动一个线程(当然启动线程可能失败情况)
2 任务循环:如果1中获取了任务,那么通过runjob把任务启动起来,运行后通过interrupted()查看当前线程是否被interrupted,如果是,那么就退出线程。否则从队列中继续获取一个任务出来。
3 空闲处理:首先进入到这里,就是一个任务执行完成了,那么对应的我们把空闲的线程数量+1,然后进行一个循环,但是有条件:线程池服务还在且任务队列中没有任务了,执行下面的逻辑:首先看是否失职超时退出,如果没有设置这个,也就是线程是永远不能退出的,那我们就让线程阻塞在队列中,一直等待到有新的任务出现;否则的话,就可能需要把当前的线程给退出了,我们先看当前总的线程数是否大于_minThreads(否则太少我们也不会让当前线程退出的),如果是的且好久没有线程退出了( 当前时间距离上次有线程退出已经超过阈值了),那么就需要把当前线程退出并且是否当前的时间为最后退出时间。 最后线程不满足退出条件,那么再尝试去获取任务
4. 正常情况,如果2和3中的线程不退出,那么一直循环执行新的任务(没有任务时候就阻塞在任务队列上)。但是如果2和3中退出来了,如果是2导致的(被中断的),那么看下线程数是否太少了,如果是,继续启动一个线程。最后不管是什么原因导致的,当前线程是要退出的,所有把线程从线程队列中(_threads)删除。
任务接受
当一个任务提交到线程池的时候,调用了execute函数,我们从这个入口进行分析@Override public void execute(Runnable job) { if (LOG.isDebugEnabled()) LOG.debug("queue {}",job); if (!isRunning() || !_jobs.offer(job)) { LOG.warn("{} rejected {}", this, job); throw new RejectedExecutionException(job.toString()); } else { // Make sure there is at least one thread executing the job. if (getThreads() == 0) startThreads(1); } }
代码很简单,首先看线程池是否在运行,没有运行就直接抛出异常,否则就往任务队列中添加一个任务,同样添加失败也会抛异常(任务队列满了),然后看下当前的线程池中的线程是不是空的,如果空的,通过startThreads启动一个线程(这个上面已经介绍过了,直接看上面就可以了)
相关文章推荐
- Jetty源码分析之线程池:QueuedThreadPool
- 线程池--jetty中QueuedThreadPool分析(一)
- muduo源码分析:线程池类ThreadPool
- cartographer源码分析(10)-common-thread_pool.h
- Muduo网络库源码分析(四)EventLoopThread和EventLoopThreadPool的封装
- mariadb 5.5 threadpool 源码分析
- Jetty的线程池实现QueuedThreadPool
- Muduo网络库源码分析(四)EventLoopThread和EventLoopThreadPool的封装
- python threadpool 源码分析以及自己封装的简易版线程池
- Java多线程 -- JUC包源码分析15 -- SynchronousQueue与CachedThreadPool
- jetty的线程池实现QueuedThreadPool
- Jetty 源码分析
- nginx源码分析—内存池结构ngx_pool_t及内存管理
- Jetty 源码分析
- Nginx 源码分析-- 内存池(pool)的分析 三
- Nginx 源码分析-- 内存池(pool)的分析 二
- nginx源码分析—内存池结构ngx_pool_t及内存管理
- AbstractQueuedSynchronizer、ReentrantLock源码分析——从未曾了解到精通原理
- jdk源码分析 – Thread线程类源码分析
- Object源码, monitor (wait pool) , Thread.sleep(), interrupt