您的位置:首页 > 其它

深入研究Netty框架之ChannelHandler和ChannelPipeline

2017-06-21 00:00 543 查看
摘要: 本文主要介绍ChannelHandler和ChannelPipeline的基本功能和原理。文中源码基于Netty4.1

初识ChannelHandler和ChannelPipeline

ChannelHandler主要作用是处理I/O事件或拦截I/O操作,并将事件转发到其所属ChannelPipeline中的下一个ChannelHandler。那么如何转发呢???

事件转发:ChannelHandler提供了一个ChannelHandlerContext对象。 ChannelHandler通过ChannelHandlerContext对象与其所属的ChannelPipeline进行交互。 使用ChannelHandlerContext对象,ChannelHandler可以在上游或下游传递事件,执行I/O操作,动态修改流水线,或使用AttributeKeys存储ChannelHandler特有的信息等等。

ChannelPipeline提供了一个ChannelHandler链的容器,并定义了用于在该链上传播入站和出站事件流的Api。同时,ChannelPipeline实现了更高级的Intercepting Filter模式,使得开发人员能够完全控制事件的处理方式以及流水线中的ChannelHandler如何相互交互。

每个Channel都会和一个ChannelPipeline相关联,即持有一个ChannelHandler的实例链。当Channel被创建时,会自动创建一个专属的ChannelPipeline。ChannelHandler会通过以下步骤安装到ChannelPipeline:

将一个ChannelInitializer的实现注册到ServerBootstrap中;

当ChannelInitializer的initChannel()方法被调用时,ChannelInitializer将在ChannelPipeline中安装一组自定义的ChannelHandler;

ChannelInitializer将它自己从ChannelPipeline中移除

典型示例代码如下:

Bootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new KryoEncoder(kryoSerializationFactory));
ch.pipeline().addLast(new KryoDecoder(kryoSerializationFactory));
ch.pipeline().addLast(serverDispatchHandler);
}
});


ChannelHandler家族

ChannelHandler作为Handler程序的根接口,其家族非常庞大,有很多子类实现,本文会介绍几个常用的ChannelHandler实现,先看类图:



ChannelInboundHandler:处理入站事件的Handler实现的父接口,定义了一组入站事件传播方法;

ChannelOutboundHandler:处理出站事件的Handler实现的父接口,定义了一组出站事件传播方法;

ChannelHandlerAdapter:抽象类,提供了handlerAdded和handlerRemoved的空实现;

ChannelInboundHandlerAdapter:处理入站事件,实现了ChannelInboundHandler中定义的所有方法,只负责传播事件,通过委托ChannelHandlerContext来完成;需注意ChannelInboundHandlerAdapter的ChannelRead方法处理完消息后不会自动释放消息,若需要自动释放消息可以使用SimpleChannelInboundHandler。

ChannelOutboundHandlerAdapter:处理出站事件,实现了ChannelOutboundHandler中定义的所有方法,只负责传播事件,通过委托ChannelHandlerContext来完成。

MessageToByteEncoder:编码器,负责将java对象转化为字节序列;

ByteToMessageDecoder:解码器,负责将字节序列(如:ByteBuf)转化为java对象;

ChannelDuplexHandler:能同时处理入站事件和出站事件的Handler;

ChannelInitializer:用于在ChannelPipeline中安装一组自定义的ChannelHandler;

SimpleChannelInboundHandler:消息处理完之后,能够自动释放消息的Handler。

ChannelHandler状态管理

ChannelHandler通常需要存储一些状态信息,这里介绍两种实现方法。

实现1:使用成员变量(示例代码来自netty源码)

public interface Message {
// your methods here
}

public class DataServerHandler extends SimpleChannelInboundHandler<Message> {

private boolean loggedIn;

@Override
public void channelRead0(ChannelHandlerContext ctx, Message message) {
Channel ch = e.getChannel();
if (message instanceof LoginMessage) {
authenticate((LoginMessage) message);
loggedIn = true;
} else (message instanceof GetDataMessage) {
if (loggedIn) {
ch.write(fetchSecret((GetDataMessage) message));
} else {
fail();
}
}
}
...
}

loggedIn用于存储登录状态,由多线程并发编程可知,当多个线程并发调用channelRead0时,存在竞态条件,使得未经授权的客户端仍然可以访问机密信息。netty中可使用以下方法避免竞态条件的发生:

public class DataServerInitializer extends ChannelInitializer<Channel> {
@Override
public void initChannel(Channel channel) {
channel.pipeline().addLast("handler", new DataServerHandler());
}
}

每个channel独立拥有一个handler实例的状态变量,何以保证不会发生竞态条件?

这得益于netty线程模型的实现,其保证整个channel生命周期内的所有操作由同一个线程完成,因此不会发生竞态条件,后续文章会详细说明EventLoop实现原理。

实现2:使用ChannelHandlerContext提供的AttributeKeys,先看代码

@Sharable
public class DataServerHandler extends SimpleChannelInboundHandler<Message> {
private final AttributeKey<Boolean> auth =
AttributeKey.valueOf("auth");

@Override
public void channelRead(ChannelHandlerContext ctx, Message message) {
Attribute<Boolean> attr = ctx.attr(auth);
Channel ch = ctx.channel();
if (message instanceof LoginMessage) {
authenticate((LoginMessage) o);
attr.set(true);
} else (message instanceof GetDataMessage) {
if (Boolean.TRUE.equals(attr.get())) {
ch.write(fetchSecret((GetDataMessage) o));
} else {
fail();
}
}
}
...
}

该情况下,多个channel可同时共享同一个handler实例:

public class DataServerInitializer extends ChannelInitializer<Channel> {
private static final DataServerHandler SHARED = new DataServerHandler();
@Override
public void initChannel(Channel channel) {
channel.pipeline().addLast("handler", SHARED);
}
}

@Sharable:ChannelHandler如果使用该注释,意味着同一个handler实例可添加到一个或多个ChannelPipelines多次(被多个channel共享),而不会发生竞争条件。

ChannelHandler生命周期

ChannelHandler接口主要定义了生命周期相关的方法,每个方法都包含一个ChannelHandlerContext做为入参。如下:

方法描述
handlerAdded当ChannelHandler被添加到一个ChannelPipeline时被调用
handlerRemoved当ChannelHandler从一个ChannelPipeline中移除时被调用
exceptionCaught处理过程中ChannelPipeline中发生错误时被调用

ChannelPipeline

事件流

ChannelPipeline是所有事件处理的总入口,其主要包含两条事件流线路:入站流(InBound)和出站流(OutBound),I / O事件由ChannelInboundHandler或ChannelOutboundHandler处理,并通过调用ChannelHandlerContext中定义的事件传播方法进行转发。先看事件流图:



右图可知,入站事件由自右而左方向的ChannelInboundHandler处理,ChannelInboundHandler通常处理由I/O线程生产的入站数据,入站数据通常通过实际的输入操作SocketChannel.read(ByteBuffer)从远程对等体读取。当入站事件超出ChannelInboundHandler最左端,则会以默认方式丢弃,也可根据需要进行记录。

出站事件则由自左而右方向的ChannelOutboundHandler处理,ChannelOutboundHandler通常生成或转换出站数据,如写入请求;如果出站事件超出了ChannelOutboundHandler最右端,则由与通道相关联的I / O线程处理。 I / O线程通常执行实际的输出操作,如SocketChannel.write(ByteBuffer)。

有如下ChannelPipeline实例:

ChannelPipeline p = ...;
p.addLast("1", new ChannelInboundHandlerA());
p.addLast("2", new ChannelInboundHandlerB());
p.addLast("3", new ChannelOutboundHandlerA());
p.addLast("4", new ChannelOutboundHandlerB());
p.addLast("5", new ChannelDuplexHandlerX());

1和2为入站事件Handler,3和4为出站事件Handler,5既是入站事件Handler,也是出站事件Handler。

在给定的示例配置中,当事件进入时,处理程序评估顺序为1,2,3,4,5。 当事件出站时,顺序为5,4,3,2,1。实际运行过程中,ChannelPipeline会通过评估而跳过某些Handler,即:

入站事件:实际的评估顺序为1,2,5

出站事件:实际的评估顺序为5,4,3

事件传播

入站事件通常由I/O线程触发,如TCP链路建立事件、链路关闭事件、读事件、异常通知事件等等,事件传播通过ChannelHandlerContext中定义的方法来实现。

入站事件传播方法:

ChannelHandlerContext.fireChannelRegistered():channel注册事件

ChannelHandlerContext.fireChannelActive():TCP链路建立成功,channel激活事件

ChannelHandlerContext.fireChannelRead(Object):读事件

ChannelHandlerContext.fireChannelReadComplete():读操作完成通知事件

ChannelHandlerContext.fireExceptionCaught(Throwable):异常通知事件

ChannelHandlerContext.fireUserEventTriggered(Object):用户自定义事件

ChannelHandlerContext.fireChannelWritabilityChanged():channel可写状态变化通知事件

ChannelHandlerContext.fireChannelInactive():TCP连接关闭,链路不可用通知事件

ChannelHandlerContext.fireChannelUnregistered():channel取消注册事件

出站事件通常由用户主动发起的网络I/O操作,如用户发起的连接操作、绑定操作,消息发送操作等。

出站事件传播方法:

ChannelHandlerContext.bind(SocketAddress, ChannelPromise):绑定本地地址事件

ChannelHandlerContext.connect(SocketAddress, SocketAddress, ChannelPromise):连接服务端事件

ChannelHandlerContext.write(Object, ChannelPromise):发送事件

ChannelHandlerContext.flush():刷新事件

ChannelHandlerContext.read():读事件

ChannelHandlerContext.disconnect(ChannelPromise):断开连接事件

ChannelHandlerContext.close(ChannelPromise):关闭当前channel事件

ChannelHandlerContext.deregister(ChannelPromise):取消注册channel事件

实例化过程

文章开始已经介绍过,当Channel被创建时,会自动创建一个专属的ChannelPipeline,下面从源码分析:

首先,无论是客户端SocketChannel,还是服务端ServerSocketChannel,在实例化Channel时,最终都会调用AbstractChannel的构造方法(后续介绍Channel时再详细说明):

protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this);
}

实例化Channel时,调用newChannelPipeline()创建一个channel专属的ChannelPipeline实例。下面看DefaultChannelPipeline的构造方法:

protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise =  new VoidChannelPromise(channel, true);

tail = new TailContext(this);
head = new HeadContext(this);

head.next = tail;
tail.prev = head;
}

在 DefaultChannelPipeline 的构造方法中,实例化了两个特殊的字段:head和tail,一看便知,这是一个双向链表的头和尾。由head和tail的类型可知,在DefaultChannelPipeline中,维护了一个以 AbstractChannelHandlerContext 为节点的双向链表,这个链表是 Netty 实现 Pipeline 机制的关键。

TailContext类图:



HeadContext类图:



由类结构图可知,TailContext和HeadContext都继承了AbstractChannelHandlerContext,TailContext实现了ChannelInboundHandler,而HeadContext同时实现了ChannelInboundHandler和ChannelOutboundHandler。HeadContext会同时处理出站事件和入站事件,TailContext只能处理入站事件。再看看两者的构造器:

TailContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, TAIL_NAME, true, false);
setAddComplete();
}
HeadContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, HEAD_NAME, false, true);
unsafe = pipeline.channel().unsafe();
setAddComplete();
}

两者都会调用父类 AbstractChannelHandlerContext 的构造器, 区别在于参数inbound和outbound的值。

inbound和outbound属性用于快速确定一个handler处理入站事件还是出站事件,后文会介绍。

到此,ChannelPipeline的实例化过程完成。

本文主要介绍了netty的ChannelHandler和ChannelPipeline两个组件,介绍了ChannelHandler的家族成员,状态管理和生命周期,同时介绍了ChannelPipeline的功能和实例化。本文对分析和理解ChannelPipeline及ChannelHandlerContext的源码很有帮助。关于ChannelPipeline和ChannelHandlerContext源码分析请阅读https://my.oschina.net/7001/blog/1037187

欢迎指出本文有误的地方,转载请注明原文出处https://my.oschina.net/7001/blog/994219
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息