您的位置:首页 > 编程语言 > Java开发

Java Thread&Concurrency(15): 深入理解ScheduledThreadPoolExecutor及其实现原理

2014-07-28 23:43 946 查看
接口ScheduledExecutorService通过继承ExecutorService提供了以预定的方式来执行任务的能力,可以用于代替java.util.Timer。

所提供的一个具体实现为ThreadPoolExecutor,另一方面ScheduledThreadPoolExecutor通过继承ThreadPoolExecutor从而复用了这个已经存在的线程框架,当然,为了更好地做到通过对任务制订时间来执行,还要必须拥有全新的任务队列以及任务单元,那么JDK对此的做法是如下三个组件:

ScheduledThreadPoolExecutor:一个实现ScheduledExecutorService的可以调度任务的执行框架。

DelayedWorkQueue:一个数组实现的阻塞队列,根据任务所提供的时间参数来调整位置,实际上就是个小根堆(优先队列)。

ScheduledFutureTask:任务单元,存有时间、周期、外部任务、堆下标等调度过程中必须用到的参数,被工作线程执行。

我们分别从源码来看上述三个方面的重点。

首先是ScheduledThreadPoolExecutor:
    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay,
                                       TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        RunnableScheduledFuture<?> t = decorateTask(command,
            new ScheduledFutureTask<Void>(command, null,
                                          triggerTime(delay, unit)));
        delayedExecute(t);
        return t;
    }
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (delay <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(-delay));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
三段代码近乎类似:

构造任务单元ScheduledFutureTask,scheduleAtFixedRate和scheduleWithFixedDelay的差别是间隔值一正一负。
如果是周期(periodic)的任务,则配置outerTask为被执行的任务单元。
延迟执行任务并返回Future(假如是周期任务,那么get正常情况下一直阻塞)。

然后我们接着看delayedExecute:
private void delayedExecute(RunnableScheduledFuture<?> task) {
if (isShutdown())
reject(task);
else {
super.getQueue().add(task);
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
ensurePrestart();
}
}
执行器的状态包含5个,分为:RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED,初始为RUNNING。
这里的工作如下:

探测执行器状态不是RUNNING,则拒绝任务。
否则增加任务至延迟队列,假如探测到状态不为RUNNING并且canRunInCurrentRunState返回false(说明任务该被取消),那么会在删除任务之后取消任务。canRunInCurrentRunState仅对周期和非周期任务有差别,非周期任务在SHUTDOWN状态下仍可被执行。
假如不是前面的情况,则调用ensurePrestart来驱动工作线程。

来看ensurePrestart:

void ensurePrestart() {
int wc = workerCountOf(ctl.get());
if (wc < corePoolSize)
addWorker(null, true);
else if (wc == 0)
addWorker(null, false);
}
较简单,根据工作线程数量来判断,然后调用限制核心数的true版本,或者在corePoolSize为0的情况下,至少开启一个工作线程。
这里的addWorker属于ThreadPoolExecutor的逻辑,不展开了。

延迟队列:

好了,关于ScheduledThreadPoolExecutor本身的叙述完了,我们接着来看关于延迟队列和任务单元,因为事实上工作线程会调用poll(long,TimeUnit)或者take()来执行任务,所以我们先从任务延迟队列开始(它和DelayQueue及其相似),重点是三个函数:
前面提交任务时,添加任务到队列的add函数:
public boolean add(Runnable e) {
return offer(e);
}
public boolean offer(Runnable x) {
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = size;
if (i >= queue.length)
grow();
size = i + 1;
if (i == 0) {
queue[0] = e;
setIndex(e, 0);
} else {
siftUp(i, e);
}
if (queue[0] == e) {
leader = null;
available.signal();
}
} finally {
lock.unlock();
}
return true;
}
可以看出,这个优先队列的操作是基于数组和锁来实现,并且它是可并发的并且无容量限制的数据结构。这里的setIndex为任务单元配置下标heapIndex,siftUp操作调整至符合堆性质(时间最接近的在根)。这里的有趣地方是使用了一个leader引用,这么做可以更高效(后文再说)。
public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture<?> first = queue[0];
if (first == null) {
if (nanos <= 0)
return null;
else
nanos = available.awaitNanos(nanos);
} else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return finishPoll(first);
if (nanos <= 0)
return null;
first = null; // don't retain ref while waiting
if (nanos < delay || leader != null)
nanos = available.awaitNanos(nanos);
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
long timeLeft = available.awaitNanos(delay);
nanos -= delay - timeLeft;
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture<?> first = queue[0];
if (first == null)
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return finishPoll(first);
first = null; // don't retain ref while waiting
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}
这里的poll和take及其相似,poll是一个限时阻塞(根据任务根元素和时间参数来选择限时参数),而take是一直阻塞(直到任务可用)。
我们来看下如何工作:

获取锁,取队列的根元素queue[0],它必定是时间上最近的元素。
查看是否可执行,可用的话就执行。
否则,根据leader是否为null查看是否是第一个在等待的工作线程,如果不是的话调用不限时阻塞await,否则将当前线程作为leader,并且调用第一个任务的延迟的限时阻塞awaitNanos(delay),这种情况下退出之前可能回修改leader为null。
最后一定会在finishPoll中返回任务单元,并且释放锁之前,当leader为null并且队列不为空,发送唤醒信号(主要是作为leader的线程取得任务之后用于唤醒其他阻塞工作线程)

我们来看finishPoll:
private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
int s = --size;
RunnableScheduledFuture<?> x = queue[s];
queue[s] = null;
if (s != 0)
siftDown(0, x);
setIndex(f, -1);
return f;
}
工作原理较简单:取得根元素,调整堆大小以及拿最后一个元素来做置换,最后符合堆性质,这里的setIndex会给被取得任务一个-1的下标(heapIndex),代表它已经出队列。

ScheduledFutureTask:

至此关于队列的主要操作已经说完了,我们接着来看任务单元,ScheduledFutureTask,从它覆盖了的run开始:
public void run() {
boolean periodic = isPeriodic();
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic)
ScheduledFutureTask.super.run();
else if (ScheduledFutureTask.super.runAndReset()) {
setNextRunTime();
reExecutePeriodic(outerTask);
}
}


首先取得周期值,然后如果根据执行器状态来操作,可能取消任务。
否则,假如非周期任务则直接调用继承的run方法,执行任务。
否则任务为周期任务,那么会调用runAndReset来执行(特点是正常情况不会改变state),之后会重置时间(setNextRunTime),并且再度调度任务(reExecutePeriodic),使用的是outerTask即任务单元本身。

我们来看setNextRunTime:
private void setNextRunTime() {
long p = period;
if (p > 0)
time += p;
else
time = triggerTime(-p);
}
这里的工作很简单,根据(p>0则为固定时间周期任务)和(p<=0为固定延迟周期任务),来重新调整时间,可以看到延迟类型的周期任务,会在triggerTime中取得当前时间来得到任务,所以它是依赖当前时间的。

我们再来看reExecutePeriodic:
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
if (canRunInCurrentRunState(true)) {
super.getQueue().add(task);
if (!canRunInCurrentRunState(true) && remove(task))
task.cancel(false);
else
ensurePrestart();
}
}
这里的工作同样简单,在可以执行任务的情况下,添加任务单元到延迟队列,之后取消任务,或者再次驱动工作线程。

我们最后来看看用于比较任务单元的compareTo:
public int compareTo(Delayed other) {
if (other == this) // compare zero if same object
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
可以看出基本上是根据time参数来比较的,否则会根据sequenceNumber来比较,而sequenceNumber由AtomicLong生成。

至此,关于预计时间的调度任务完毕。

最后给出一个搞坏了的例子:

public class TestScheduledThreadPoolExecutor {

public static void main(String[] args) {
// TODO Auto-generated method stub
ScheduledExecutorService service = Executors.newScheduledThreadPool(2);
if(service instanceof ThreadPoolExecutor){
((ThreadPoolExecutor)service).setKeepAliveTime(1, TimeUnit.NANOSECONDS);
((ThreadPoolExecutor)service).allowCoreThreadTimeOut(true);

}
Future<?> future = service.scheduleAtFixedRate(new Runnable(){
public void run(){
System.out.println("Result!" + Thread.currentThread().getName());
}
}, 1, 1, TimeUnit.SECONDS);
}

}


大家可以发现CPU一直被占有着,并且每次执行任务的线程都是全新的,原因就是延迟队列和keepAliveTime参数综合作用的结果(因为ThreadPoolExecutor的设计中,假如工作线程的阻塞超过了等待时间并且此刻工作线程数>1,则可以退出当前线程)。所以不要用这种方式。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: