Netty系列:六、细说ChannelHandler和ChannelPipeLine
2018-03-12 21:02
651 查看
开头
本篇主要介绍ChannelHandler和
ChannelPipeLine、
ChannelContext。以及他们之间的的组织。
1.ChannelHandler
家族
Channel的生命周期
ChannelHandler的生命周期
ChannelInBoundHandler接口
ChannelInBoundHandlerAdapter
下面逐个介绍
1.1.
Channel的生命周期
被创建,未注册到
EventLoop
注册到
EventLoop
处于活动状态,已经连接到远程节点,可以接受和发送数据了
没有连接到远程节点
如图所示,当状态改变时,将会生成对应的事件。这些事件会发送给
ChannelHandler,随后做出相应。
1.2.
ChannelHandler的生命周期
下面是一些方法
1.
handlerAdded():当
ChannelHandler被添加到
ChannelPipeLine中时调用
2.
handlerRemoved():当
ChannelHandler在
ChannelPipeLine中被移除时调用
之前我们已经谈到过,
ChannelHandler的子接口:
ChannelInboundHandler:处理入站数据及各种状态变化
ChannelOutboundHandler:处理出站数据并且允许拦截所有的操作。
1.3.
ChannelInboundHandler/
ChannelOutboundHandler
ChannelInboundHandler的一些方法将会在数据被接收时或者与其对应的
Channel状态发生改变时被调用。正如我们前面所提到的,这些方法和
Channel的生命周期密切相关。
这些方法从名字就可以看出它们的作用,在此就不一一介绍。主要说下
channelRead()方法,
ChannelInboundHandler的实现重写
channelRead()方法时,它将负责显式地释放与池化的
ByteBuf实例相关的内存。Netty中提供了一个工具方法:
ReferenceCountUtil.release()。
@Sharable public class DiscardHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ReferenceCountUtil.release(msg); }
各个方法的介绍:
出站操作和数据将由
ChannelOutboundHandler处理。它的方法将被
Channel、
ChannelPipeline以及
ChannelHandlerContext调用。此处就不介绍它的API了。
ChannelPromise和ChannelFuture:
ChannelPromise是
ChannelFuture的一个子类,其定义了一些可写的方法,如
setSuccess()和
setFailure(),从而使
ChannelFuture不可变 。
1.4.
ChannelHandlerAdapter
两个适配器分别提供了
ChannelInboundHandler和
ChannelOutboundHandler的基本实现。通过扩展抽象类
ChannelHandlerAdapter,它们获得了它们共同的超接口
ChannelHandler的方法。生成的类的层次结构如图:
ChannelHandlerAdapter还提供了实用方法
isSharable()。如果其对应的实现被标注为
@Sharable,那么这个方法将返回
true表示它可以被添加到多个
ChannelPipeline中,并且是数据安全的。
ChannelInboundHandlerAdapter和
ChannelOutboundHandlerAdapter中所提供的方法体调用了其相关联的
ChannelHandlerContext上的等效方法,从而将事件转发到了
ChannelPipeline中的下一个
ChannelHandler中。
一般我们只需要简单地扩展它们,并且重写那些你想要自定义的方法就可以了。
1.5.内存资源管理{TODO}
每当通过调用
ChannelInboundHandler.channelRead()或者
ChannelOutboundHandler.write()方法来处理数据时,都需要确保没有任何的资源泄漏。我们在使用完后调整其计数器很重要。
Netty提供了
class ResourceLeakDetector,它将对你应用程序的缓冲区分配做检测内存泄露。并且产生相关日志。
2.ChannelPipeline
ChannelPipeline是一个拦截流经
Channel的入站和出站事件的
ChannelHandler实例链,那么就很容易看出这些
ChannelHandler之间的交互是如何组成一个应用程序数据和事件处理逻辑的核心的。
哎妈呀!好绕口~
每一个新创建的
Channel都将会被分配一个新的
ChannelPipeline。这项关联是永久性的;
Channel既不能附加另外一个
ChannelPipeline,也不能分离其当前的。
根据事件的起源,事件将会被
ChannelInboundHandler或者
ChannelOutboundHandler处理。随后通过调用
ChannelHandlerContext实现,它将被转发给同一超类型的下一个
ChannelHandler。
下图展示了一个典型的同时具有入站和出站
ChannelHandler的
ChannelPipeline的布局。
在
ChannelPipeline传播事件时,它会测试
ChannelPipeline中的下一个
ChannelHandler的类型是否和事件的运动方向相匹配。如果不匹配,
ChannelPipeline将跳过该
ChannelHandler并前进到下一个,直到它找到和该事件所期望的方向相匹配的为止。
ChannelPipeline的一些方法
AddFirstaddBefore、
remove、
replace、
get等通过名字就可以看出他的功能。
下面是一个使用例子,并不能执行:
ChannelPipeline pipeline = null ; ServerHandller firstHandller = new ServerHandller(); pipeline.addLast("handler1", firstHandller); pipeline.addLast("handler2", new ServerHandller()); pipeline.addLast("handler3", new ServerHandller()); pipeline.remove("handler2"); pipeline.remove(firstHandller); pipeline.replace("handler3", "newHandler", firstHandller); //返回和 ChannelHandler 绑定的 ChannelHandlerContext pipeline.context("newHandler"); pipeline.get("newHandler");
总结一下
ChannelPipeline保存了与
Channel相关联的
ChannelHandler,ChannelPipeline可以根据需要,通过添加或者删除
ChannelHandler来动态地修改;
ChannelPipeline有着丰富的API用以被调用,以响应入站和出站事件,关于这些内容我们以后介绍。
3.ChannelHandlerContext
接口
ChannelHandlerContext代表了
ChannelHandler和
ChannelPipeline之间的关联,每当
ChannelHandler添加到
ChannelPipeline中时,都会创建
ChannelHandlerContext。
ChannelHandlerContext的主要功能是管理它所关联的
ChannelHandler和在同一个
ChannelPipeline中的其他
ChannelHandler之间的交互。下面是他们的关系图:
举几个例子
//获取到与ChannelHandlerContext相关联的 Channel 的引用 ChannelHandlerContext ctx = ..; Channel channel = ctx.channel(); //通过Channel 写入缓冲区 //write()方法将会导致写入事件从尾端到头部地流经ChannelPipeline channel.write(Unpooled.copiedBuffer("hellow",CharsetUtil.UTF_8));
ChannelHandlerContext ctx = ..; //获取到与ChannelHandlerContext相关联的ChannelPipeline 的引用 ChannelPipeline pipeline = ctx.pipeline(); //通过 ChannelPipeline写入缓冲区 pipeline.write(Unpooled.copiedBuffer("hellow",CharsetUtil.UTF_8));
在上面的例子中,write()方法将一直传播事件通过整个
ChannelPipeline,事件从一个
ChannelHandler到下一个
ChannelHandler的移动是由
ChannelHandlerContext上的调用完成的。如下图:
API
3.1.高级用法
一种高级的用法是缓存到ChannelHandlerContext的引用以供稍后使用,这可能会发生在任何的
ChannelHandler方法之外,甚至来自于不同的线程。
public class WriteHandler extends ChannelHandlerAdapter { private ChannelHandlerContext ctx; @Override public void handlerAdded(ChannelHandlerContext ctx) { this.ctx = ctx; } public void send(String msg) { ctx.writeAndFlush(msg); } }
因为一个
ChannelHandler可以从属于多个
ChannelPipeline,所以它也可以绑定到多个
ChannelHandlerContext实例。对于这种用法指在多个
ChannelPipeline中共享同一个 ChannelHandler,对应的
ChannelHandler必须要使用@Sharable 注解标注;否则,试图将它添加到多个
ChannelPipeline时将会触发异常。显而易见,为了安全地被用于多个并发的 Channel(即连接),这样的
ChannelHandler必须是线程安全的。
共享ChannelHandler
共享同一个
ChannelHandler在多个ChannelPipeline中安装同一个
ChannelHandler的一个常见的原因是用于收集跨越多个
Channel的统计信息。
正确的用法:
@Sharable public class SharableHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { System.out.println("Channel read message: " + msg); //记录方法调用,并转发给下一个 ChannelHandler ctx.fireChannelRead(msg); } }
@Sharable public class UnsharableHandler extends ChannelInboundHandlerAdapter { private int count; @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { //将count字段的值加 1 count++; System.out.println("channelRead(...) called the "+ count + " time"); ctx.fireChannelRead(msg); } }
这段代码的问题在于它拥有状态,即用于跟踪方法调用次数的实例变量count。将这个类的一个实例添加到
ChannelPipeline将极有可能在它被多个并发的
Channel访问时导致问题。(当然,这个简单的问题可以通过使
channelRead()方法变为同步方法来修正。)总之,只应该在确定了
ChannelHandler是线程安全的时才使用@Sharable 注解。
4.异常处理
4.1.入站异常处理如果在处理入站事件的过程中有异常被抛出,那么它将从它在
ChannelInboundHandler里被触发的那一点开始流经
ChannelPipeline。要想处理这种类型的入站异常,需要在的
ChannelInboundHandler实现中重写下面的方法。
public class InboundExceptionHandler extends ChannelInboundHandlerAdapter { @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
异常处理通常位于
ChannelPipeline的最后。这确保了所有的入站异常都总是会被处理
ChannelHandler.exceptionCaught()的默认实现是简单地将当前异常转发给
ChannelPipeline中的下一个
ChannelHandler;
如果异常到达了 ChannelPipeline 的尾端,它将会被记录为未被处理;
要想定义自定义的处理逻辑,你需要重写 exceptionCaught()方法。然后你需要决定是否需要将该异常传播出去。
4.2.出站异常处理
用于处理出站操作中的正常完成以及异常的选项,都基于以下的通知机制
每个出站操作都将返回一个
ChannelFuture。注册到
ChannelFuture的
ChannelFutureListener将在操作完成时被通知该操作是成功了还是出错了。
几乎所有的
ChannelOutboundHandler上的方法都会传入一个
ChannelPromise的实例。作为
ChannelFuture的子类,
ChannelPromise也可以被分配用于异步通知的监听器。但是,ChannelPromise 还具有提供立即通知的可写方法:
ChannelPromise setSuccess();``ChannelPromise setFailure(Throwable cause);
添加
ChannelFutureListener只需要调用
ChannelFuture实例上的
addListener方法。一般有两种方法:
调用出站操作(如 write()方法)所返回的
ChannelFuture上
a4e0
的
addListener()方法。
ChannelFuture future = channel.write(someMessage); future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture f) { if (!f.isSuccess()) { f.cause().printStackTrace(); f.channel().close(); } } });
第二种方式是将
ChannelFutureListener添加到即将作为参数传递给
ChannelOutboundHandler的方法的
ChannelPromise
public class OutboundExceptionHandler extends ChannelOutboundHandlerAdapter { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { promise.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture f) { if (!f.isSuccess()) { f.cause().printStackTrace(); f.channel().close(); } } }); } }
如果
ChannelOutboundHandler本身抛出了异常会发生什么呢?在这种情况下,Netty 本身会通知已经注册到对应
ChannelPromise的监听器。
下篇:http://blog.csdn.net/theludlows/article/details/79570464
相关文章推荐
- Netty中Channel、ChannelPipeline、ChannelHandler、ChannelHandlerContext之间的关系
- netty源码分析之-Channel、ChannelPipeline、ChannelHandler以及 ChannelHandlerContext 详解(2)
- Java NIO框架-Netty-3 ChannelHandler & ChannelPipeline
- Netty(4):核心部件: ChannelPipeline & ChannelHandler 处理链
- 《Netty in Action》chapter 6 : ChannelHandler and ChannelPipeline
- Netty源码之ChannelPipeline和ChannelHandlerContext
- Netty 4 ChannelHandler和ChannelPipeline
- Netty ChannelPipeline动态修改ChannelHandler
- Netty In Action中文版 - 第八章:附带的ChannelHandler和Codec
- java netty之ChannelInboundByteHandlerAdapter
- 一起学Netty(四)之 ChannelHandler,ChannelHandlerContext,ChannelPipeline
- Netty4实战第八章:Netty提供的ChannelHandler和编解码器
- Netty学习总结(4)——图解Netty之Pipeline、channel、Context之间的数据流向
- netty之SimpleChannelInboundHandler
- Netty In Action中文版 - 第八章:附带的ChannelHandler和Codec
- 深入研究Netty框架之ChannelHandler和ChannelPipeline
- netty的ChannelInboundHandler接口
- Netty In Action中文版 - 第八章:附带的ChannelHandler和Codec
- 【Netty源码】ChannelPipeline源码剖析
- Netty4.x中文教程系列(三) ChannelHandler