您的位置:首页 > 其它

Netty系列:六、细说ChannelHandler和ChannelPipeLine

2018-03-12 21:02 651 查看

开头

本篇主要介绍
ChannelHandler
ChannelPipeLine
ChannelContext
。以及他们之间的的组织。

1.
ChannelHandler
家族

Channel
的生命周期

ChannelHandler
的生命周期

ChannelInBoundHandler
接口

ChannelInBoundHandlerAdapter


下面逐个介绍

1.1.
Channel
的生命周期


被创建,未注册到
EventLoop


注册到
EventLoop


处于活动状态,已经连接到远程节点,可以接受和发送数据了

没有连接到远程节点



如图所示,当状态改变时,将会生成对应的事件。这些事件会发送给
ChannelHandler
,随后做出相应。

1.2.
ChannelHandler
的生命周期


下面是一些方法



1.
handlerAdded()
:当
ChannelHandler
被添加到
ChannelPipeLine
中时调用

2.
handlerRemoved()
:当
ChannelHandler
ChannelPipeLine
中被移除时调用

之前我们已经谈到过,
ChannelHandler
的子接口:

ChannelInboundHandler
:处理入站数据及各种状态变化

ChannelOutboundHandler
:处理出站数据并且允许拦截所有的操作。

1.3.
ChannelInboundHandler
/
ChannelOutboundHandler


ChannelInboundHandler
的一些方法将会在数据被接收时或者与其对应的
Channel
状态发生改变时被调用。正如我们前面所提到的,这些方法和
Channel
的生命周期密切相关。



这些方法从名字就可以看出它们的作用,在此就不一一介绍。主要说下
channelRead()
方法,
ChannelInboundHandler
的实现重写
channelRead()
方法时,它将负责显式地释放与池化的
ByteBuf
实例相关的内存。Netty中提供了一个工具方法:
ReferenceCountUtil.release()


@Sharable
public class DiscardHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ReferenceCountUtil.release(msg);
}


各个方法的介绍:



出站操作和数据将由
ChannelOutboundHandler
处理。它的方法将被
Channel
ChannelPipeline
以及
ChannelHandlerContext
调用。此处就不介绍它的API了。



ChannelPromise和ChannelFuture:
ChannelPromise
ChannelFuture
的一个子类,其定义了一些可写的方法,如
setSuccess()
setFailure()
,从而使
ChannelFuture
不可变 。

1.4.
ChannelHandlerAdapter


两个适配器分别提供了
ChannelInboundHandler
ChannelOutboundHandler
的基本实现。通过扩展抽象类
ChannelHandlerAdapter
,它们获得了它们共同的超接口
ChannelHandler
的方法。生成的类的层次结构如图:



ChannelHandlerAdapter
还提供了实用方法
isSharable()
。如果其对应的实现被标注为
@Sharable
,那么这个方法将返回
true
表示它可以被添加到多个
ChannelPipeline
中,并且是数据安全的。

ChannelInboundHandlerAdapter
ChannelOutboundHandlerAdapter
中所提供的方法体调用了其相关联的
ChannelHandlerContext
上的等效方法,从而将事件转发到了
ChannelPipeline
中的下一个
ChannelHandler
中。

一般我们只需要简单地扩展它们,并且重写那些你想要自定义的方法就可以了。

1.5.内存资源管理{TODO}

每当通过调用
ChannelInboundHandler.channelRead()
或者
ChannelOutboundHandler.write()
方法来处理数据时,都需要确保没有任何的资源泄漏。我们在使用完后调整其计数器很重要。

Netty提供了
class ResourceLeakDetector
,它将对你应用程序的缓冲区分配做检测内存泄露。并且产生相关日志。

2.
ChannelPipeline

ChannelPipeline
是一个拦截流经
Channel
的入站和出站事件的
ChannelHandler
实例链,那么就很容易看出这些
ChannelHandler
之间的交互是如何组成一个应用程序数据和事件处理逻辑的核心的。

哎妈呀!好绕口~

每一个新创建的
Channel
都将会被分配一个新的
ChannelPipeline
。这项关联是永久性的;
Channel
既不能附加另外一个
ChannelPipeline
,也不能分离其当前的。

根据事件的起源,事件将会被
ChannelInboundHandler
或者
ChannelOutboundHandler
处理。随后通过调用
ChannelHandlerContext
实现,它将被转发给同一超类型的下一个
ChannelHandler


下图展示了一个典型的同时具有入站和出站
ChannelHandler
ChannelPipeline
的布局。



ChannelPipeline
传播事件时,它会测试
ChannelPipeline
中的下一个
ChannelHandler
的类型是否和事件的运动方向相匹配。如果不匹配,
ChannelPipeline
将跳过该
ChannelHandler
并前进到下一个,直到它找到和该事件所期望的方向相匹配的为止。

ChannelPipeline
的一些方法


AddFirstaddBefore
remove
replace
get
等通过名字就可以看出他的功能。

下面是一个使用例子,并不能执行:

ChannelPipeline pipeline = null ;
ServerHandller firstHandller = new ServerHandller();
pipeline.addLast("handler1", firstHandller);
pipeline.addLast("handler2", new ServerHandller());
pipeline.addLast("handler3", new ServerHandller());
pipeline.remove("handler2");
pipeline.remove(firstHandller);
pipeline.replace("handler3", "newHandler", firstHandller);
//返回和 ChannelHandler 绑定的 ChannelHandlerContext
pipeline.context("newHandler");
pipeline.get("newHandler");


总结一下

ChannelPipeline
保存了与
Channel
相关联的
ChannelHandler,
ChannelPipeline可以根据需要,通过添加或者删除
ChannelHandler
来动态地修改;
ChannelPipeline
有着丰富的API用以被调用,以响应入站和出站事件,关于这些内容我们以后介绍。

3.
ChannelHandlerContext
接口

ChannelHandlerContext
代表了
ChannelHandler
ChannelPipeline
之间的关联,每当
ChannelHandler
添加到
ChannelPipeline
中时,都会创建
ChannelHandlerContext
ChannelHandlerContext
的主要功能是管理它所关联的
ChannelHandler
和在同一个
ChannelPipeline
中的其他
ChannelHandler
之间的交互。下面是他们的关系图:



举几个例子

//获取到与ChannelHandlerContext相关联的 Channel 的引用
ChannelHandlerContext ctx = ..;
Channel channel = ctx.channel();
//通过Channel 写入缓冲区
//write()方法将会导致写入事件从尾端到头部地流经ChannelPipeline
channel.write(Unpooled.copiedBuffer("hellow",CharsetUtil.UTF_8));


ChannelHandlerContext ctx = ..;
//获取到与ChannelHandlerContext相关联的ChannelPipeline 的引用
ChannelPipeline pipeline = ctx.pipeline();
//通过 ChannelPipeline写入缓冲区
pipeline.write(Unpooled.copiedBuffer("hellow",CharsetUtil.UTF_8));


在上面的例子中,write()方法将一直传播事件通过整个
ChannelPipeline
,事件从一个
ChannelHandler
到下一个
ChannelHandler
的移动是由
ChannelHandlerContext
上的调用完成的。如下图:



API





3.1.高级用法

一种高级的用法是缓存到
ChannelHandlerContext
的引用以供稍后使用,这可能会发生在任何的
ChannelHandler
方法之外,甚至来自于不同的线程。

public class WriteHandler extends ChannelHandlerAdapter {
private ChannelHandlerContext ctx;
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
this.ctx = ctx;
}
public void send(String msg) {
ctx.writeAndFlush(msg);
}
}


因为一个
ChannelHandler
可以从属于多个
ChannelPipeline
,所以它也可以绑定到多个
ChannelHandlerContext
实例。对于这种用法指在多个
ChannelPipeline
中共享同一个 ChannelHandler,对应的
ChannelHandler
必须要使用@Sharable 注解标注;否则,试图将它添加到多个
ChannelPipeline
时将会触发异常。显而易见,为了安全地被用于多个并发的 Channel(即连接),这样的
ChannelHandler
必须是线程安全的。

共享ChannelHandler

共享同一个
ChannelHandler
在多个ChannelPipeline中安装同一个
ChannelHandle
r的一个常见的原因是用于收集跨越多个
Channel
的统计信息。

正确的用法:

@Sharable
public class SharableHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
System.out.println("Channel read message: " + msg);
//记录方法调用,并转发给下一个 ChannelHandler
ctx.fireChannelRead(msg);
}
}


@Sharable
public class UnsharableHandler extends ChannelInboundHandlerAdapter {
private int count;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
//将count字段的值加 1
count++;
System.out.println("channelRead(...) called the "+ count + " time");
ctx.fireChannelRead(msg);
}
}


这段代码的问题在于它拥有状态,即用于跟踪方法调用次数的实例变量count。将这个类的一个实例添加到
ChannelPipeline
将极有可能在它被多个并发的
Channel
访问时导致问题。(当然,这个简单的问题可以通过使
channelRead()
方法变为同步方法来修正。)总之,只应该在确定了
ChannelHandler
是线程安全的时才使用@Sharable 注解。

4.异常处理

4.1.入站异常处理

如果在处理入站事件的过程中有异常被抛出,那么它将从它在
ChannelInboundHandler
里被触发的那一点开始流经
ChannelPipeline
。要想处理这种类型的入站异常,需要在的
ChannelInboundHandler
实现中重写下面的方法。

public class InboundExceptionHandler extends ChannelInboundHandlerAdapter {
@Override
public void exceptionCaught(ChannelHandlerContext ctx,
Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}


异常处理通常位于
ChannelPipeline
的最后。这确保了所有的入站异常都总是会被处理

ChannelHandler.exceptionCaught()
的默认实现是简单地将当前异常转发给
ChannelPipeline
中的下一个
ChannelHandler


如果异常到达了 ChannelPipeline 的尾端,它将会被记录为未被处理;

要想定义自定义的处理逻辑,你需要重写 exceptionCaught()方法。然后你需要决定是否需要将该异常传播出去。

4.2.出站异常处理

用于处理出站操作中的正常完成以及异常的选项,都基于以下的通知机制

每个出站操作都将返回一个
ChannelFuture
。注册到
ChannelFuture
ChannelFutureListener
将在操作完成时被通知该操作是成功了还是出错了。

几乎所有的
ChannelOutboundHandler
上的方法都会传入一个
ChannelPromise
的实例。作为
ChannelFuture
的子类,
ChannelPromise
也可以被分配用于异步通知的监听器。但是,ChannelPromise 还具有提供立即通知的可写方法:

ChannelPromise setSuccess();``ChannelPromise setFailure(Throwable cause);


添加
ChannelFutureListener
只需要调用
ChannelFuture
实例上的
addListener
方法。一般有两种方法:

调用出站操作(如 write()方法)所返回的
ChannelFuture

a4e0
addListener()
方法。

ChannelFuture future = channel.write(someMessage);
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) {
if (!f.isSuccess()) {
f.cause().printStackTrace();
f.channel().close();
}
}
});


第二种方式是将
ChannelFutureListener
添加到即将作为参数传递给
ChannelOutboundHandler
的方法的
ChannelPromise


public class OutboundExceptionHandler extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg,    ChannelPromise promise) {
promise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) {
if (!f.isSuccess()) {
f.cause().printStackTrace();
f.channel().close();
}
}
});
}
}


如果
ChannelOutboundHandler
本身抛出了异常会发生什么呢?在这种情况下,Netty 本身会通知已经注册到对应
ChannelPromise
的监听器。

下篇:http://blog.csdn.net/theludlows/article/details/79570464
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: