您的位置:首页 > 其它

Netty实战六之ChannelHandler和ChannelPipeline

2021-01-08 20:46 232 查看

1、Channel的生命周期

Interface Channel定义了一组和ChannelInboundHandler API密切相关的简单但功能强大的状态模型,以下列出Channel的4个状态。

ChannelUnregistered:Channel已经被创建,但还未注册到EventLoop

ChannelRegistered:Channel已经被注册到了EventLoop

ChannelActive:Channel处于活动状态(已经连接到它的远程节点)。它现在可以接收和发送数据了

ChannelInactive:Channel没有连接到远程节点

Channel的正常生命周期如下图所示,当这些状态发生改变时,将会生成对应的事件。这些事件将会被转发给ChannelPipeline中的ChannelHandler,其可以随后对它们做出响应。

2、ChannelHandler的生命周期

下表列出了interface ChannelHandler定义的生命周期操作,在ChannelHandler被添加到ChannelPipeline中或者被从ChannelPipeline中移除时会调用这些操作,这些方法中的每一个都接受一个ChannelHandlerContext参数。

handlerAdded:当把ChannelHandler添加到ChannelPipeline中时被调用

handlerRemoved:当从ChannelPipeline中移除ChannelHandler时被调用

exceptionCaught:当处理过程中在ChannelPipeline中有错误产生时被调用

Netty定义了下面两个重要的ChannelHandler子接口:

·ChannelInboundHandler——处理入站数据以及各种状态变化

·ChannelOutboundHandler——处理出站数据并且允许拦截所有的操作

3、ChannelInboundHandler接口

当某个ChannelInboundHandler的实现重写channelRead()方法时,它将负责显式地释放与池化ByteBuf实例相关的内存,Netty为此提供了一个实用方法ReferenceCountUtil.release()。

@ChannelHandler.Sharable//扩展了ChannelInboundHandlerAdapterpublic class DiscardHandler extends ChannelInboundHandlerAdapter{

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        //丢弃已接收的消息
ReferenceCountUtil.release(msg);
}
}

Netty将使用WARN级别的日志消息记录未释放的资源,使得可以非常简单地在代码中发现违规的实例,但是以这种方式管理资源可能很繁琐。一个更加简单的方式是使用SimpleChannelInboundHandler。

@ChannelHandler.Sharable//扩展了SimpleChannelInboundHandlerpublic class SimpleDiscardHandler extends SimpleChannelInboundHandler<Object>{

@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {        //不需要任何显式的资源释放
//No need to do anything special
}
}

由于SimpleChannelInboundHandler会自动释放资源,所以你不应该存储指向任何消息的引用供将来使用,因为这些引用都将会失效。

4、ChannelOutboundHandler接口

出站操作和数据将由ChannelOutboundHandler处理,它的方法将被Channel、ChannelPipeline以及ChannelHandlerContext调用。

ChannelOutboundHandler的一个强大的功能是可以按需推迟操作或者事件,这使得可以通过一些复杂的方法来处理请求。例如,如果到远程节点的写入被暂停了,那么你可以推迟冲刷并在稍后继续。

ChannelPromise与ChannelFuture : ChannelOutboundHandler中的大部分方法都需要一个ChannelPromise参数,以便在操作完成时得到通知。ChannelPromise是ChannelFuture的一个子类,其定义了一些可写的方法,如setSuccess()和setFailure(),从而使ChannelFuture不可变。

5、ChannelHandler适配器

你可以使用ChannelInboundHandlerAdapter和ChannelOutboundHandlerAdapter类作为自己的ChannelHandler的起始点。这两个适配器分别提供了ChannelInboundHandler和ChannelOutboundHandler的基本实现,通过扩展抽象类ChannelHandlerAdapter,他们获得了他们共同的超接口ChannelHandler的方法。生成的类的层次结构如下图。

ChannelHandlerAdapter还提供了使用方法isSharable(),如果其对应的实现被标注为Sharable,那么这个方法都将返回true,表示它可以被添加到多个ChannelPipeline中。

在ChannelInboundHandlerAdapter和ChannelOutboundHandlerAdapter中所提供的方法体调用了其相关联的ChannelHandlerContext上的等效方法,从而将事件转发到了ChannelPipeline中的下一个ChannelHandler中。

6、资源管理

每当通过调用ChannelInboundHandler.channelRead()或者ChannelOutboundHandler.write()方法来处理数据时,你都需要确保没有任何的资源泄露。你可能还记得前面的章节中所提到的,Netty使用引用技术来处理池化的ByteBuf。所以在完全使用完某个ByteBuf后,调整其引用计数是很重要的。

为了帮助你诊断潜在的(资源泄露)问题,Netty提供了class ResourceLeakDetector,它将对你应用程序的缓冲区分配做大约1%的采样来检测内存泄露。相关的开销是非常小的。

Netty定义了4种泄露检测级别。

DISABLED——禁用泄露检测

SIMPLE——使用1%的默认采样率检测并报告任何发现的泄露

ADVANCED——使用默认的采样率,报告所发现的任何的泄露以及对应的消息被访问的位置

PARANOID——类似于ADVANCED,但是其将会对每次访问都进行采样,这对性能将会有很大的影响,应该只在调试阶段使用

泄露检测级别可以通过将下面的Java系统属性设置为表中的一个值来定义:

java -Dio.netty.leakDetectionLevel = ADVANCED

如果带着该JVM选项重新启动你的应用程序,你将看到自己的应用程序最近被泄露的缓冲区被访问的位置。

实现ChannelInboundHandler.channelRead()和ChannelOutboundHandler.write()方法时,应该如何使用这个诊断工具来防止泄露呢?让我们看看你的channelRead()操作直接消费入站消息的情况,也就是说,他不会通过调用ChannelHandlerContext.fireChannelRead()方法将入站消息转发给下一个ChannelInboundHandler。

消费入站消息的简单方式: 由于消费入站数据是一项常规任务,所以Netty提供了一个特殊的被称为SimpleChannelInboundHandler的ChannelInboundHandler实现,这个实现会在消息被channelRead0()方法消费之后自动释放消息。

在出站方向这边,如果你处理了write()操作并丢弃了一个消息,那么你也应该负责释放它。以下代码展示了一个丢弃所有的写入数据的实现。

@ChannelHandler.Sharablepublic class DiscardoutBoundHandler extends ChannelOutboundHandlerAdapter{

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {        //释放资源
ReferenceCountUtil.release(msg);        //通知ChannelPromise数据已经被处理了
promise.setSuccess();
}
}

重要的是,不仅要释放资源,还要通知ChannelPromise。否则可能会出现ChannelFutureListener收不到某个消息已经被处理了的通知的消息。

总之,如果一个消息被消费或者丢弃了,并且没有传递给ChannelPipeline中的下一个ChannelOutboundHandler,那么用户就有责任调用ReferenceCountUtil.release()。如果消息到达了实际的传输层,那么当它被写入时或者Channel关闭时,都将被自动释放。

7、ChannelPipeline接口

如果你认为ChannelPipeline是一个拦截流经Channel的入站和出站事件的ChannelHandler实例链,那么就很容易看出这些ChannelHandler之间的交互式如何组成一个应用程序数据和时间处理逻辑的核心的。

每一个新创建的Channel都将会被分配一个新的ChannelPipeline。这项关联时永久的,Channel即不能附加另外一个ChannelPipeline,也不能分离其当前的,在Netty组件的生命周期中,这是一项固定的操作,不需要开发人员的任何干预。

根据事件的起源,事件将会被ChannelInboundHandler或者ChannelOutboundHandler处理,随后,通过调用ChannelHandlerContext实现,它将被转发给同一个超类型的下一个ChannelHandler。

ChannelHandlerContext:ChannelHandlerContext使得ChannelHandler能够和它的ChannelPipeline以及其他的ChannelHandler交互,ChannelHandler可以通知其所属的ChannelPipeline中的下一个ChannelHandler,甚至可以动态修改它所属的ChannelPipeline。ChannelHandlerContext具有丰富的用于处理事件和执行I/O操作的API。

下图展示了一个典型的同时具有入站和出站ChannelHandler的ChannelPipeline的布局,并且印证了我们之前的关于ChannelPipeline主要由一系列的ChannelHandler所组成的说法,ChannelPipeline还提供了通过ChannelPipeline本身传播事件的方法。如果一个入站事件被触发,它将被从ChannelPipeline的头部开始一直被传播到ChannelPipeline的尾端。如图所示,一个出站I/O事件将从ChannelPipeline的最右边开始,然后向左传播。

在ChannelPipeline传播事件时,它会测试ChannelPipeline中的下一个ChannelHandler的类型是否和事件的运动方向相匹配。如果不匹配,ChannelPipeline将跳过该ChannelHandler并前进到下一个,直到它找到和该事件所期望的方向相匹配的为止。

8、修改ChannelPipeline

通过调用ChannelPipeline上的相关方法,ChannelHandler可以添加、删除或者替换其他的ChannelHandler,从而实时地修改ChannelPipeline的布局。

ChannelPipeline pipeline = ...;
FirstHandler firstHandler = new FirstHandler();
//将该实例作为“handler1”添加到ChannelPipeline中
pipeline.addLast("handler1",firstHandler);
//将一个SecondHandler的实例作为“handler2”添加到ChannelPipeline的第一个槽中,这意味着它将被放置在已有的“handler1”之前
pipeline.addLast("handler2",new SecondHandler());
//将一个ThirdHandler的实例作为“handler3”添加到ChannelPipeline的最后一个槽中
pipeline.addLast("handler3",new ThirdHandler());        ...
//通过名称移除“handler3”
pipeline.remove("handler3");
//通过引用移除FirstHandler
pipeline.remove(firstHandler);
//将SecondHandler(“handler2”)替换为FourthHandler:"handler4"
pipeline.replace("handler2","handler4",new ForthHandler());

ChannelHandler的执行和阻塞:通常ChannelPipeline中的每一个ChannelHandler都是通过它的EventLoop(I/O线程)来处理传递给它的事件的。所以至关重要的是不要阻塞这个线程,因为这会对整体的I/O处理产生负面的影响。但有时可能需要与那些使用阻塞API的遗留代码进行交互,对于这个情况,ChannelPipeline有一些接受一个EventExecutorGroup的add()方法,如果一个事件被传递给一个自定义的EventExecutorGroup,它将被包含在这个EventExecutorGroup中的某个EventExecutor所处理,从而被从该Channel本身的EventLoop中移除,对于这种用例,Netty提供了一种叫DefaultEventExecutorGroup的默认实现。

——ChannelPipeline保存了与Channel相关联的ChannelHandler

——ChannelPipeline可以根据需要、通过添加或者删除ChannelHandler来动态修改

——ChannelPipeline有着丰富的API用以被调用、以响应入站和出站事件

——ChannelHandlerContext和ChannelHandler之间的关联(绑定)是永远不会改变的,所以缓存对它的引用是安全的

9、使用ChannelHandlerContext

以下代码,将通过ChannelHandlerContext获取到Channel的引用,调用Channel上的write()方法将会导致写入事件从尾端到头部地流经ChannelPipeline。

以下代码展示了一个类似的例子,但是这一次是写入ChannelPipeline。我们再次看到,(到ChannelPipeline的)引用是通过ChannelHandlerContext获取的。

ChannelHandlerContext ctx = ..; //获取到与ChannelHandlerContext相关联的Channel的引用 Channel channel = ctx.channel(); //通过Channel写入缓冲区
channel.write(Unpooled.copiedBuffer("Netty in Action", CharsetUtil.UTF_8));
为什么会想要从ChannelPipeline中的某个特定点开始传播事件呢?

——为了减少将事件传经对它不感兴趣的ChannelHandler所带来的开销

——为了避免将事件传经那些可能会对它感兴趣的ChannelHandler。

10、ChannelHandler和ChannelHandlerContext的高级用法

可以通过将ChannelHandler添加到ChannelPipeline中来实现动态的协议切换,缓存到ChannelHandlerContext的引用以供稍后使用,这可能会发生在任何的ChannelHandler方法之外,甚至来自于不同的线程。

以下代码,缓存到ChannelHandlerContext的引用

public class WriteHandler extends ChannelHandlerAdapter{

private ChannelHandlerContext ctx;    @Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {        //存储到ChannelHandlerContext的引用以供稍后使用
this.ctx = ctx;
}

public void send(String msg){        //使用之前存储的到ChannelHandlerContext的引用来发送消息
ctx.writeAndFlush(msg);
}
}

因为一个ChannelHandler可以从属于多个ChannelPipeline,所以它也可以绑定到多个ChannelHandlerContext实例,对于这种用法(指在多个ChannelPipeline中共享同一个ChannelHandler),对应的ChannelHandler必须要使用@Sharable注解标注;否则,试图将它添加到多个ChannelPipeline时将会触发异常,显而易见,为了安全地被用于多个并发的Channel(连接),这样的ChannelHandler必须是线程安全的。

以下代码,展示这种模式。

@ChannelHandler.Sharable//使用注解@Sharable标注public class SharableHandler extends ChannelInboundHandlerAdapter{

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        System.out.println("Channel read message: " + msg);        //记录方法调用,并转发给下一个ChannelHandler
ctx.fireChannelRead(msg);
}

}

前面的ChannelHandler实现了符合所有的将其加入到多个ChannelPipeline的需求,即它使用了注解@Sharable标注,并且也不持有任何的状态。

以下代码,演示@Sharable的错误用法

@ChannelHandler.Sharablepublic class UnSharableHandler extends ChannelInboundHandlerAdapter{

private int count;    @Override
public void chan
8000
nelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        //将count字段值加1
count++;        System.out.println("channelRead(...) called the " + count + " time");        //记录方法调用,并转发给下一个ChannelHandler
ctx.fireChannelRead(msg);
}
}

这段代码的问题在于它拥有状态,即用于跟踪方法调用次数的实例变量count。将这个类的一个实例添加到ChannelPipeline将极有可能在它被多个并发Channel访问时导致问题。(可以将ChannelRead()方法变为同步方法)

总之,只应该在确定了你的ChannelHandler是线程安全的时才使用@Sharable注解。

为何要共享同一个ChannelHandler:在多个ChannelPipeline中安装同一个ChannelHandler的一个常见的原因是用于收集跨越多个Channel的统计信息。

11、处理入站异常

异常处理是任何真实应用程序的重要组成部分,它也可以通过多种方式来实现,因此,Netty提供了几种方式用于处理入站或者出站处理过程中所抛出的异常。

如果在处理入站事件的过程中有异常被抛出,那么它将从它在ChannelInboundHandler里被触发的那一点开始流经ChannelPipeline。要想处理这种类型的入站异常,你需要在你的ChannelInboundHandler实现exceptionCaught方法。

以下代码,展示了其关闭Channel并打印了异常的栈跟踪信息

public class InboundExceptionHandler extends ChannelInboundHandlerAdapter{

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}

因为异常将会继续按照入站方向流动(就像所有入站事件一样),所以实现了前面所示逻辑的ChannelInboundHandler通常位于ChannelPipeline的最后,这确保了所有的入站异常都总是会被处理,无论他们可能会发生在ChannelPipeline中的什么位置。

你应该如何响应异常,可能很大程序上取决于你的应用程序,你可能想要关闭Channel(和连接),也可能会尝试进行恢复。如果你不实现任何处理入站异常的逻辑,那么Netty将会记录该异常没有被处理的事实。

——ChannelHandler.exceptionCaught()的默认实现是简单地将当前异常转发给ChannelPipeline中的下一个ChannelHandler

——如果异常到达了ChannelPipeline的尾端,它将会被记录为未处理

——要想定义自定义的处理逻辑,你需要重写exceptionCaught方法,然后你需要决定是否需要将该异常传播出去

12、处理出站异常

——每个出站操作都将返回一个ChannelFuture。注册到ChannelFuture的ChannelFutureListener将在操作完成时被通知该操作是成功了还是出错了

——几乎所有的ChannelOutboundHandler上的方法都会传入一个ChannelPromise的实例,作为ChannelFuture的子类,ChannelPromise也可以被分配用于异步通知的监听器,但是,ChannelPromise还具有提供立即通知的可写方法。

以下代码,添加channelFutureListener,它将打印栈跟踪信息,并且随后关闭Channel

ChannelFuture future = channel.wirte(someMessage);
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {                if (!channelFuture.isSuccess()){
channelFuture.cause().printStackTrace();
channelFuture.channel().close();
}
}
});

第二种方式是将ChannelFutrueListener添加到即将作为参数传递给ChannelOutboundHandler的方法的ChannelPromise。

public class OutboundExceptionHandler extends ChannelOutboundHandlerAdapter{

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
promise.addListener(new ChannelFutureListener() {            @Override
public void operationComplete(ChannelFuture f) throws Exception {                if (!f.isSuccess()){
f.cause().printStackTrace();
f.channel().close();
}
}
});
}
}

ChannelPromise的可写方法:通过调用ChannelPromise上的setSuccess()和setFailure()方法,可以使一个操作的状态在ChannelHandler的方法返回给其调用者时便即刻被感知到。

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: