您的位置:首页 > 编程语言 > Java开发

Java --- Future

2015-10-09 16:13 441 查看
Java --- Future

JDK - Futrue

JDK定义:Future代表着一个异步计算的结果,提供了检查异步计算是否完成,等待异步计算完成,获取计算结果等方法。换句话说:提交了一个计算后,需要一个接口来获取计算结果或确认计算是否完成,这个接口就是Future。

从定义上来看,Future是和一个计算绑定在一起的,因此很自然的引申出以下几个接口及实现类:

RunnableFuture<V> :代表一个可运行的Future.

RunnableScheduledFuture<V>:代表一个可以延迟运行的Future

FutureTask<V>:RunnableFuture的实现类。

FutureTask使用示例代码:

FutureTask<Long> future =
new FutureTask<Long>(new Callable<Long>(){

@Override
public Long call()
throws Exception {
Thread.sleep(10000);
return 1000L;
}
});

Executors.newSingleThreadExecutor().execute(future);

try {
System.out.println(future.get());
} catch (InterruptedException
e) {
//
TODO Auto-generated catch block
e.printStackTrace();
} catch (ExecutionException
e) {
//
TODO Auto-generated catch block
e.printStackTrace();
}

这里创建了一个FutureTask提交给一个Executor去执行后就调用Future的get方法获取结果,get会阻塞直到FutureTask的运行任务执行完成后返回结果。

实际上java.util.concurrent.ExecutorService接口定义的向其提交任务的方法,如submit,invoke等都会返回一个Future对象供调用者来获取异步操作的执行结果,所以使用中其实并不需要开发人员自己实例化Future对象的。

<T> Future<T> submit(Callable<T> task);

……

下面看下java.util.concurrent.AbstractExecutorService的代码:

public <T> Future<T> submit(Runnable
task, T
result) {
if (task ==
null)
thrownew NullPointerException();
RunnableFuture<T>
ftask = newTaskFor(task,
result);
execute(ftask);
returnftask;
}

protected <T>
RunnableFuture<T> newTaskFor(Callable<T>
callable) {
returnnew FutureTask<T>(callable);
}

可以看到AbstractExecutorService会实例并返回一个FutureTask对象供调用者使用。

看下FutureTask的代码

publicvoid
run() {
if (state !=
NEW ||
!UNSAFE.compareAndSwapObject(this,
runnerOffset,

null, Thread.currentThread()))
return;
try {
Callable<V>
c = callable;
if (c !=
null &&
state ==
NEW) {
V
result;
booleanran;
try {
result =
c.call();
ran =
true;
} catch (Throwable
ex) {
result =
null;
ran =
false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner =
null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
ints =
state;
if (s >=
INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
protectedvoid set(V
v) {
if (UNSAFE.compareAndSwapInt(this,
stateOffset,
NEW,
COMPLETING)) {
outcome =
v;
UNSAFE.putOrderedInt(this,
stateOffset,
NORMAL);
// final state
finishCompletion();
}
}
public V
get() throws InterruptedException, ExecutionException {
ints =
state;
if (s <=
COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}

就可以清楚的看到FutureTask会维持任务的执行状态,当异步计算执行完后,执行线程会调用set方法来设置状态和结果。

Netty的Future和Promise

Netty 中提供了很多有用的工具,本身也提倡说可以将Netty作为一个基础类库来使用而不是仅仅局限于网络应用。这其中也包括了Future。用Netty官方文档的话来说,就是异步计算更应该是当计算结果是触发某个操作,而不是去阻塞(当然也可以不阻塞,但总是需要调用者去主动查询)来查询计算的结果。这就是Netty Future接口的目的。

从方法列表可以看出:除了在获取结果和等待方面添加了一些有用方法外,最主要的是引入了FutureListener的概念,包括增加和删除FutureListener的方法。这些Listener也就是在异步计算的特定生命周期时刻触发的操作。

Netty Promise继承自自己的Future,是在Future可以出发监听者操作之外再增加了可以用来控制异步计算的接口。

看到增加了setSuccess,trySucess,setFailure,tryFailuer几个可以干预异步计算的方法。

看下Netty Future和Promise在Netty中的应用。下面是ServerBootStrapAcceptor类的channelRead方法,该类负责客户端和服务器建立起来的连接注册到一个EventLoopGroup。这里的注册是一个异步操作,具体注册在childGroup中的某个EventLoop来执行。childGroup.register(child)
返回的是一个ChannelFuture对象,这里给改ChannelFuture注册了一个监听器,该监听器的的功能是当注册失败时进行关闭等清除工作。

try {
childGroup.register(child).addListener(new
ChannelFutureListener() {
@Override
publicvoid
operationComplete(ChannelFuture
future) throws Exception {
if (!future.isSuccess()) {
forceClose(child,
future.cause());
}
}
});
} catch (Throwable
t) {
forceClose(child,
t);
}

EventLoopGroup接口的register方法定义:

ChannelFuture register(Channel channel, ChannelPromise promise);

Register方法调用的是SingleThreadEventLoop类的register方法:

public ChannelFuture register(Channel
channel) {
return
register(channel,
new DefaultChannelPromise(channel, this));
}

@Override
public ChannelFuture
register(final Channel
channel,
final ChannelPromise
promise) {
if (channel ==
null) {
thrownew NullPointerException("channel");
}
if (promise ==
null) {
thrownew NullPointerException("promise");
}

channel.unsafe().register(this,
promise);
returnpromise;
}

可以看到register返回的是一个Promise。

探究channel.unsafe().register(this,promise),具体的实现在AbstractUnsafe.register0(promise):

privatevoid register0(ChannelPromise
promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
if (!ensureOpen(promise)) {
return;
}
doRegister();
registered =
true;
promise.setSuccess();
pipeline.fireChannelRegistered();
if (isActive()) {
pipeline.fireChannelActive();
}
} catch (Throwable
t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
if (!promise.tryFailure(t)) {
logger.warn(
"Tried to fail the registration promise, but it is complete already. " +

"Swallowing the cause of the registration failure:",
t);
}
}
}

可以看出如果注册成功则会调用promise的setsuccess方法,当出现异常时调用tryFailure方法。

DefaultPromise类

public Promise<V>
setSuccess(V result) {
if (setSuccess0(result)) {
notifyListeners();
returnthis;
}
thrownew IllegalStateException("complete already: "
+ this);
}

privatevoid
notifyListeners() {

// This method doesn't need synchronization because:
// 1) This method is always called after synchronized (this) block.
//
Hence any listener list modification happens-before this method.
// 2) This method is called only when 'done' is true.
Once 'done'
//
becomes true, the listener list is never modified - see add/removeListener()
Object
listeners = this.listeners;
if (listeners ==
null) {
return;
}

this.listeners =
null;

EventExecutor executor = executor();
if (executor.inEventLoop()) {
final Integer
stackDepth =
LISTENER_STACK_DEPTH.get();
if (stackDepth <
MAX_LISTENER_STACK_DEPTH) {
LISTENER_STACK_DEPTH.set(stackDepth +
1);
try {
if (listenersinstanceof
DefaultFutureListeners) {
notifyListeners0(this, (DefaultFutureListeners)
listeners);
} else {
@SuppressWarnings("unchecked")
final GenericFutureListener<?
extends Future<V>>
l =

(GenericFutureListener<? extends Future<V>>)
listeners;
notifyListener0(this,
l);
}
} finally {
LISTENER_STACK_DEPTH.set(stackDepth);
}
return;
}
}

try {
if (listenersinstanceof
DefaultFutureListeners) {
final DefaultFutureListeners
dfl = (DefaultFutureListeners)
listeners;
executor.execute(new Runnable() {
@Override
publicvoid run() {
notifyListeners0(DefaultPromise.this,
dfl);
}
});
} else {
@SuppressWarnings("unchecked")
final GenericFutureListener<?
extends Future<V>>
l =
(GenericFutureListener<?
extends Future<V>>)
listeners;
executor.execute(new Runnable() {
@Override
publicvoid run() {
notifyListener0(DefaultPromise.this,
l);
}
});
}
} catch (Throwable
t) {
logger.error("Failed to notify listener(s). Event loop shut down?",
t);
}
}

从DefaultPromise的代码可以看出,调用Promise的设置结果方法时会通知Listener,这里看到所有的Listener是使用一个Executor的(初始化Promise时指定的),如果Executor就是当前线程,那是会串行的执行所有的Listener,这也是Google Guava的ListnerFuture和Netty的主要区别。

NotifyListener方法的注释中有提到对Listener的修改不需要同步,因为1.一般都是在同步的代码块中,2.调用该方法的时候Future的状态是Done,这个时候是不允许修改监听者了。

我们再来看下对应于JDK中FutureTask和ScheduledFutureTask的netty版本:io.netty.util.concurrent.ScheduledFutureTask,PromiseTask在Netty中的一个应用,下面是SingleThreadEventExecutor启动时会调用的一个函数,该函数将一个周期性的PurgeTask任务包装为一个ScheduledFutureTask对象,添加到延迟任务列表中。

privatevoid startThread() {
synchronized (stateLock) {
if (state ==
ST_NOT_STARTED) {
state =
ST_STARTED;
delayedTaskQueue.add(new ScheduledFutureTask<Void>(
this,
delayedTaskQueue, Executors.<Void>callable(new PurgeTask(),
null),
ScheduledFutureTask.deadlineNanos(SCHEDULE_PURGE_INTERVAL), -SCHEDULE_PURGE_INTERVAL));
thread.start();
}
}
}

总结:Netty的Future扩展了JDK的Future接口,提供了异步计算结束后触发操作的定义,而promise再在次之上提供了可以干预异步计算结果的方法,进而提供了更加完善的异步编程接口。

Netty中Future相关角色包括:

Future: 可查询,可触发操作的异步计算结果

Promise:可控的Future

FutureListener:Future触发的操作接口

EventExecutor:执行器

PromiseTask: 包含执行任务的Promise

并提供了包括DefaultPromise,DefaultEventExecutor,PromiseTask等基础实现可以直接使用。

Guava 的Future

Guava 是Goolgle提供了Java编程的基础类库,是编程人推崇备至的神物,有人说从中可以看出编程之美,这下来再慢慢体会吧。

Guava com.google.common.util.concurrent并没有像Netty几乎重写了jdkconcurrent的所有类,而是继承自JDK,添加了监听者的功能。这可以在代码细节上看出两者的一些区别。

Guava com.google.common.util.concurrent.ListenerFuture:

publicinterface ListenableFuture<V>
extends Future<V> {
void
addListener(Runnable listener, Executor
executor);
}

Guava的Future同样添加了Listener功能,不同的是添加的时候需要传递一个执行该Listener的executor,换句话说所有的Listener可能不是使用一个EventExecutor来执行的。

ListenerFutureTask:

publicclass ListenableFutureTask<V>
extends FutureTask<V>
implements ListenableFuture<V> {

privatefinal ExecutionList
executionList =
new ExecutionList();
……
protectedvoid done() {
executionList.execute();
}
}

Guava使用ExecutionList结构来保存FutureListener,当Future任务完成时调用done方法执行所有的Listener.

ExecutionList

publicvoid execute() {
// Lock while we update our state so the add method above will finish adding
// any listeners before we start to run them.
RunnableExecutorPair
list;
synchronized (this) {
if (executed) {
return;
}
executed =
true;
list =
runnables;
runnables =
null;
// allow GC to free listeners even if this stays around for a while.
}
// If we succeeded then list holds all the
runnables we to execute. The pairs in the stack are
// in the opposite order from how they were added so we need to reverse the list to fulfill our
// contract.
// This is somewhat annoying, but turns out to be very fast in practice.
Alternatively, we
// could drop the contract on the method that enforces this queue like behavior since depending
// on it is likely to be a bug anyway.

// N.B. All writes to the list and the next pointers must have happened before the above

// synchronized block, so we can iterate the list without the lock held here.
RunnableExecutorPair
reversedList = null;
while (list !=
null) {
RunnableExecutorPair
tmp = list;
list =
list.next;
tmp.next =
reversedList;
reversedList =
tmp;
}
while (reversedList !=
null) {
executeListener(reversedList.runnable,
reversedList.executor);
reversedList =
reversedList.next;
}
}

/**
* Submits the given runnable to the given
{@link Executor} catching and logging all

*
{@linkplain RuntimeException runtime exceptions} thrown by the executor.
*/
privatestaticvoid executeListener(Runnable
runnable, Executor
executor) {
try {
executor.execute(runnable);
} catch (RuntimeException
e) {
// Log it and keep going, bad runnable and/or executor.
Don't
// punish the other
runnables if we're given a bad one. We only
// catch RuntimeException because we want Errors to propagate up.
log.log(Level.SEVERE,
"RuntimeException while executing runnable "
+
runnable + " with executor " +
executor,
e);
}
}

从代码可以看出在最终调用监听者的时候需要同步来获取runnable的监听列表和清空,而Netty把这个同步操作移到了修改Future状态的操作上。

下面再来看下一个重要的类AbstractFuture,该类实现了ListenableFuture,但没有实现Runnable接口,是ListenableFuture实现的推荐基类。

AbstractFuture将Future的状态转换为AQS的state使用AQS来作为Future状态操作的同步器,相比Netty自己实现Future的状态管理,在状态变化中都采用了同步块的方式代码上显得更加简明清晰。(至于效率上没有做压力测试还无法得知)

AbstractFuture部分代码:

/** Synchronization control for AbstractFutures. */
privatefinal Sync<V>
sync =
new Sync<V>();
……

protectedboolean
set(@Nullable V
value) {
booleanresult =
sync.set(value);
if (result) {
executionList.execute();
}
returnresult;
}

AbstractFuture.sync部分代码吗:

staticfinalclass
Sync<V> extends AbstractQueuedSynchronizer {
……
boolean set(@Nullable V
v) {
return
complete(v,
null,
COMPLETED);
}

……

privateboolean
complete(@Nullable V
v, @Nullable Throwable
t,
intfinalState) {
booleandoCompletion = compareAndSetState(RUNNING,
COMPLETING);
if (doCompletion) {
// If this thread successfully transitioned to COMPLETING, set the value
// and exception and then release to the final state.
this.value =
v;
// Don't actually construct a CancellationException until necessary.
this.exception = ((finalState
& (CANCELLED |
INTERRUPTED)) != 0)
? new CancellationException("Future.cancel() was called.")
: t;
releaseShared(finalState);
} elseif (getState() ==
COMPLETING) {
// If some other thread is currently completing the future, block until
// they are done so we can guarantee completion.
acquireShared(-1);
}
returndoCompletion;
}

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: