您的位置:首页 > 其它

netty(十三)源码分析之Channel

2017-08-29 20:02 746 查看
类似于NIO的Channel,Netty提供了自己的Channel和其子类实现,用于异步I/O操作和其他相关的操作。

Unsafe是个内部接口,聚合在Channel中协助进行网络读写相关的操作,因为它的设计初衷就是Channel的内部辅助类,不应该被Netty框架的上层使用者调用,所以被命名为Unsafe。这里不能仅从字面理解认为它是不安全的操作,而要从这个架构的设计层面体会它的设计初衷和职责。

Channel功能说明

io.netty.channel.Channel是Netty网络操作抽象类,它聚合了一组功能,包括但不限于网路的读,写,客户端发起连接,主动关闭连接,链路关闭,获取通信双方的网络地址等。它也包含了Netty框架相关的一些功能,包括获取该Channel的EventLoop,获取缓冲分配器ByteBufAllocator和pipeline等。

下面我们先从Channel的接口分析,讲解它的主要API和功能,然后再一起看下它的子类的相关功能实现,最后再对重要子类和接口进行源码分析。

Channel的工作原理

Channel是Netty抽象出来的网络I/O读写相关的接口,为什么不适用JDK NIO原生的Channel而要另起炉灶呢,主要原因如下:

(1)JDK的SocketChannel和ServerSocketChannel的主要职责就是网络I/O操作,由于它们是SPI类接口,由具体的虚拟机厂家来提供,所以通过继承SPI功能类来扩展其功能的难度很大;直接实现ServerSocketChannel和SocketChannel抽象类,其工作量和重新开发一个新的Channel功能类是差不多的。

(2)Netty的Channel需要能够跟Netty的整体架构融合在一起,例如I/O模型,基于ChannelPipeline的定制模型,以及基于元数据描述配置化的TCP参数等,这些JDK的SocketChannel和ServerSocketChanel都没有提供,需要重新封装。

(3)自定义的Channel,功能实现更加灵活。

基于上述4个原因,Netty重新设计了Channel接口,并且给予了很多不同的实现。它的设计原理比较简单,但是功能却比较复杂,主要的设计理念如下。

(1)在Channel接口层,采用Facade模式进行统一封装,将网络I/O操作,网络I/O相关的其他操作封装起来,统一对外提供。

(2)Channel接口的定义尽量大而全,为SocketChannel和ServerSocketChannel提供统一的视图,由不同子类实现不同的功能,公共功能在抽象父类中实现,最大程度地实现功能和接口的重用。

(3)具体实现采用聚合而非包含的方式,将相关的功能类聚合在Channel中,由Channel统一分配和调度,功能实现更加灵活。

Channel的功能实现

1.网络I/O操作和其他相关的操作。
Channel网络I/O相关的方法定义如下:



下面对这些API的功能进行分类说明:

(1)Channel read():从当前的Channel中读取数据到第一个inbound缓冲区中,如果数据被成功读取,触发ChannelHandler.channelRead(ChannelHandlerContext,Object)事件。读取操作API调用完成后,紧接着会触发ChannelHander.channelReadConplete(ChannelHandlerContext)事件,这样业务的ChannelHandler可以决定是否需要继续读取数据。如果已经有操作请求被挂起,则后续的读操作会被忽略。

(2)ChannelFuture write(Object msg):请求将当前的msg通过ChannelPipeline写入到目标Channel中。注意,write操作只是将消息存入到消息发送环形数组中,并没有真正被发送,只有调用flush操作才会被写入到Channel中,发送给对方。

(3)ChannelFuture close(ChannelPromise promise):主动关闭当前连接,通过ChannelPromise设置操作结果并进行结果通知,无论操作是否成功,都可以通过ChannelPromise获取操作结果。该操作会级联触发ChannelPipeline中所有ChannelHandler的ChannelHandler.close(ChannelHandlerContext,ChannelPromise)事件。

(4)ChannelFuture disconnect(ChannelPromise promise):请求断开与远程通信对端的连接并使用ChannelPromise来获取操作结果的通知信息。该方法会级联触发ChannelPipeline中所有ChannelHandler的ChannelHandler.close(ChannelHandlerContext,ChannelPromise)事件。

(5)ChnnelConfig config():获取当前Channel的配置信息,例如CONNECT_TIMEOUT_MILLS。

(6)boolean isOpen():判断当前Channel是否已经打开。

(7)ChannelMetadata metadata():获取当前Channel的元数据描述信息,包括TCP参数配置等。

(8)SocketAddress localAddress():获取当前Channel的本地绑定地址。

(9)SocketAddress remoteAddress():获取当前Channel通信的远程Socket地址。

2.其他常用的API功能说明

第一个比较重要的方法是eventLoop().Channel需要注册到EventLoop的多路复用器上,用于处理I/O事件,通过eventLoop()方法可以获取到Channel注册的EventLoop。EventLoop本质上就是处理网络读写事件的Reactor线程。在Netty中,它不仅仅用来处理网络事件,也可以用来执行定时任务和用户自定义NioTask等任务。

第二个比较常用的方法是metadata()方法。熟悉TCP协议的读者可能知道,当创建Socket的时候需要指定TCP参数,例如接收和发送的TCP缓冲区大小,TCP的超时时间。是否重用地址等。在Netty中,每个Channel对应一个物理链接,每个连接都有自己的TCP参数配置。所以,Channel会聚合一个ChannelMetadata用来对TCP参数提供元数据描述信息,通过metadata()方法就可以获取当前Channel的TCP参数配置。

第三个方法是parent()。对于服务端Channel而言,它的父Channel为空;对于客户端Channel,它的父Channel就是创建它的ServerSocketChannel。

第四个方法是用户获取Channel标识的id(),它返回ChannelId对象,ChannelId是Channel的唯一标识。

Channel源码分析

Channel的实现子类非常多,继承关系复杂,从学习的角度我们抽取最重要的两个————io.netty.channel.socket.nio.NioServerSocketChannel和io.netty.channel.socket.nio.NioSocketChannel进行重点分析。





AbstractChannel源码分析

1.成员变量定义

在分析AbstractChannel源码之前,我们看下它的成员变量定义,首先定了两个静态全局异常,如下:

static final ClosedChannelException CLOSED_CHANNEL_EXCEPTION = new ClosedChannelException();链路已经关闭异常

static final NotYetConnectedException NOT_YET_CONNECTED_EXCEPTION = new NotYetConnectedException();链路尚未建立异常

声明完上述两个异常之后,通过静态块将它们的堆栈设置为空的StackTraceElement。

estimatorHandle用于预测下一个报文的大小,它基于之前的数据的采样进行分析预测。

根据之前的Channel原理分析,我们知道AbstractChannel采用聚合的方式封装各种功能。

AbstractChannel聚合了所有Channel使用到的能力对象,由AbstractChannel提供初始化和统一封装,如果功能和子类强相关,则定义成抽象方法由子类具体实现,下面对它的主要API进行源码分析。

2.核心API源码分析

首先看下网络读写操作,前面介绍网络I/O操作时讲到,它会触发ChannelPipeline中对应的事件方法。Netty基于事件驱动,我们也可以理解为当Channel进行I/O操作时会产生对应的I/O事件,然后驱动事件在ChannnelPipeline中传播,由对应的ChannelHandler对事件进行拦截和处理,不关心的事件可以直接忽略。采用事件驱动的方式可以非常轻松地通过事件定义来划分事件拦截切面,方便业务的定制和功能扩展,相比AOP,其性能更高,但是功能却基本等价。但使用场景,远远还是及不上AOP。

网络I/O操作直接调用DefaultChannelPipelien的相关方法,由DefaultChannnelPipeline中对应的ChannnelHandler进行具体的逻辑处理,如下所示:

@Override
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
return pipeline.connect(remoteAddress, localAddress);
}

@Override
public ChannelFuture disconnect() {
return pipeline.disconnect();
}

@Override
public ChannelFuture close() {
return pipeline.close();
}

@Override
public Channel flush() {
pipeline.flush();
return this;
}


AbstractChannel也提供了一些公共API的具体实现,例如localAddress()和remoteAddress()方法,它的源码实现如下所示:

@Override
public SocketAddress remoteAddress() {
SocketAddress remoteAddress = this.remoteAddress;
if (remoteAddress == null) {
try {
this.r
d8bd
emoteAddress = remoteAddress = unsafe().remoteAddress();
} catch (Throwable t) {
// Sometimes fails on a closed socket in Windows.
return null;
}
}
return remoteAddress;
}


首先从缓存的成员变量中获取,如果第一次调用为空,需要通过unsafe的remoteAddress获取,它是个抽象方法,具体由对应的Channel子类实现。

AbstractNioChannel源码分析

首先,还是从成员变量定义入手,来了解下它的功能实现,成员变量定义如下:

private static final InternalLogger logger =
InternalLoggerFactory.getInstance(AbstractNioChannel.class);

private final SelectableChannel ch;
protected final int readInterestOp;
private volatile SelectionKey selectionKey;
private volatile boolean inputShutdown;

/**
* The future of the current connection attempt.  If not null, subsequent
* connection attempts will fail.
*/
private ChannelPromise connectPromise;
private ScheduledFuture<?> connectTimeoutFuture;
private SocketAddress requestedRemoteAddress;

由于NIO Channel、NioSocketChannel和NioServerSocketChannel需要共用,所以定义了一个java.nio.SocketChannel和java.nio.ServerSocketChannel的公共父类SelectableChannel,用于设置SelectableChannel参数和进行I/O操作。

第二个参数是readInterestOp,它代表了JDK SelectionKey的OP_READ.

随后定义了一个volatile修饰的SelectionKey,该SelectionKey是注册到EventLoop后返回的选择键。由于Channel会面临多个业务线程的并发写操作,当SelectionKey由SelectionKey修改后,为了能让其他业务线程感知到变化,所以需要使用volatile保证修饰的可见性。

最后定义了代表连接操作结果的ChannelPromise以及连接超时定时器ScheduledFuture和请求的通信地址信息。

2.核心API源码分析

我们看下AbstractNioChannel实现的主要API,首先是Channel的注册,如下:

protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().selector, 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
eventLoop().selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
}


定义一个局部变量selected来标识注册操作是否成功,调用SelectableChannel的register()方法,将当前的Channel注册到EventLoop的多路复用器上。

注册Channel的时候需要指定监听的网络操作位来表示Channel对哪几类网络事件感兴趣,具体的定义如下:

public static final int OP_READ = 1 <<9:读操作位
public static final int OP_WRITE=1 << 2 写操作位
public static final int OP_CONNECT = 1<< 3读客户端连接服务端操作位
public static final int OP_ACCEPT = 1 << 4 服务端接收客户端连接操作位。
AbstractNioChannel注册的是0,说明对任何事件都不感兴趣,仅仅完成注册操作。注册的时候可以指定附件,后续Channel接收到网络事件通知时可以从SelectionKey中重新获取之前的附件进行处理,此处将AbstractNioChannel的实现子类自身当作附件注册。如果注册Channel成功,则返回selectionKey,通过selectionKey可以从多路复用器中获取Channel对象。

下面继续看另一个比较重要的方法:准备处理读操作之前需要设置网络操作位为读,如下所示:
protected void doBeginRead() throws Exception {
if (inputShutdown) {
return;
}

final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}

final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp);
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  netty channel