深入研究Netty框架之ChannelHandler和ChannelPipeline
2017-06-21 00:00
543 查看
摘要: 本文主要介绍ChannelHandler和ChannelPipeline的基本功能和原理。文中源码基于Netty4.1
事件转发:ChannelHandler提供了一个ChannelHandlerContext对象。 ChannelHandler通过ChannelHandlerContext对象与其所属的ChannelPipeline进行交互。 使用ChannelHandlerContext对象,ChannelHandler可以在上游或下游传递事件,执行I/O操作,动态修改流水线,或使用AttributeKeys存储ChannelHandler特有的信息等等。
ChannelPipeline提供了一个ChannelHandler链的容器,并定义了用于在该链上传播入站和出站事件流的Api。同时,ChannelPipeline实现了更高级的Intercepting Filter模式,使得开发人员能够完全控制事件的处理方式以及流水线中的ChannelHandler如何相互交互。
每个Channel都会和一个ChannelPipeline相关联,即持有一个ChannelHandler的实例链。当Channel被创建时,会自动创建一个专属的ChannelPipeline。ChannelHandler会通过以下步骤安装到ChannelPipeline:
将一个ChannelInitializer的实现注册到ServerBootstrap中;
当ChannelInitializer的initChannel()方法被调用时,ChannelInitializer将在ChannelPipeline中安装一组自定义的ChannelHandler;
ChannelInitializer将它自己从ChannelPipeline中移除
典型示例代码如下:
ChannelInboundHandler:处理入站事件的Handler实现的父接口,定义了一组入站事件传播方法;
ChannelOutboundHandler:处理出站事件的Handler实现的父接口,定义了一组出站事件传播方法;
ChannelHandlerAdapter:抽象类,提供了handlerAdded和handlerRemoved的空实现;
ChannelInboundHandlerAdapter:处理入站事件,实现了ChannelInboundHandler中定义的所有方法,只负责传播事件,通过委托ChannelHandlerContext来完成;需注意ChannelInboundHandlerAdapter的ChannelRead方法处理完消息后不会自动释放消息,若需要自动释放消息可以使用SimpleChannelInboundHandler。
ChannelOutboundHandlerAdapter:处理出站事件,实现了ChannelOutboundHandler中定义的所有方法,只负责传播事件,通过委托ChannelHandlerContext来完成。
MessageToByteEncoder:编码器,负责将java对象转化为字节序列;
ByteToMessageDecoder:解码器,负责将字节序列(如:ByteBuf)转化为java对象;
ChannelDuplexHandler:能同时处理入站事件和出站事件的Handler;
ChannelInitializer:用于在ChannelPipeline中安装一组自定义的ChannelHandler;
SimpleChannelInboundHandler:消息处理完之后,能够自动释放消息的Handler。
实现1:使用成员变量(示例代码来自netty源码)
loggedIn用于存储登录状态,由多线程并发编程可知,当多个线程并发调用channelRead0时,存在竞态条件,使得未经授权的客户端仍然可以访问机密信息。netty中可使用以下方法避免竞态条件的发生:
每个channel独立拥有一个handler实例的状态变量,何以保证不会发生竞态条件?
这得益于netty线程模型的实现,其保证整个channel生命周期内的所有操作由同一个线程完成,因此不会发生竞态条件,后续文章会详细说明EventLoop实现原理。
实现2:使用ChannelHandlerContext提供的AttributeKeys,先看代码
该情况下,多个channel可同时共享同一个handler实例:
@Sharable:ChannelHandler如果使用该注释,意味着同一个handler实例可添加到一个或多个ChannelPipelines多次(被多个channel共享),而不会发生竞争条件。
右图可知,入站事件由自右而左方向的ChannelInboundHandler处理,ChannelInboundHandler通常处理由I/O线程生产的入站数据,入站数据通常通过实际的输入操作SocketChannel.read(ByteBuffer)从远程对等体读取。当入站事件超出ChannelInboundHandler最左端,则会以默认方式丢弃,也可根据需要进行记录。
出站事件则由自左而右方向的ChannelOutboundHandler处理,ChannelOutboundHandler通常生成或转换出站数据,如写入请求;如果出站事件超出了ChannelOutboundHandler最右端,则由与通道相关联的I / O线程处理。 I / O线程通常执行实际的输出操作,如SocketChannel.write(ByteBuffer)。
有如下ChannelPipeline实例:
1和2为入站事件Handler,3和4为出站事件Handler,5既是入站事件Handler,也是出站事件Handler。
在给定的示例配置中,当事件进入时,处理程序评估顺序为1,2,3,4,5。 当事件出站时,顺序为5,4,3,2,1。实际运行过程中,ChannelPipeline会通过评估而跳过某些Handler,即:
入站事件:实际的评估顺序为1,2,5
出站事件:实际的评估顺序为5,4,3
入站事件传播方法:
ChannelHandlerContext.fireChannelRegistered():channel注册事件
ChannelHandlerContext.fireChannelActive():TCP链路建立成功,channel激活事件
ChannelHandlerContext.fireChannelRead(Object):读事件
ChannelHandlerContext.fireChannelReadComplete():读操作完成通知事件
ChannelHandlerContext.fireExceptionCaught(Throwable):异常通知事件
ChannelHandlerContext.fireUserEventTriggered(Object):用户自定义事件
ChannelHandlerContext.fireChannelWritabilityChanged():channel可写状态变化通知事件
ChannelHandlerContext.fireChannelInactive():TCP连接关闭,链路不可用通知事件
ChannelHandlerContext.fireChannelUnregistered():channel取消注册事件
出站事件通常由用户主动发起的网络I/O操作,如用户发起的连接操作、绑定操作,消息发送操作等。
出站事件传播方法:
ChannelHandlerContext.bind(SocketAddress, ChannelPromise):绑定本地地址事件
ChannelHandlerContext.connect(SocketAddress, SocketAddress, ChannelPromise):连接服务端事件
ChannelHandlerContext.write(Object, ChannelPromise):发送事件
ChannelHandlerContext.flush():刷新事件
ChannelHandlerContext.read():读事件
ChannelHandlerContext.disconnect(ChannelPromise):断开连接事件
ChannelHandlerContext.close(ChannelPromise):关闭当前channel事件
ChannelHandlerContext.deregister(ChannelPromise):取消注册channel事件
首先,无论是客户端SocketChannel,还是服务端ServerSocketChannel,在实例化Channel时,最终都会调用AbstractChannel的构造方法(后续介绍Channel时再详细说明):
实例化Channel时,调用newChannelPipeline()创建一个channel专属的ChannelPipeline实例。下面看DefaultChannelPipeline的构造方法:
在 DefaultChannelPipeline 的构造方法中,实例化了两个特殊的字段:head和tail,一看便知,这是一个双向链表的头和尾。由head和tail的类型可知,在DefaultChannelPipeline中,维护了一个以 AbstractChannelHandlerContext 为节点的双向链表,这个链表是 Netty 实现 Pipeline 机制的关键。
TailContext类图:
HeadContext类图:
由类结构图可知,TailContext和HeadContext都继承了AbstractChannelHandlerContext,TailContext实现了ChannelInboundHandler,而HeadContext同时实现了ChannelInboundHandler和ChannelOutboundHandler。HeadContext会同时处理出站事件和入站事件,TailContext只能处理入站事件。再看看两者的构造器:
两者都会调用父类 AbstractChannelHandlerContext 的构造器, 区别在于参数inbound和outbound的值。
inbound和outbound属性用于快速确定一个handler处理入站事件还是出站事件,后文会介绍。
到此,ChannelPipeline的实例化过程完成。
本文主要介绍了netty的ChannelHandler和ChannelPipeline两个组件,介绍了ChannelHandler的家族成员,状态管理和生命周期,同时介绍了ChannelPipeline的功能和实例化。本文对分析和理解ChannelPipeline及ChannelHandlerContext的源码很有帮助。关于ChannelPipeline和ChannelHandlerContext源码分析请阅读https://my.oschina.net/7001/blog/1037187
欢迎指出本文有误的地方,转载请注明原文出处https://my.oschina.net/7001/blog/994219
初识ChannelHandler和ChannelPipeline
ChannelHandler主要作用是处理I/O事件或拦截I/O操作,并将事件转发到其所属ChannelPipeline中的下一个ChannelHandler。那么如何转发呢???事件转发:ChannelHandler提供了一个ChannelHandlerContext对象。 ChannelHandler通过ChannelHandlerContext对象与其所属的ChannelPipeline进行交互。 使用ChannelHandlerContext对象,ChannelHandler可以在上游或下游传递事件,执行I/O操作,动态修改流水线,或使用AttributeKeys存储ChannelHandler特有的信息等等。
ChannelPipeline提供了一个ChannelHandler链的容器,并定义了用于在该链上传播入站和出站事件流的Api。同时,ChannelPipeline实现了更高级的Intercepting Filter模式,使得开发人员能够完全控制事件的处理方式以及流水线中的ChannelHandler如何相互交互。
每个Channel都会和一个ChannelPipeline相关联,即持有一个ChannelHandler的实例链。当Channel被创建时,会自动创建一个专属的ChannelPipeline。ChannelHandler会通过以下步骤安装到ChannelPipeline:
将一个ChannelInitializer的实现注册到ServerBootstrap中;
当ChannelInitializer的initChannel()方法被调用时,ChannelInitializer将在ChannelPipeline中安装一组自定义的ChannelHandler;
ChannelInitializer将它自己从ChannelPipeline中移除
典型示例代码如下:
Bootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .childOption(ChannelOption.SO_KEEPALIVE, true) .childOption(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new KryoEncoder(kryoSerializationFactory)); ch.pipeline().addLast(new KryoDecoder(kryoSerializationFactory)); ch.pipeline().addLast(serverDispatchHandler); } });
ChannelHandler家族
ChannelHandler作为Handler程序的根接口,其家族非常庞大,有很多子类实现,本文会介绍几个常用的ChannelHandler实现,先看类图:ChannelInboundHandler:处理入站事件的Handler实现的父接口,定义了一组入站事件传播方法;
ChannelOutboundHandler:处理出站事件的Handler实现的父接口,定义了一组出站事件传播方法;
ChannelHandlerAdapter:抽象类,提供了handlerAdded和handlerRemoved的空实现;
ChannelInboundHandlerAdapter:处理入站事件,实现了ChannelInboundHandler中定义的所有方法,只负责传播事件,通过委托ChannelHandlerContext来完成;需注意ChannelInboundHandlerAdapter的ChannelRead方法处理完消息后不会自动释放消息,若需要自动释放消息可以使用SimpleChannelInboundHandler。
ChannelOutboundHandlerAdapter:处理出站事件,实现了ChannelOutboundHandler中定义的所有方法,只负责传播事件,通过委托ChannelHandlerContext来完成。
MessageToByteEncoder:编码器,负责将java对象转化为字节序列;
ByteToMessageDecoder:解码器,负责将字节序列(如:ByteBuf)转化为java对象;
ChannelDuplexHandler:能同时处理入站事件和出站事件的Handler;
ChannelInitializer:用于在ChannelPipeline中安装一组自定义的ChannelHandler;
SimpleChannelInboundHandler:消息处理完之后,能够自动释放消息的Handler。
ChannelHandler状态管理
ChannelHandler通常需要存储一些状态信息,这里介绍两种实现方法。实现1:使用成员变量(示例代码来自netty源码)
public interface Message { // your methods here } public class DataServerHandler extends SimpleChannelInboundHandler<Message> { private boolean loggedIn; @Override public void channelRead0(ChannelHandlerContext ctx, Message message) { Channel ch = e.getChannel(); if (message instanceof LoginMessage) { authenticate((LoginMessage) message); loggedIn = true; } else (message instanceof GetDataMessage) { if (loggedIn) { ch.write(fetchSecret((GetDataMessage) message)); } else { fail(); } } } ... }
loggedIn用于存储登录状态,由多线程并发编程可知,当多个线程并发调用channelRead0时,存在竞态条件,使得未经授权的客户端仍然可以访问机密信息。netty中可使用以下方法避免竞态条件的发生:
public class DataServerInitializer extends ChannelInitializer<Channel> { @Override public void initChannel(Channel channel) { channel.pipeline().addLast("handler", new DataServerHandler()); } }
每个channel独立拥有一个handler实例的状态变量,何以保证不会发生竞态条件?
这得益于netty线程模型的实现,其保证整个channel生命周期内的所有操作由同一个线程完成,因此不会发生竞态条件,后续文章会详细说明EventLoop实现原理。
实现2:使用ChannelHandlerContext提供的AttributeKeys,先看代码
@Sharable public class DataServerHandler extends SimpleChannelInboundHandler<Message> { private final AttributeKey<Boolean> auth = AttributeKey.valueOf("auth"); @Override public void channelRead(ChannelHandlerContext ctx, Message message) { Attribute<Boolean> attr = ctx.attr(auth); Channel ch = ctx.channel(); if (message instanceof LoginMessage) { authenticate((LoginMessage) o); attr.set(true); } else (message instanceof GetDataMessage) { if (Boolean.TRUE.equals(attr.get())) { ch.write(fetchSecret((GetDataMessage) o)); } else { fail(); } } } ... }
该情况下,多个channel可同时共享同一个handler实例:
public class DataServerInitializer extends ChannelInitializer<Channel> { private static final DataServerHandler SHARED = new DataServerHandler(); @Override public void initChannel(Channel channel) { channel.pipeline().addLast("handler", SHARED); } }
@Sharable:ChannelHandler如果使用该注释,意味着同一个handler实例可添加到一个或多个ChannelPipelines多次(被多个channel共享),而不会发生竞争条件。
ChannelHandler生命周期
ChannelHandler接口主要定义了生命周期相关的方法,每个方法都包含一个ChannelHandlerContext做为入参。如下:方法 | 描述 |
handlerAdded | 当ChannelHandler被添加到一个ChannelPipeline时被调用 |
handlerRemoved | 当ChannelHandler从一个ChannelPipeline中移除时被调用 |
exceptionCaught | 处理过程中ChannelPipeline中发生错误时被调用 |
ChannelPipeline
事件流
ChannelPipeline是所有事件处理的总入口,其主要包含两条事件流线路:入站流(InBound)和出站流(OutBound),I / O事件由ChannelInboundHandler或ChannelOutboundHandler处理,并通过调用ChannelHandlerContext中定义的事件传播方法进行转发。先看事件流图:右图可知,入站事件由自右而左方向的ChannelInboundHandler处理,ChannelInboundHandler通常处理由I/O线程生产的入站数据,入站数据通常通过实际的输入操作SocketChannel.read(ByteBuffer)从远程对等体读取。当入站事件超出ChannelInboundHandler最左端,则会以默认方式丢弃,也可根据需要进行记录。
出站事件则由自左而右方向的ChannelOutboundHandler处理,ChannelOutboundHandler通常生成或转换出站数据,如写入请求;如果出站事件超出了ChannelOutboundHandler最右端,则由与通道相关联的I / O线程处理。 I / O线程通常执行实际的输出操作,如SocketChannel.write(ByteBuffer)。
有如下ChannelPipeline实例:
ChannelPipeline p = ...; p.addLast("1", new ChannelInboundHandlerA()); p.addLast("2", new ChannelInboundHandlerB()); p.addLast("3", new ChannelOutboundHandlerA()); p.addLast("4", new ChannelOutboundHandlerB()); p.addLast("5", new ChannelDuplexHandlerX());
1和2为入站事件Handler,3和4为出站事件Handler,5既是入站事件Handler,也是出站事件Handler。
在给定的示例配置中,当事件进入时,处理程序评估顺序为1,2,3,4,5。 当事件出站时,顺序为5,4,3,2,1。实际运行过程中,ChannelPipeline会通过评估而跳过某些Handler,即:
入站事件:实际的评估顺序为1,2,5
出站事件:实际的评估顺序为5,4,3
事件传播
入站事件通常由I/O线程触发,如TCP链路建立事件、链路关闭事件、读事件、异常通知事件等等,事件传播通过ChannelHandlerContext中定义的方法来实现。入站事件传播方法:
ChannelHandlerContext.fireChannelRegistered():channel注册事件
ChannelHandlerContext.fireChannelActive():TCP链路建立成功,channel激活事件
ChannelHandlerContext.fireChannelRead(Object):读事件
ChannelHandlerContext.fireChannelReadComplete():读操作完成通知事件
ChannelHandlerContext.fireExceptionCaught(Throwable):异常通知事件
ChannelHandlerContext.fireUserEventTriggered(Object):用户自定义事件
ChannelHandlerContext.fireChannelWritabilityChanged():channel可写状态变化通知事件
ChannelHandlerContext.fireChannelInactive():TCP连接关闭,链路不可用通知事件
ChannelHandlerContext.fireChannelUnregistered():channel取消注册事件
出站事件通常由用户主动发起的网络I/O操作,如用户发起的连接操作、绑定操作,消息发送操作等。
出站事件传播方法:
ChannelHandlerContext.bind(SocketAddress, ChannelPromise):绑定本地地址事件
ChannelHandlerContext.connect(SocketAddress, SocketAddress, ChannelPromise):连接服务端事件
ChannelHandlerContext.write(Object, ChannelPromise):发送事件
ChannelHandlerContext.flush():刷新事件
ChannelHandlerContext.read():读事件
ChannelHandlerContext.disconnect(ChannelPromise):断开连接事件
ChannelHandlerContext.close(ChannelPromise):关闭当前channel事件
ChannelHandlerContext.deregister(ChannelPromise):取消注册channel事件
实例化过程
文章开始已经介绍过,当Channel被创建时,会自动创建一个专属的ChannelPipeline,下面从源码分析:首先,无论是客户端SocketChannel,还是服务端ServerSocketChannel,在实例化Channel时,最终都会调用AbstractChannel的构造方法(后续介绍Channel时再详细说明):
protected AbstractChannel(Channel parent) { this.parent = parent; id = newId(); unsafe = newUnsafe(); pipeline = newChannelPipeline(); } protected DefaultChannelPipeline newChannelPipeline() { return new DefaultChannelPipeline(this); }
实例化Channel时,调用newChannelPipeline()创建一个channel专属的ChannelPipeline实例。下面看DefaultChannelPipeline的构造方法:
protected DefaultChannelPipeline(Channel channel) { this.channel = ObjectUtil.checkNotNull(channel, "channel"); succeededFuture = new SucceededChannelFuture(channel, null); voidPromise = new VoidChannelPromise(channel, true); tail = new TailContext(this); head = new HeadContext(this); head.next = tail; tail.prev = head; }
在 DefaultChannelPipeline 的构造方法中,实例化了两个特殊的字段:head和tail,一看便知,这是一个双向链表的头和尾。由head和tail的类型可知,在DefaultChannelPipeline中,维护了一个以 AbstractChannelHandlerContext 为节点的双向链表,这个链表是 Netty 实现 Pipeline 机制的关键。
TailContext类图:
HeadContext类图:
由类结构图可知,TailContext和HeadContext都继承了AbstractChannelHandlerContext,TailContext实现了ChannelInboundHandler,而HeadContext同时实现了ChannelInboundHandler和ChannelOutboundHandler。HeadContext会同时处理出站事件和入站事件,TailContext只能处理入站事件。再看看两者的构造器:
TailContext(DefaultChannelPipeline pipeline) { super(pipeline, null, TAIL_NAME, true, false); setAddComplete(); } HeadContext(DefaultChannelPipeline pipeline) { super(pipeline, null, HEAD_NAME, false, true); unsafe = pipeline.channel().unsafe(); setAddComplete(); }
两者都会调用父类 AbstractChannelHandlerContext 的构造器, 区别在于参数inbound和outbound的值。
inbound和outbound属性用于快速确定一个handler处理入站事件还是出站事件,后文会介绍。
到此,ChannelPipeline的实例化过程完成。
本文主要介绍了netty的ChannelHandler和ChannelPipeline两个组件,介绍了ChannelHandler的家族成员,状态管理和生命周期,同时介绍了ChannelPipeline的功能和实例化。本文对分析和理解ChannelPipeline及ChannelHandlerContext的源码很有帮助。关于ChannelPipeline和ChannelHandlerContext源码分析请阅读https://my.oschina.net/7001/blog/1037187
欢迎指出本文有误的地方,转载请注明原文出处https://my.oschina.net/7001/blog/994219
相关文章推荐
- 深入研究Netty框架之Channel和Unsafe详解
- Netty 之 ChannelHandler,ChannelHandlerContext,ChannelPipeline
- 一起学Netty(四)之 ChannelHandler,ChannelHandlerContext,ChannelPipeline
- 一起学Netty(四)之 ChannelHandler,ChannelHandlerContext,ChannelPipeline
- 一起学Netty(四)之 ChannelHandler,ChannelHandlerContext,ChannelPipeline
- Netty in action—ChannelHandler和ChannelPipeline
- 代理二:深入研究InvocationHandler、动态代理类工作原理、实现AOP框架
- 深入研究Netty框架之ByteBuf功能原理及源码分析
- 深入研究Netty框架之ByteBuf类继承结构
- Netty之ChannelHandler,ChannelHandlerContext,ChannelPipeline
- netty中的ChannelHandler和ChannelPipeline
- 异步通信框架Netty之ChannelHandlerAdapter探究
- 深入研究Netty框架之ByteBuf家族
- Netty笔记(五)ChannelHandler
- Netty In Action中文版 - 第八章:附带的ChannelHandler和Codec
- Netty In Action中文版 - 第八章:附带的ChannelHandler和Codec
- Netty In Action中文版 - 第八章:附带的ChannelHandler和Codec
- Netty In Action中文版 - 第八章:附带的ChannelHandler和Codec
- Netty的Handler、Future、Channel的UML类图
- netty ChannelInboundByteHandlerAdapter