Java Thread&Concurrency(15): 深入理解ScheduledThreadPoolExecutor及其实现原理
2014-07-28 23:43
946 查看
接口ScheduledExecutorService通过继承ExecutorService提供了以预定的方式来执行任务的能力,可以用于代替java.util.Timer。
所提供的一个具体实现为ThreadPoolExecutor,另一方面ScheduledThreadPoolExecutor通过继承ThreadPoolExecutor从而复用了这个已经存在的线程框架,当然,为了更好地做到通过对任务制订时间来执行,还要必须拥有全新的任务队列以及任务单元,那么JDK对此的做法是如下三个组件:
ScheduledThreadPoolExecutor:一个实现ScheduledExecutorService的可以调度任务的执行框架。
DelayedWorkQueue:一个数组实现的阻塞队列,根据任务所提供的时间参数来调整位置,实际上就是个小根堆(优先队列)。
ScheduledFutureTask:任务单元,存有时间、周期、外部任务、堆下标等调度过程中必须用到的参数,被工作线程执行。
我们分别从源码来看上述三个方面的重点。
首先是ScheduledThreadPoolExecutor:
构造任务单元ScheduledFutureTask,scheduleAtFixedRate和scheduleWithFixedDelay的差别是间隔值一正一负。
如果是周期(periodic)的任务,则配置outerTask为被执行的任务单元。
延迟执行任务并返回Future(假如是周期任务,那么get正常情况下一直阻塞)。
然后我们接着看delayedExecute:
这里的工作如下:
探测执行器状态不是RUNNING,则拒绝任务。
否则增加任务至延迟队列,假如探测到状态不为RUNNING并且canRunInCurrentRunState返回false(说明任务该被取消),那么会在删除任务之后取消任务。canRunInCurrentRunState仅对周期和非周期任务有差别,非周期任务在SHUTDOWN状态下仍可被执行。
假如不是前面的情况,则调用ensurePrestart来驱动工作线程。
来看ensurePrestart:
这里的addWorker属于ThreadPoolExecutor的逻辑,不展开了。
延迟队列:
好了,关于ScheduledThreadPoolExecutor本身的叙述完了,我们接着来看关于延迟队列和任务单元,因为事实上工作线程会调用poll(long,TimeUnit)或者take()来执行任务,所以我们先从任务延迟队列开始(它和DelayQueue及其相似),重点是三个函数:
前面提交任务时,添加任务到队列的add函数:
我们来看下如何工作:
获取锁,取队列的根元素queue[0],它必定是时间上最近的元素。
查看是否可执行,可用的话就执行。
否则,根据leader是否为null查看是否是第一个在等待的工作线程,如果不是的话调用不限时阻塞await,否则将当前线程作为leader,并且调用第一个任务的延迟的限时阻塞awaitNanos(delay),这种情况下退出之前可能回修改leader为null。
最后一定会在finishPoll中返回任务单元,并且释放锁之前,当leader为null并且队列不为空,发送唤醒信号(主要是作为leader的线程取得任务之后用于唤醒其他阻塞工作线程)
我们来看finishPoll:
ScheduledFutureTask:
至此关于队列的主要操作已经说完了,我们接着来看任务单元,ScheduledFutureTask,从它覆盖了的run开始:
首先取得周期值,然后如果根据执行器状态来操作,可能取消任务。
否则,假如非周期任务则直接调用继承的run方法,执行任务。
否则任务为周期任务,那么会调用runAndReset来执行(特点是正常情况不会改变state),之后会重置时间(setNextRunTime),并且再度调度任务(reExecutePeriodic),使用的是outerTask即任务单元本身。
我们来看setNextRunTime:
我们再来看reExecutePeriodic:
我们最后来看看用于比较任务单元的compareTo:
至此,关于预计时间的调度任务完毕。
最后给出一个搞坏了的例子:
大家可以发现CPU一直被占有着,并且每次执行任务的线程都是全新的,原因就是延迟队列和keepAliveTime参数综合作用的结果(因为ThreadPoolExecutor的设计中,假如工作线程的阻塞超过了等待时间并且此刻工作线程数>1,则可以退出当前线程)。所以不要用这种方式。
所提供的一个具体实现为ThreadPoolExecutor,另一方面ScheduledThreadPoolExecutor通过继承ThreadPoolExecutor从而复用了这个已经存在的线程框架,当然,为了更好地做到通过对任务制订时间来执行,还要必须拥有全新的任务队列以及任务单元,那么JDK对此的做法是如下三个组件:
ScheduledThreadPoolExecutor:一个实现ScheduledExecutorService的可以调度任务的执行框架。
DelayedWorkQueue:一个数组实现的阻塞队列,根据任务所提供的时间参数来调整位置,实际上就是个小根堆(优先队列)。
ScheduledFutureTask:任务单元,存有时间、周期、外部任务、堆下标等调度过程中必须用到的参数,被工作线程执行。
我们分别从源码来看上述三个方面的重点。
首先是ScheduledThreadPoolExecutor:
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); RunnableScheduledFuture<?> t = decorateTask(command, new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit))); delayedExecute(t); return t; }
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (period <= 0) throw new IllegalArgumentException(); ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(period)); RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; delayedExecute(t); return t; }
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (delay <= 0) throw new IllegalArgumentException(); ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(-delay)); RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; delayedExecute(t); return t; }三段代码近乎类似:
构造任务单元ScheduledFutureTask,scheduleAtFixedRate和scheduleWithFixedDelay的差别是间隔值一正一负。
如果是周期(periodic)的任务,则配置outerTask为被执行的任务单元。
延迟执行任务并返回Future(假如是周期任务,那么get正常情况下一直阻塞)。
然后我们接着看delayedExecute:
private void delayedExecute(RunnableScheduledFuture<?> task) { if (isShutdown()) reject(task); else { super.getQueue().add(task); if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task)) task.cancel(false); else ensurePrestart(); } }执行器的状态包含5个,分为:RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED,初始为RUNNING。
这里的工作如下:
探测执行器状态不是RUNNING,则拒绝任务。
否则增加任务至延迟队列,假如探测到状态不为RUNNING并且canRunInCurrentRunState返回false(说明任务该被取消),那么会在删除任务之后取消任务。canRunInCurrentRunState仅对周期和非周期任务有差别,非周期任务在SHUTDOWN状态下仍可被执行。
假如不是前面的情况,则调用ensurePrestart来驱动工作线程。
来看ensurePrestart:
void ensurePrestart() { int wc = workerCountOf(ctl.get()); if (wc < corePoolSize) addWorker(null, true); else if (wc == 0) addWorker(null, false); }较简单,根据工作线程数量来判断,然后调用限制核心数的true版本,或者在corePoolSize为0的情况下,至少开启一个工作线程。
这里的addWorker属于ThreadPoolExecutor的逻辑,不展开了。
延迟队列:
好了,关于ScheduledThreadPoolExecutor本身的叙述完了,我们接着来看关于延迟队列和任务单元,因为事实上工作线程会调用poll(long,TimeUnit)或者take()来执行任务,所以我们先从任务延迟队列开始(它和DelayQueue及其相似),重点是三个函数:
前面提交任务时,添加任务到队列的add函数:
public boolean add(Runnable e) { return offer(e); }
public boolean offer(Runnable x) { if (x == null) throw new NullPointerException(); RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x; final ReentrantLock lock = this.lock; lock.lock(); try { int i = size; if (i >= queue.length) grow(); size = i + 1; if (i == 0) { queue[0] = e; setIndex(e, 0); } else { siftUp(i, e); } if (queue[0] == e) { leader = null; available.signal(); } } finally { lock.unlock(); } return true; }可以看出,这个优先队列的操作是基于数组和锁来实现,并且它是可并发的并且无容量限制的数据结构。这里的setIndex为任务单元配置下标heapIndex,siftUp操作调整至符合堆性质(时间最接近的在根)。这里的有趣地方是使用了一个leader引用,这么做可以更高效(后文再说)。
public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { RunnableScheduledFuture<?> first = queue[0]; if (first == null) { if (nanos <= 0) return null; else nanos = available.awaitNanos(nanos); } else { long delay = first.getDelay(NANOSECONDS); if (delay <= 0) return finishPoll(first); if (nanos <= 0) return null; first = null; // don't retain ref while waiting if (nanos < delay || leader != null) nanos = available.awaitNanos(nanos); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { long timeLeft = available.awaitNanos(delay); nanos -= delay - timeLeft; } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && queue[0] != null) available.signal(); lock.unlock(); } }
public RunnableScheduledFuture<?> take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { RunnableScheduledFuture<?> first = queue[0]; if (first == null) available.await(); else { long delay = first.getDelay(NANOSECONDS); if (delay <= 0) return finishPoll(first); first = null; // don't retain ref while waiting if (leader != null) available.await(); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && queue[0] != null) available.signal(); lock.unlock(); } }这里的poll和take及其相似,poll是一个限时阻塞(根据任务根元素和时间参数来选择限时参数),而take是一直阻塞(直到任务可用)。
我们来看下如何工作:
获取锁,取队列的根元素queue[0],它必定是时间上最近的元素。
查看是否可执行,可用的话就执行。
否则,根据leader是否为null查看是否是第一个在等待的工作线程,如果不是的话调用不限时阻塞await,否则将当前线程作为leader,并且调用第一个任务的延迟的限时阻塞awaitNanos(delay),这种情况下退出之前可能回修改leader为null。
最后一定会在finishPoll中返回任务单元,并且释放锁之前,当leader为null并且队列不为空,发送唤醒信号(主要是作为leader的线程取得任务之后用于唤醒其他阻塞工作线程)
我们来看finishPoll:
private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) { int s = --size; RunnableScheduledFuture<?> x = queue[s]; queue[s] = null; if (s != 0) siftDown(0, x); setIndex(f, -1); return f; }工作原理较简单:取得根元素,调整堆大小以及拿最后一个元素来做置换,最后符合堆性质,这里的setIndex会给被取得任务一个-1的下标(heapIndex),代表它已经出队列。
ScheduledFutureTask:
至此关于队列的主要操作已经说完了,我们接着来看任务单元,ScheduledFutureTask,从它覆盖了的run开始:
public void run() { boolean periodic = isPeriodic(); if (!canRunInCurrentRunState(periodic)) cancel(false); else if (!periodic) ScheduledFutureTask.super.run(); else if (ScheduledFutureTask.super.runAndReset()) { setNextRunTime(); reExecutePeriodic(outerTask); } }
首先取得周期值,然后如果根据执行器状态来操作,可能取消任务。
否则,假如非周期任务则直接调用继承的run方法,执行任务。
否则任务为周期任务,那么会调用runAndReset来执行(特点是正常情况不会改变state),之后会重置时间(setNextRunTime),并且再度调度任务(reExecutePeriodic),使用的是outerTask即任务单元本身。
我们来看setNextRunTime:
private void setNextRunTime() { long p = period; if (p > 0) time += p; else time = triggerTime(-p); }这里的工作很简单,根据(p>0则为固定时间周期任务)和(p<=0为固定延迟周期任务),来重新调整时间,可以看到延迟类型的周期任务,会在triggerTime中取得当前时间来得到任务,所以它是依赖当前时间的。
我们再来看reExecutePeriodic:
void reExecutePeriodic(RunnableScheduledFuture<?> task) { if (canRunInCurrentRunState(true)) { super.getQueue().add(task); if (!canRunInCurrentRunState(true) && remove(task)) task.cancel(false); else ensurePrestart(); } }这里的工作同样简单,在可以执行任务的情况下,添加任务单元到延迟队列,之后取消任务,或者再次驱动工作线程。
我们最后来看看用于比较任务单元的compareTo:
public int compareTo(Delayed other) { if (other == this) // compare zero if same object return 0; if (other instanceof ScheduledFutureTask) { ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other; long diff = time - x.time; if (diff < 0) return -1; else if (diff > 0) return 1; else if (sequenceNumber < x.sequenceNumber) return -1; else return 1; } long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS); return (diff < 0) ? -1 : (diff > 0) ? 1 : 0; }可以看出基本上是根据time参数来比较的,否则会根据sequenceNumber来比较,而sequenceNumber由AtomicLong生成。
至此,关于预计时间的调度任务完毕。
最后给出一个搞坏了的例子:
public class TestScheduledThreadPoolExecutor { public static void main(String[] args) { // TODO Auto-generated method stub ScheduledExecutorService service = Executors.newScheduledThreadPool(2); if(service instanceof ThreadPoolExecutor){ ((ThreadPoolExecutor)service).setKeepAliveTime(1, TimeUnit.NANOSECONDS); ((ThreadPoolExecutor)service).allowCoreThreadTimeOut(true); } Future<?> future = service.scheduleAtFixedRate(new Runnable(){ public void run(){ System.out.println("Result!" + Thread.currentThread().getName()); } }, 1, 1, TimeUnit.SECONDS); } }
大家可以发现CPU一直被占有着,并且每次执行任务的线程都是全新的,原因就是延迟队列和keepAliveTime参数综合作用的结果(因为ThreadPoolExecutor的设计中,假如工作线程的阻塞超过了等待时间并且此刻工作线程数>1,则可以退出当前线程)。所以不要用这种方式。
相关文章推荐
- Java Thread&Concurrency(13): 深入理解ConcurrentLinkedQueue及其实现原理
- Java Thread&Concurrency(12): 深入理解AbstractExecutorService及其实现原理
- Java Thread&Concurrency(16): 深入理解ArrayBlockingQueue及其实现原理
- Java Thread&Concurrency(11): 深入理解ThreadPoolExecutor及其实现原理
- Java Thread&Concurrency(14): 深入理解条件队列(Condition)及其实现原理
- Java Thread&Concurrency(9): 深入理解StampedLock及其实现原理
- Java Thread&Concurrency(10): 深入理解ThreadLocal及其实现原理
- Java Thread&Concurrency(4): 深入理解Exchanger实现原理
- Java Thread&Concurrency(8): 深入理解CompletionService接口及其实现
- Java Thread&Concurrency(7): 深入理解Callable/Future(FutureTask)接口及其实现
- Java Thread&Concurrency(5): 深入理解Phaser实现原理
- Java Thread&Concurrency(2): 深入理解ConcurrentSkipListMap实现原理
- Java Thread&Concurrency(1): 深入理解Fork-Join并发执行框架
- Java Thread&Concurrency(3): 深入理解SynchronousQueue实现原理
- java 中HashMap实现原理深入理解
- 深入Java集合学习系列:ConcurrentLinkedQueue及其实现原理
- 深入理解Java中的HashMap的实现原理
- 深入理解Java中的HashMap的实现原理
- 【深入理解java集合系列】ArrayList实现原理
- 深入理解StampedLock及其实现原理