您的位置:首页 > 其它

Netty4之Future/Promise异步模型

2015-08-19 11:43 288 查看
在并发编程中,我们通常会用到一组非阻塞的模型:Promise,Future 和 Callback。其中的 Future 表示一个可能还没有实际完成的异步任务的结果,针对这个结果可以添加 Callback 以便在任务执行成功或失败后做出对应的操作,而 Promise 交由任务执行者,任务执行者通过 Promise 可以标记任务完成或者失败。 可以说这一套模型是很多异步非阻塞架构的基础。Netty 4中正提供了这种Future/Promise异步模型。



Netty文档说明Netty的网络操作都是异步的, 在源码上大量使用了Future/Promise模型,在Netty里面也是这样定义的:



Future接口定义了isSuccess(),isCancellable(),cause(),这些判断异步执行状态的方法。(read-only)

Promise接口在extneds future的基础上增加了setSuccess(), setFailure()这些方法。(writable)

java.util.concurrent.Future是Java提供的接口,表示异步执行的状态,Future的get方法会判断任务是否执行完成,如果完成就返回结果,否则阻塞线程,直到任务完成。

[java]
view plaincopy

// Java FutureTask.get()

public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}

Netty扩展了Java的Future,最主要的改进就是增加了监听器Listener接口,通过监听器可以让异步执行更加有效率,不需要通过get来等待异步执行结束,而是通过监听器回调来精确地控制异步执行结束的时间点。

[java]
view plaincopy

public interface Future<V> extends java.util.concurrent.Future<V> {

boolean isSuccess();

boolean isCancellable();

Throwable cause();

Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);

Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);

Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

Future<V> sync() throws InterruptedException;

Future<V> syncUninterruptibly();

Future<V> await() throws InterruptedException;

Future<V> awaitUninterruptibly();

boolean await(long timeout, TimeUnit unit) throws InterruptedException;

boolean await(long timeoutMillis) throws InterruptedException;

boolean awaitUninterruptibly(long timeout, TimeUnit unit);

boolean awaitUninterruptibly(long timeoutMillis);

V getNow();

boolean cancel(boolean mayInterruptIfRunning);
}

ChannelFuture接口扩展了Netty的Future接口,表示一种没有返回值的异步调用,同时关联了Channel,跟一个Channel绑定

[java]
view plaincopy

public interface ChannelFuture extends Future<Void> {

Channel channel();

@Override
ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener);

@Override
ChannelFuture addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);

@Override
ChannelFuture removeListener(GenericFutureListener<? extends Future<? super Void>> listener);

@Override
ChannelFuture removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);

@Override
ChannelFuture sync() throws InterruptedException;

@Override
ChannelFuture syncUninterruptibly();

@Override
ChannelFuture await() throws InterruptedException;

@Override
ChannelFuture awaitUninterruptibly();
}

Promise接口也扩展了Future接口,它表示一种可写的Future,就是可以设置异步执行的结果

[java]
view plaincopy

public interface Promise<V> extends Future<V> {

Promise<V> setSuccess(V result);

boolean trySuccess(V result);

Promise<V> setFailure(Throwable cause);

boolean tryFailure(Throwable cause);
}

ChannelPromise接口扩展了Promise和ChannelFuture,绑定了Channel,又可写异步执行结构,又具备了监听者的功能,是Netty实际编程使用的表示异步执行的接口

[java]
view plaincopy

public interface ChannelPromise extends ChannelFuture, Promise<Void> {

@Override
Channel channel();

@Override
ChannelPromise setSuccess(Void result);

ChannelPromise setSuccess();

boolean trySuccess();

@Override
ChannelPromise setFailure(Throwable cause);

@Override
ChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> listener);

@Override
ChannelPromise addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);

@Override
ChannelPromise removeListener(GenericFutureListener<? extends Future<? super Void>> listener);

@Override
ChannelPromise removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);

@Override
ChannelPromise sync() throws InterruptedException;

@Override
ChannelPromise syncUninterruptibly();

@Override
ChannelPromise await() throws InterruptedException;

@Override
ChannelPromise awaitUninterruptibly();
}

DefaultChannelPromise是ChannelPromise的实现类,它是实际运行时的Promoise实例。Channel接口提供了newPromise接口,表示Channel要创建一个异步执行的动作

[java]
view plaincopy

public interface Channel extends AttributeMap, Comparable<Channel> {
ChannelPromise newPromise();
}

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
public ChannelPromise newPromise() {
return new DefaultChannelPromise(this);
}
}

Netty推荐使用addListener的方式来回调异步执行的结果,这种方式优于Future.get,能够更精确地把握异步执行结束的时间
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: