您的位置:首页 > 编程语言 > Java开发

jdk 线程池 threadPoolExecutor 源码剖析

2016-07-05 11:18 337 查看
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) { if (runState == RUNNING && workQueue.offer(command)) { if (runState != RUNNING || poolSize == 0) ensureQueuedTaskHandled(command); } else if (!addIfUnderMaximumPoolSize(command)) reject(command); // is shutdown or saturated } }

 学习的过程首先是模仿的过程,然后才是创新,threadpoolExecutor是jdk自带的线程池,对比自己写的线程池,真是受益匪浅。

 

  继承者 java.util.concurrent.AbstractExecutorService
      继承者 java.util.concurrent.ThreadPoolExecutor
所有已实现的接口:
    Executor, ExecutorService
直接已知子类:
    ScheduledThreadPoolExecutor
  它是ExecutorService的一个实现类,它使用可能的几个池线程之一执行每个提交的任务,通常使用 Executors 工厂方法配置。
线程池可以解决两个不同问题:由于减少了每个任务调用的开销,它们通常可以在执行大量异步任务时提供增强的性能,并且还可以提供绑定和管理资源(包括执行任务集时使用的线程)的方法
每个 ThreadPoolExecutor 还维护着一些基本的统计数据,如完成的任务数。
为了便于跨大量上下文使用,此类提供了很多可调整的参数和扩展钩子 (hook)。
核心和最大池大小
    ThreadPoolExecutor将根据corePoolSize(参见 getCorePoolSize())和 maximumPoolSize(参见 getMaximumPoolSize())
    设置的边界自动调整池大小。当新任务在方法 execute(java.lang.Runnable) 中提交时,如果运行的线程少于 corePoolSize,    则创建新线程来处理请求,即使有线程是空闲的。
    如果运行的线程多于 corePoolSize 而少于 maximumPoolSize,则仅当队列满时才创建新线程。
    如果设置的 corePoolSize 和 maximumPoolSize 相同,则创建了固定大小的线程池。
    如果将 maximumPoolSize 设置为基本的无界值(如 Integer.MAX_VALUE),则允许池适应任意数量的并发任务。
    在大多数情况下,核心和最大池大小仅基于构造来设置,不过也可以使用 setCorePoolSize(int) 和 setMaximumPoolSize(int) 进行动态更改。 

 


    

ThreadPoolExecutor在java.util.concurrent中常用构造函数: public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }

 

 

 


    corePoolSize:线程池中常驻线程的数量;

    maximumPoolSize:线程池中所允许的最大的线程数。

    keepAliveTime:线程池中允许非常驻线程没有任务的最大存活时间。

    unit:时间单位。

    workQueue:缓存队列,相当于一个桥。

    

 

     

 


 

       一个任务就是通过execute方法加入到线程池中的,任务就是一个runnable的对象。 如果这个线程的数量大于线程池的量,且缓冲区未满,就加入到缓冲区中,如果缓冲区满了就创建新的进程。否则拒绝处理这个任务。

    

   
private Thread addThread(Runnable firstTask) { Worker w = new Worker(firstTask); Thread t = threadFactory.newThread(w); if (t != null) { w.thread = t; workers.add(w); int nt = ++poolSize; if (nt > largestPoolSize) largestPoolSize = nt; } return t; }

 

   workers是一个hashset,方法主要是增加和返回他的第一个任务。

 

 

  

private boolean addIfUnderCorePoolSize(Runnable firstTask) { Thread t = null; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (poolSize < corePoolSize && runState == RUNNING) t = addThread(firstTask); } finally { mainLock.unlock(); } if (t == null) return false; t.start(); return true; }

 

  创建和开始一个新的线程,当线程数小于corepoolsize。

 

 

 

 

  
private boolean addIfUnderMaximumPoolSize(Runnable firstTask) { Thread t = null; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (poolSize < maximumPoolSize && runState == RUNNING) t = addThread(firstTask); } finally { mainLock.unlock(); } if (t == null) return false; t.start(); return true; }

 

  类似 ,不解释。

 

 

 

private final class Worker implements Runnable { /** * The runLock is acquired and released surrounding each task * execution. It mainly protects against interrupts that are * intended to cancel the worker thread from instead * interrupting the task being run. */ private final ReentrantLock runLock = new ReentrantLock(); /** * Initial task to run before entering run loop. Possibly null. */ private Runnable firstTask; /** * Per thread completed task counter; accumulated * into completedTaskCount upon termination. */ volatile long completedTasks; /** * Thread this worker is running in. Acts as a final field, * but cannot be set until thread is created. */ Thread thread; Worker(Runnable firstTask) { this.firstTask = firstTask; } boolean isActive() { return runLock.isLocked(); } /** * Interrupts thread if not running a task. */ void interruptIfIdle() { final ReentrantLock runLock = this.runLock; if (runLock.tryLock()) { try { if (thread != Thread.currentThread()) thread.interrupt(); } finally { runLock.unlock(); } } } /** * Interrupts thread even if running a task. */ void interruptNow() { thread.interrupt(); } /** * Runs a single task between before/after methods. */ private void runTask(Runnable task) { final ReentrantLock runLock = this.runLock; runLock.lock(); try { /* * Ensure that unless pool is stopping, this thread * does not have its interrupt set. This requires a * double-check of state in case the interrupt was * cleared concurrently with a shutdownNow -- if so, * the interrupt is re-enabled. */ if (runState < STOP && Thread.interrupted() && runState >= STOP) thread.interrupt(); /* * Track execution state to ensure that afterExecute * is called only if task completed or threw * exception. Otherwise, the caught runtime exception * will have been thrown by afterExecute itself, in * which case we don't want to call it again. */ boolean ran = false; beforeExecute(thread, task); try { task.run(); ran = true; afterExecute(task, null); ++completedTasks; } catch (RuntimeException ex) { if (!ran) afterExecute(task, ex); throw ex; } } finally { runLock.unlock(); } } /** * Main run loop */ public void run() { try { Runnable task = firstTask; firstTask = null; while (task != null || (task = getTask()) != null) { runTask(task); task = null; } } finally { workerDone(this); } } }

 

  内部类:worker类,主要是来容纳任务的,相当于来执行一个线程。

 

 

 

 
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: