您的位置:首页 > 编程语言 > Java开发

深入学习JDK 线程池(之六)

2014-04-02 22:23 393 查看
一、ScheduledThreadPoolExecutor类

        此类承载了SchedulerThreadPool,SingleThreadScheduledExecutor两种线程池的创建和功能实现的任务。

        1、内部类

        ScheduledFutureTask,实现RunnableScheduledFuture接口,该内部类再上一层的接口有Delayed,Future,Runnable等。从继承体系可知,该类适配了多线程(Runnable,Callable)的必要组件,并且实现Delayed接口完成延迟启动的功能。

        DelayedWorkQueue,直接装饰DelayQueue类,泛型对象为RunnableScheduledFuture,运行时实际装载的是内部类ScheduledFutureTask的实例对象。

        2、构造器:接受参数corePoolSize,BlockingQueue选用的是内部类DelayedWordQueue。

        3、主要方法分析,针对execute(),submit(),schedule()三个方法

public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
schedule(command, 0, TimeUnit.NANOSECONDS);
}

// Override AbstractExecutorService methods

public Future<?> submit(Runnable task) {
return schedule(task, 0, TimeUnit.NANOSECONDS);
}

public <T> Future<T> submit(Runnable task, T result) {
return schedule(Executors.callable(task, result),
0, TimeUnit.NANOSECONDS);
}

public <T> Future<T> submit(Callable<T> task) {
return schedule(task, 0, TimeUnit.NANOSECONDS);
}

        如上所示代码可知:execute()和submit()方法最后全都转到schedule()方法中,可知schedule()方法是贯穿线程池执行的核心方法。

 

二、SchedulerThreadPool线程池流程分析

      1、 先看schedule()方法及相应调用方法的代码:

public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit)));

delayedExecute(t);
return t;
}

 

protected <V> RunnableScheduledFuture<V> decorateTask(
Runnable runnable, RunnableScheduledFuture<V> task) {
return task;
}


 

private void delayedExecute(Runnable command) {
if (isShutdown()) {
reject(command);
return;
}
// Prestart a thread if necessary. We cannot prestart it
// running the task because the task (probably) shouldn't be
// run yet, so thread will just idle until delay elapses.
if (getPoolSize() < getCorePoolSize())
prestartCoreThread();

super.getQueue().add(command);
}


     

       1、由代码可知:先根据参数command,delay,timeunit得到一个ScheduledFutureTask对象,若是在execute()和submit()方法内调用schedule()的,此时delay值为0。

       2、decorateTask方法其实只是返回了ScheduledFutureTask对象,该对象中携带有延迟时间的(由delay和timeunit两个参数共同决定)。

       3、delayedExecute()方法内当poolSize<corePoolSize时,将进入prestartCoreThread(),该方法只是调用了父类(ThreadPoolExecutor)的addIfUnderCorePoolSize()方法,用于线程池的扩容,注意传入的Runnable是空的,即生成的worker对象里firstTask变量为null,目的是让worker对象调用run()方法循环监听时,阻塞在workQueue.take()这一行。说简单点就是扩容的时候,让线程池啥都不干,只是干等着。

       4、扩容操作完成后,运行到super.getQueue().add(command);代码行,此时会唤醒take()阻塞,完成worker对象内的线程对象执行。

 

       5、细心的同学可能会发现,delay延迟好像上文没有提到啊,究竟是如何实现延迟的呢?

       咱们想想这个线程池的BlockingQueue哪个类的实例?对,就是DelayedWordQueue容器实例,咱们看看这个容器的take()方法实现:

/**
* Retrieves and removes the head of this queue, waiting if necessary
* until an element with an expired delay is available on this queue.
*
* @return the head of this queue
* @throws InterruptedException {@inheritDoc}
*/
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
if (first == null) {
available.await();
} else {
long delay =  first.getDelay(TimeUnit.NANOSECONDS);
if (delay > 0) {
long tl = available.awaitNanos(delay);
} else {
E x = q.poll();
assert x != null;
if (q.size() != 0)
available.signalAll(); // wake up other takers
return x;

}
}
}
} finally {
lock.unlock();
}
}

        就是这儿,前面讲的实现Delay接口,就是这时候起作用了,就是通过available.awaitNanos(delay);来实现延迟的。

        整个延迟实现的原理:ScheduledFutureTask实现了Delay接口,调用构造器时接收参数delay和Timeunit,转换成long值,在实现getDelay()方法时返回延迟的时间,当DelayedWorkQueue调用take()方法时,是会调用getDelay()方法进行awaitNanos(delay)的,所以,延迟的真正实现是交由DelayedQueue中的take()方法来实现的。

        6、scheduledAtFixedRate()方法分析

        该方法可以重复执行线程对象,跟scheduled()方法相比,多传了一个参数period,使ScheduledFutureTask类的成员变量period值不为0,方法isPeriodic()方法返回值就为true,要特别注意这个临界点,返回值为true后,在worker对象调用到run()方法时,就会进入runPeriodic()方法,该方法代码:

/**
* Runs a periodic task.
*/
private void runPeriodic() {
boolean ok = ScheduledFutureTask.super.runAndReset();
boolean down = isShutdown();
// Reschedule if not cancelled and not shutdown or policy allows
if (ok && (!down ||
(getContinueExistingPeriodicTasksAfterShutdownPolicy() &&
!isStopped()))) {
long p = period;
if (p > 0)
time += p;
else
time = triggerTime(-p);
ScheduledThreadPoolExecutor.super.getQueue().add(this);
}
// This might have been the final executed delayed
// task.  Wake up threads to check.
else if (down)
interruptIdleWorkers();
}

 

这个方法会干三件事情:

        1)、调用ScheduledFutureTask.super.runAndReset()方法,ScheduledFutureTask.super指的是FutureTask对象,runAndReset()方法会转到FutureTask的内部类Sync的innerRunAndReset()方法,如代码示:

boolean innerRunAndReset() {
if (!compareAndSetState(0, RUNNING))
return false;
try {
runner = Thread.currentThread();
if (getState() == RUNNING)
callable.call(); // don't set result
runner = null;
return compareAndSetState(RUNNING, 0);
} catch (Throwable ex) {
innerSetException(ex);
return false;
}
}

 

/**
* A callable that runs given task and returns given result
*/
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable  task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}

         上面第一段代码表示在innerRunAndReset()方法内,线程对象会被执行,第二段代码是告诉大家内部类Sync做过小小地处理,构造器里已经把Runnable对象适配成Callable,目的就是通过一句callable.call()完成两种线程对象的执行。只需要记住调用该方法时,线程对象已经执行过一次了。

        2)、if条件判断第一步正常执行并且线程池未关闭时,若条件成立,则根据period参数对time成员变量进行累加,确定一下次任务的启动时间。

        3)、将本对象this又加入到super.getQueue列表中,此时time已更新,加到列表结果是该对象又会被执行(worker对象中有阻塞等待执行的线程,通过调用take()方法获取)。

        综上所述:第一步完成后,线程任务已经执行了一次,第二步更新启动时间,第三步的结果是线程任务又将等待执行一次,其实第三步是将起点重新回到第一步,只是time一直随着时间不停地累加,如此反复,该线程任务就一直重复执行。

 

        7、scheduleWithFixedDelay()方法

        跟scheduledAtFixedRate()方法类似,只是period是个负数,用于区分runPeriodic()方法的流程分支,time成员变量的计算方式不一样,在scheduleWithFixedRate中,time是以固定的period累加,而scheduleWithFixedDelay是当前时间加上period时间,只是这里有区别,其他的流程都是一样的。

 

三、SingleThreadScheduledExecutor线程池

       引用了静态内部类DelegateScheduledExecutorService,装饰ScheduledExecutorService,相当于corePoolSize为1的ScheduledThreadPool,特性分析见ScheduledThreadPool即可。

 

    
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息