Netty中的ChannelFuture和ChannelPromise
在Netty使用ChannelFuture和ChannelPromise进行异步操作的处理
这是官方给出的ChannelFutur描述
* | Completed successfully | * +---------------------------+ * +----> isDone() = true | * +--------------------------+ | | isSuccess() = true | * | Uncompleted | | +===========================+ * +--------------------------+ | | Completed with failure | * | isDone() = false | | +---------------------------+ * | isSuccess() = false |----+----> isDone() = true | * | isCancelled() = false | | | cause() = non-null | * | cause() = null | | +===========================+ * +--------------------------+ | | Completed by cancellation | * | +---------------------------+ * +----> isDone() = true | * | isCancelled() = true | * +---------------------------+
由图可以知道ChannelFutur有四种状态:Uncompleted、Completed successfully、Completed with failure、Completed by cancellation,这几种状态是由isDone、isSuccess、isCancelled、cause这四种方法的返回值决定的。
ChannelFutur接口的定义如下:
public interface ChannelFuture extends Future<Void> { Channel channel(); ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> var1); ChannelFuture addListeners(GenericFutureListener... var1); ChannelFuture removeListener(GenericFutureListener<? extends Future<? super Void>> var1); ChannelFuture removeListeners(GenericFutureListener... var1); ChannelFuture sync() throws InterruptedException; ChannelFuture syncUninterruptibly(); ChannelFuture await() throws InterruptedException; ChannelFuture awaitUninterruptibly(); boolean isVoid(); }
继承自Netty的Future:
public interface Future<V> extends java.util.concurrent.Future<V> { boolean isSuccess(); boolean isCancellable(); Throwable cause(); Future<V> addListener(GenericFutureListener<? extends Future<? super V>> var1); Future<V> addListeners(GenericFutureListener... var1); Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> var1); Future<V> removeListeners(GenericFutureListener... var1); Future<V> sync() throws InterruptedException; Future<V> syncUninterruptibly(); Future<V> await() throws InterruptedException; Future<V> awaitUninterruptibly(); boolean await(long var1, TimeUnit var3) throws InterruptedException; boolean await(long var1) throws InterruptedException; boolean awaitUninterruptibly(long var1, TimeUnit var3); boolean awaitUninterruptibly(long var1); V getNow(); boolean cancel(boolean var1); }
Netty的Future又继承自JDK的Future:
public interface Future<V> { boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
ChannelPromise继承了ChannelFuture:
public interface ChannelPromise extends ChannelFuture, Promise<Void> { Channel channel(); ChannelPromise setSuccess(Void var1); ChannelPromise setSuccess(); boolean trySuccess(); ChannelPromise setFailure(Throwable var1); ChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> var1); ChannelPromise addListeners(GenericFutureListener... var1); ChannelPromise removeListener(GenericFutureListener<? extends Future<? super Void>> var1); ChannelPromise removeListeners(GenericFutureListener... var1); ChannelPromise sync() throws InterruptedException; ChannelPromise syncUninterruptibly(); ChannelPromise await() throws InterruptedException; ChannelPromise awaitUninterruptibly(); ChannelPromise unvoid(); }
其中Promise接口定义如下:
public interface Promise<V> extends Future<V> { Promise<V> setSuccess(V var1); boolean trySuccess(V var1); Promise<V> setFailure(Throwable var1); boolean tryFailure(Throwable var1); boolean setUncancellable(); Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> var1); Promise<V> addListeners(GenericFutureListener... var1); Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> var1); Promise<V> removeListeners(GenericFutureListener... var1); Promise<V> await() throws InterruptedException; Promise<V> awaitUninterruptibly(); Promise<V> sync() throws InterruptedException; Promise<V> syncUninterruptibly(); }
在Netty中,无论是服务端还是客户端,在Channel注册时都会为其绑定一个ChannelPromise,默认实现是DefaultChannelPromise
DefaultChannelPromise定义如下:
public class DefaultChannelPromise extends DefaultPromise<Void> implements ChannelPromise, FlushCheckpoint { private final Channel channel; private long checkpoint; public DefaultChannelPromise(Channel channel) { this.channel = checkNotNull(channel, "channel"); } public DefaultChannelPromise(Channel channel, EventExecutor executor) { super(executor); this.channel = checkNotNull(channel, "channel"); } @Override protected EventExecutor executor() { EventExecutor e = super.executor(); if (e == null) { return channel().eventLoop(); } else { return e; } } @Override public Channel channel() { return channel; } @Override public ChannelPromise setSuccess() { return setSuccess(null); } @Override public ChannelPromise setSuccess(Void result) { super.setSuccess(result); return this; } @Override public boolean trySuccess() { return trySuccess(null); } @Override public ChannelPromise setFailure(Throwable cause) { super.setFailure(cause); return this; } @Override public ChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> listener) { super.addListener(listener); return this; } @Override public ChannelPromise addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners) { super.addListeners(listeners); return this; } @Override public ChannelPromise removeListener(GenericFutureListener<? extends Future<? super Void>> listener) { super.removeListener(listener); return this; } @Override public ChannelPromise removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners) { super.removeListeners(listeners); return this; } @Override public ChannelPromise sync() throws InterruptedException { super.sync(); return this; } @Override public ChannelPromise syncUninterruptibly() { super.syncUninterruptibly(); return this; } @Override public ChannelPromise await() throws InterruptedException { super.await(); return this; } @Override public ChannelPromise awaitUninterruptibly() { super.awaitUninterruptibly(); return this; } @Override public long flushCheckpoint() { return checkpoint; } @Override public void flushCheckpoint(long checkpoint) { this.checkpoint = checkpoint; } @Override public ChannelPromise promise() { return this; } @Override protected void checkDeadLock() { if (channel().isRegistered()) { super.checkDeadLock(); } } @Override public ChannelPromise unvoid() { return this; } @Override public boolean isVoid() { return false; } }
可以看到这个DefaultChannelPromise仅仅是将Channel封装了,而且其基本上所有方法的实现都依赖于父类DefaultPromise
DefaultPromise中的实现是整个ChannelFuture和ChannelPromise的核心所在:
DefaultPromise中有如下几个状态量:
private static final int MAX_LISTENER_STACK_DEPTH = Math.min(8, SystemPropertyUtil.getInt("io.netty.defaultPromise.maxListenerStackDepth", 8)); private static final Object SUCCESS = new Object(); private static final Object UNCANCELLABLE = new Object(); private static final CauseHolder CANCELLATION_CAUSE_HOLDER = new CauseHolder(ThrowableUtil.unknownStackTrace( new CancellationException(), DefaultPromise.class, "cancel(...)")); private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER = AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class, Object.class, "result");
MAX_LISTENER_STACK_DEPTH: 表示最多可执行listeners的数量,默认是8
SUCCESS :表示异步操作正常完成
UNCANCELLABLE:表示异步操作不可取消,并且尚未完成
CANCELLATION_CAUSE_HOLDER:表示异步操作取消监听,用于cancel操作,
而CauseHolder 的实例对象是用来表示异步操作异常结束,同时保存异常信息:
private static final class CauseHolder { final Throwable cause; CauseHolder(Throwable cause) { this.cause = cause; } }
RESULT_UPDATER:是一个原子更新器,通过CAS操作,原子化更新 DefaultPromise对象的名为result的成员,这个result成员是其异步操作判断的关键所在
DefaultPromise的成员及构造方法定义:
public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> { private volatile Object result; private final EventExecutor executor; private Object listeners; private short waiters; private boolean notifyingListeners; public DefaultPromise(EventExecutor executor) { this.executor = checkNotNull(executor, "executor"); } }
result:就是前面说的,判断异步操作状态的关键
result的取值有:SUCCESS 、UNCANCELLABLE、CauseHolder以及null (其实还可以是泛型V类型的任意对象,这里暂不考虑)
executor:就是Channel绑定的NioEventLoop,在我之前的博客说过,Channel的异步操作都是在NioEventLoop的线程中完成的([Netty中NioEventLoopGroup的创建源码分析](https://blog.csdn.net/Z_ChenChen/article/details/90567863))
listeners:通过一个Object保存所有对异步操作的监听,用于异步操作的回调
waiters:记录阻塞中的listeners的数量
notifyingListeners:是否需要唤醒的标志
首先来看isDone方法,通过之前的图可以知道,
isDone为false对应了Uncompleted状态,即异步操作尚未完成;
isDone为true则代表了异步操作完成,但是还是有三种完成情况,需要结合别的判断方法才能具体知道是哪种情况;
isDone方法:
@Override public boolean isDone() { return isDone0(result); }
调用isDone0:
private static boolean isDone0(Object result) { return result != null && result != UNCANCELLABLE; }
有如下几种情况:
result等于null,result没有赋值,表示异步操作尚未完成(从这里就能想到异步操作完成,需要调用某个set方法来改变result的状态)
result是UNCANCELLABLE状态,表示执行中的异步操作不可取消,当然也就是异步操作尚未完成
result不等于null,且不等于UNCANCELLABLE,就表示异步操作完成(包括正常完成,以及异常结束,需要由cause方法进一步判断)
isSuccess方法:
@Override public boolean isSuccess() { Object result = this.result; return result != null && result != UNCANCELLABLE && !(result instanceof CauseHolder); }
由这里可以知道当且仅当result 为SUCCESS状态时,才返回true(其余除UNCANCELLABLE和null的值其实也可以,这里暂不考虑)
isCancelled方法:
@Override public boolean isCancelled() { return isCancelled0(result); }
调用isCancelled0方法:
private static boolean isCancelled0(Object result) { return result instanceof CauseHolder && ((CauseHolder) result).cause instanceof CancellationException; }
只有当result是CancellationException的实例时,表示取消异步操作
接着来看cause方法:
@Override public Throwable cause() { Object result = this.result; return (result instanceof CauseHolder) ? ((CauseHolder) result).cause : null; }
和上面同理,通过判别resul是否是CauseHolder的实现类,若是,将CauseHolder保存的异常返回。
几种状态的判别说完了,下面看一下如何设置这几种状态的:
setSuccess方法:
@Override public Promise<V> setSuccess(V result) { if (setSuccess0(result)) { notifyListeners(); return this; } throw new IllegalStateException("complete already: " + this); }
首先调用setSuccess0方法,其中result的泛型通过DefaultChannelPromise可知是Void,在DefaultChannelPromise中所有的set和try操作参数都是null,这里的result也就不去考虑:
private boolean setSuccess0(V result) { return setValue0(result == null ? SUCCESS : result); }
继续调用setValue0方法:
private boolean setValue0(Object objResult) { if (RESULT_UPDATER.compareAndSet(this, null, objResult) || RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) { checkNotifyWaiters(); return true; } return false; }
通过CAS操作,将result状态变为SUCCESS
其中checkNotifyWaiters方法:
private synchronized void checkNotifyWaiters() { if (waiters > 0) { notifyAll(); } }
检查waiters的个数,唤醒所有阻塞中的this,sync方法会引起阻塞
回到setSuccess方法中,setSuccess0通过CAS操作,将result状态更新为SUCCESS成功后,调用
notifyListeners方法,唤醒所有listener完成对异步操作的回调
listeners是通过addListener方法添加的,用来对异步操作进行侦听:
看到addListener方法:
@Override public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) { checkNotNull(listener, "listener"); synchronized (this) { addListener0(listener); } if (isDone()) { notifyListeners(); } return this; } @Override public Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners) { checkNotNull(listeners, "listeners"); synchronized (this) { for (GenericFutureListener<? extends Future<? super V>> listener : listeners) { if (listener == null) { break; } addListener0(listener); } } if (isDone()) { notifyListeners(); } return this; }
其中GenericFutureListener接口定义如下:
public interface GenericFutureListener<F extends Future<?>> extends EventListener { /** * Invoked when the operation associated with the {@link Future} has been completed. * * @param future the source {@link Future} which called this callback */ void operationComplete(F future) throws Exception; }
可以看到listener其实就是通过operationComplete方法,来完成回调,对Future对象进行处理,由注释可知operationComplete方法是在future操作完成时调用
addListeners方法的实现比较简单,实现核心是在addListener0中:
private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) { if (listeners == null) { listeners = listener; } else if (listeners instanceof DefaultFutureListeners) { ((DefaultFutureListeners) listeners).add(listener); } else { listeners = new DefaultFutureListeners((GenericFutureListener<?>) listeners, listener); } }
其中DefaultFutureListeners是将GenericFutureListener对象封装的一个数组:
final class DefaultFutureListeners { private GenericFutureListener<? extends Future<?>>[] listeners; private int size; private int progressiveSize; @SuppressWarnings("unchecked") DefaultFutureListeners( GenericFutureListener<? extends Future<?>> first, GenericFutureListener<? extends Future<?>> second) { listeners = new GenericFutureListener[2]; listeners[0] = first; listeners[1] = second; size = 2; if (first instanceof GenericProgressiveFutureListener) { progressiveSize ++; } if (second instanceof GenericProgressiveFutureListener) { progressiveSize ++; } } public void add(GenericFutureListener<? extends Future<?>> l) { GenericFutureListener<? extends Future<?>>[] listeners = this.listeners; final int size = this.size; if (size == listeners.length) { this.listeners = listeners = Arrays.copyOf(listeners, size << 1); } listeners[size] = l; this.size = size + 1; if (l instanceof GenericProgressiveFutureListener) { progressiveSize ++; } } public void remove(GenericFutureListener<? extends Future<?>> l) { final GenericFutureListener<? extends Future<?>>[] listeners = this.listeners; int size = this.size; for (int i = 0; i < size; i ++) { if (listeners[i] == l) { int listenersToMove = size - i - 1; if (listenersToMove > 0) { System.arraycopy(listeners, i + 1, listeners, i, listenersToMove); } listeners[-- size] = null; this.size = size; if (l instanceof GenericProgressiveFutureListener) { progressiveSize --; } return; } } } public GenericFutureListener<? extends Future<?>>[] listeners() { return listeners; } public int size() { return size; } public int progressiveSize() { return progressiveSize; } }
size:记录listeners的个数
progressiveSize:记录GenericProgressiveFutureListener类型的listeners的个数
DefaultFutureListeners 中对数组的操作比较简单,
add方法,当size达到数组长度时,进行二倍扩容,
其中GenericProgressiveFutureListener继承自GenericFutureListener:
public interface GenericProgressiveFutureListener<F extends ProgressiveFuture<?>> extends GenericFutureListener<F> { /** * Invoked when the operation has progressed. * * @param progress the progress of the operation so far (cumulative) * @param total the number that signifies the end of the operation when {@code progress} reaches at it. * {@code -1} if the end of operation is unknown. */ void operationProgressed(F future, long progress, long total) throws Exception; }
由注释可知operationProgressed是在future操作进行时调用,这里不对GenericProgressiveFutureListener过多讨论
回到addListener0方法,由DefaultFutureListeners就可以知道,实际上通过DefaultFutureListeners管理的一维数组来保存listeners
在addListener方法完成对listener的添加后,还会调用isDone方法检查当前异步操作是否完成,若是完成需要调用notifyListeners,直接唤醒所有listeners完后对异步操作的回调
有add就有remove,removeListener方法:
@Override public Promise<V> removeListener(final GenericFutureListener<? extends Future<? super V>> listener) { checkNotNull(listener, "listener"); synchronized (this) { removeListener0(listener); } return this; } @Override public Promise<V> removeListeners(final GenericFutureListener<? extends Future<? super V>>... listeners) { checkNotNull(listeners, "listeners"); synchronized (this) { for (GenericFutureListener<? extends Future<? super V>> listener : listeners) { if (listener == null) { break; } removeListener0(listener); } } return this; }
还是由removeListener0来实现:
private void removeListener0(GenericFutureListener<? extends Future<? super V>> listener) { if (listeners instanceof DefaultFutureListeners) { ((DefaultFutureListeners) listeners).remove(listener); } else if (listeners == listener) { listeners = null; } }
看过之前的内容在看这里就比较简单了,通过DefaultFutureListeners去删除
notifyListeners方法:
private void notifyListeners() { EventExecutor executor = executor(); if (executor.inEventLoop()) { final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get(); final int stackDepth = threadLocals.futureListenerStackDepth(); if (stackDepth < MAX_LISTENER_STACK_DEPTH) { threadLocals.setFutureListenerStackDepth(stackDepth + 1); try { notifyListenersNow(); } finally { threadLocals.setFutureListenerStackDepth(stackDepth); } return; } } safeExecute(executor, new Runnable() { @Override public void run() { notifyListenersNow(); } }); }
其中executor方法:
protected EventExecutor executor() { return executor; }
用来获取executor轮询线程对象
判断executor是否处于轮询,否则需要通过safeExecute方法处理listeners的侦听,
safeExecute方法:
private static void safeExecute(EventExecutor executor, Runnable task) { try { executor.execute(task); } catch (Throwable t) { rejectedExecutionLogger.error("Failed to submit a listener notification task. Event loop shut down?", t); } }
这里保证了listeners的侦听回调是异步执行
InternalThreadLocalMap在我之前的博客中说过,是Netty使用的ThreadLocal (Netty中FastThreadLocal源码分析)
去线程本地变量中找futureListenerStackDepth(默认为0),判断stackDepth是否小于MAX_LISTENER_STACK_DEPTH,否则也要通过safeExecute方法处理listeners的侦听
核心都是调用notifyListenersNow方法:
private void notifyListenersNow() { Object listeners; synchronized (this) { // Only proceed if there are listeners to notify and we are not already notifying listeners. if (notifyingListeners || this.listeners == null) { return; } notifyingListeners = true; listeners = this.listeners; this.listeners = null; } for (;;) { if (listeners instanceof DefaultFutureListeners) { notifyListeners0((DefaultFutureListeners) listeners); } else { notifyListener0(this, (GenericFutureListener<?>) listeners); } synchronized (this) { if (this.listeners == null) { // Nothing can throw from within this method, so setting notifyingListeners back to false does not // need to be in a finally block. notifyingListeners = false; return; } listeners = this.listeners; this.listeners = null; } } }
先检查是否需要监听,满足条件后,判断listeners是否是DefaultFutureListeners,即包装后的数组
notifyListeners0方法:
private void notifyListeners0(DefaultFutureListeners listeners) { GenericFutureListener<?>[] a = listeners.listeners(); int size = listeners.size(); for (int i = 0; i < size; i ++) { notifyListener0(this, a[i]); } }
遍历这个数组,实则调用notifyListener0方法:
private static void notifyListener0(Future future, GenericFutureListener l) { try { l.operationComplete(future); } catch (Throwable t) { if (logger.isWarnEnabled()) { logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t); } } }
这里就可以看到,完成了对operationComplete的回调,处理future
setSuccess结束,再来看trySuccess方法:
@Override public boolean trySuccess(V result) { if (setSuccess0(result)) { notifyListeners(); return true; } return false; }
对比setSuccess来看,只有返回值不一样
setFailure方法:
@Override public Promise<V> setFailure(Throwable cause) { if (setFailure0(cause)) { notifyListeners(); return this; } throw new IllegalStateException("complete already: " + this, cause); } private boolean setFailure0(Throwable cause) { return setValue0(new CauseHolder(checkNotNull(cause, "cause"))); } private boolean setValue0(Object objResult) { if (RESULT_UPDATER.compareAndSet(this, null, objResult) || RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) { checkNotifyWaiters(); return true; } return false; }
和setSuccess逻辑一样,只不过CAS操作将状态变为了CauseHolder对象,成功后唤醒listeners对异步操作的回调
tryFailure方法:
@Override public boolean tryFailure(Throwable cause) { if (setFailure0(cause)) { notifyListeners(); return true; } return false; }
也都是一个逻辑
还有一个setUncancellable方法:
@Override public boolean setUncancellable() { if (RESULT_UPDATER.compareAndSet(this, null, UNCANCELLABLE)) { return true; } Object result = this.result; return !isDone0(result) || !isCancelled0(result); }
若是result状态为null,异步操作尚未结束,直接通过CAS操作将状态变为UNCANCELLABLE
否则若是根据状态来判断
下来看到cancel方法:
/** * {@inheritDoc} * * @param mayInterruptIfRunning this value has no effect in this implementation. */ @Override public boolean cancel(boolean mayInterruptIfRunning) { if (RESULT_UPDATER.compareAndSet(this, null, CANCELLATION_CAUSE_HOLDER)) { checkNotifyWaiters(); notifyListeners(); return true; } return false; }
mayInterruptIfRunning正如注释中所说,在这里没有什么作用
还是通过CAS操作,将状态变为CANCELLATION_CAUSE_HOLDER,调用checkNotifyWaiters唤醒因sync阻塞的线程,notifyListeners方法回调listeners的侦听
最后看到sync方法:
@Override public Promise<V> sync() throws InterruptedException { await(); rethrowIfFailed(); return this; }
先调用await方法:
@Override public Promise<V> await() throws InterruptedException { if (isDone()) { return this; } if (Thread.interrupted()) { throw new InterruptedException(toString()); } checkDeadLock(); synchronized (this) { while (!isDone()) { incWaiters(); try { wait(); } finally { decWaiters(); } } } return this; }
先判断能否执行(异步操作尚未结束,当前线程没有被中断),然后调用checkDeadLock方法:
protected void checkDeadLock() { EventExecutor e = executor(); if (e != null && e.inEventLoop()) { throw new BlockingOperationException(toString()); } }
检查轮询线程是否在工作
在synchronized块中以自身为锁,自旋等待异步操作的完成,若是没完成,调用incWaiters方法:
private void incWaiters() { if (waiters == Short.MAX_VALUE) { throw new IllegalStateException("too many waiters: " + this); } ++waiters; }
在小于Short.MAX_VALUE的情况下,对waiters自增,
然后使用wait将自身阻塞,等待被唤醒
所以在之前setValue0时,checkNotifyWaiters操作会notifyAll,
由此可以知道sync方法的作用:在某一线程中调用sync方法会使得当前线程被阻塞,只有当异步操作执完毕,通过上面的set方法改变状态后,才会调用checkNotifyWaiters方法唤醒当前线程。
当从阻塞中被唤醒后调用decWaiters方法:
private void decWaiters() { --waiters; }
使得waiters自减
通过这样一种自旋方式,一直等到isDone成立,结束自旋,进而结束await方法,然后调用rethrowIfFailed方法:
private void rethrowIfFailed() { Throwable cause = cause(); if (cause == null) { return; } PlatformDependent.throwException(cause); }
根据异步操作是否有异常,进而使用PlatformDependent抛出异常。
至此Netty中的ChannelFuture和ChannelPromise分析到此全部结束。
- Netty的ChannelFuture和ChannelPromise
- Netty实战——Channel、EventLoop和ChannelFuture详解
- Netty源码之ChannelPromise回调通知问题
- Netty学习笔记(二) Channel和ChannelFuture
- Netty 4.0 源码分析(三):Channel和ChannelPipeline
- SparkStreaming+Flume出现ERROR ReceiverTracker: Deregistered receiver for stream 0: Error starting receiver 0 - org.jboss.netty.channel.ChannelException
- 自顶向下深入分析Netty(六)--Channel源码实现
- Netty源码:服务端Channel的创建
- Netty 回调 与 Channel 执行流程
- Netty 源码(ChannelHandler 死磕)
- Netty利用ChannelGroup广播消息
- Netty之ChannelOption
- Netty学习总结(4)——图解Netty之Pipeline、channel、Context之间的数据流向
- netty ChannelFuture的不解?
- Netty之ChannelOption
- Netty In Action中文版 - 第八章:附带的ChannelHandler和Codec
- Netty学习笔记之EmbededChannel 单元测试
- Netty In Action中文版 - 第八章:附带的ChannelHandler和Codec
- Netty系列:六、细说ChannelHandler和ChannelPipeLine
- Java netty的option(ChannelOption.SO_BACKLOG, backLog)什么意思