您的位置:首页 > 编程语言 > Java开发

深入学习JDK 线程池(之七)

2014-04-12 16:46 302 查看
一、Submit流程

       前面几章节已经简单提及类AbstractExecutorService中的submit方法,现在我们来学习一下submit方法的设计及内部实现。

public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Object> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}

public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}

public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}


 

       之前通过JDK API的介绍可知,submit主要是用来提交执行Callable线程对象,其实submit的重载方法也可以处理Runnable对象。

       由上面方法可知,Runnable和Callable对象都作为参数得到一个RunnableFuture接口实例对象(实例化的类为FutureTask),其中需要用到RunnableAdapter适配器类,将Runnable适配成Callable,因为FutureTask只接受Callable对象。如下面构造器代码:

public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
sync = new Sync(callable);
}
public FutureTask(Runnable runnable, V result) {
sync = new Sync(Executors.callable(runnable, result));
}

 

从构造器代码可知:内部类Sync只接受Callable对象,对Callable对象的处理实际上是交由Sync内部类。FutureTask的关键方法run()和get(),都是由Sync类的innerRun()和innerGet()来实现的,如代码所示:

void innerRun() {
if (!compareAndSetState(0, RUNNING))
return;
try {
runner = Thread.currentThread();
if (getState() == RUNNING) // recheck after setting thread
innerSet(callable.call());
else
releaseShared(0); // cancel
} catch (Throwable ex) {
innerSetException(ex);
}
}
V innerGet() throws InterruptedException, ExecutionException {
acquireSharedInterruptibly(0);
if (getState() == CANCELLED)
throw new CancellationException();
if (exception != null)
throw new ExecutionException(exception);
return result;
}


 

阻塞式等待机制实现

        我们之前学习到,Future接口的get方法的执行具有阻塞性,它的实现原理就在innerGet()方法里,实现的过程有点复杂,我只是简单地讲解一下思路。acquireSharedInterruptibly(0)方法调用时,会调用FutureTask类的tryAcquireShared方法,该方法的一个作用是判断线程对象有没有执行完,若未执行完,会跳到AbstractQueuedSynchronizer类中的doAcquireSharedInterruptibly方法(中间的方法调用过程可查阅源码),该方法如下:

/**
* Acquires in shared interruptible mode.
* @param arg the acquire argument
*/
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
break;
}
} catch (RuntimeException ex) {
cancelAcquire(node);
throw ex;
}
// Arrive here only if interrupted
cancelAcquire(node);
throw new InterruptedException();
}

看源码可知,在方法的for循环里会不停地判断该线程任务有没有执行完,直到线程任务执行完毕,才能跳出该方法,大概就是通过这种思路去实现阻塞式等待功能的。

 

二、线程池Submit(Callable)应用场景

       线程池对象调用Submit(Callable)方法后,能返回一个Future对象,调用Future对象的get方法,可以阻塞式等call()方法执行完返回结果,现有两种使用场景:

       场景一:所有的Callable对象都submit后,用集合收集返回的Future对象,再遍历该集合,逐个调用get()方法,这样可以使多个callable线程并发执行,第一个调用get()方法的Future对象是最早提交的,等待的时间理论上会比较短,但如果callable比较多,会导致大量的线程进入阻塞状态。

       场景二:sumbit(callable)后,立即调用Future.get()方法,由于get()方法是阻塞式运行的,相当于没有并发运行任务。如果需要并发执行的效果,可以将submit这一环节设计成并发的,即在多线程对象里调用submit方法。可能看到这儿各位会不理解为什么要这样做,这样做唯一的目的就是防止大量的线程启动。注意是通过阻塞等待防止线程启动,跟线程池通过最大执行的线程控制还是有区别的,并且在有些需求环境下需要考虑用这种方式。

 

 写到这里,线程池这一部分的学习算告一段落,回想回想其实仍然有非常多的地方没有弄明白,估计是掌握的东西不够,有些东西只能揣摩,无法从本质上去理解。继续努力吧

 
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息