您的位置:首页 > 编程语言 > Java开发

JAVA Socket编程学习8--为什么使用Netty

2017-12-07 13:45 573 查看
转载自:http://blog.csdn.net/yinwenjie/article/details/48829419和http://blog.csdn.net/yinwenjie/article/details/48969853

1、Netty介绍

        Netty是由JBOSS提供的一个java开源框架。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。

        但实际上呢,Netty框架并不只是封装了多路复用的IO模型,也包括提供了传统的阻塞式/非阻塞式 同步IO的模型封装。当然,从Netty官网上的几句中文并不能概括完Netty的全部作用。下面的两篇文章我们将会在您已经理解原生的JAVA NIO框架的基础上,向您介绍Netty的原理和使用。

2、Netty快速上手
2-1、代码示例

        下面这段代码本身就比较好理解,我在其上又加上了比较详细的注解。相信就算您之前没有接触过Netty,也应该是可以看懂的。如果您之前接触过Netty,那您可以发现,这段代码中基本上已经包含了Netty中比较重要的几个概念了:Channel、Buffer、ChannelPipeline、ChannelHandler、ChannelHandlerContext等

        是的,我们将从这个示例代码入手,介绍Netty的基本概念和使用。然后我们再回头看看上文中的那个问题:为什么已经有的JAVA NIO框架,还需要一个Netty呢?

package testNetty;

import java.net.InetSocketAddress;
import java.nio.channels.spi.SelectorProvider;
import java.util.concurrent.ThreadFactory;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.bytes.ByteArrayDecoder;
import io.netty.handler.codec.bytes.ByteArrayEncoder;
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.DefaultThreadFactory;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.log4j.BasicConfigurator;

public class TestTCPNetty {
static {
BasicConfigurator.configure();
}

public static void main(String[] args) throws Exception {
//这就是主要的服务启动器
ServerBootstrap serverBootstrap = new ServerBootstrap();

//=======================下面我们设置线程池
//BOSS线程池
EventLoopGroup bossLoopGroup = new NioEventLoopGroup(1);
//WORK线程池:这样的申明方式,主要是为了向读者说明Netty的线程组是怎样工作的
ThreadFactory threadFactory = new DefaultThreadFactory("work thread pool");
//CPU个数
int processorsNumber = Runtime.getRuntime().availableProcessors();
EventLoopGroup workLoogGroup = new NioEventLoopGroup(processorsNumber * 2, threadFactory, SelectorProvider.provider());
//指定Netty的Boss线程和work线程
serverBootstrap.group(bossLoopGroup , workLoogGroup);
//如果是以下的申明方式,说明BOSS线程和WORK线程共享一个线程池
//(实际上一般的情况环境下,这种共享线程池的方式已经够了)
//serverBootstrap.group(workLoogGroup);

//========================下面我们设置我们服务的通道类型
//只能是实现了ServerChannel接口的“服务器”通道类
serverBootstrap.channel(NioServerSocketChannel.class);
//当然也可以这样创建(那个SelectorProvider是不是感觉很熟悉?)
//serverBootstrap.channelFactory(new ChannelFactory<NioServerSocketChannel>() {
// @Override
// public NioServerSocketChannel newChannel() {
// return new NioServerSocketChannel(SelectorProvider.provider());
// }
//});

//========================设置处理器
//为了演示,这里我们设置了一组简单的ByteArrayDecoder和ByteArrayEncoder
//Netty的特色就在这一连串“通道水管”中的“处理器”
serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
/* (non-Javadoc)
* @see io.netty.channel.ChannelInitializer#initChannel(io.netty.channel.Channel)
*/
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new ByteArrayEncoder());
ch.pipeline().addLast(new TCPServerHandler());
ch.pipeline().addLast(new ByteArrayDecoder());
}
});

//========================设置netty服务器绑定的ip和端口
serverBootstrap.option(ChannelOption.SO_BACKLOG, 128);
serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
serverBootstrap.bind(new InetSocketAddress("0.0.0.0", 83));
//还可以监控多个端口
//serverBootstrap.bind(new InetSocketAddress("0.0.0.0", 84));
}
}

/**
* @author yinwenjie
*/
@Sharable
class TCPServerHandler extends ChannelInboundHandlerAdapter {
/**
* 日志
*/
private static Log LOGGER = LogFactory.getLog(TCPServerHandler.class);

/**
* 每一个channel,都有独立的handler、ChannelHandlerContext、ChannelPipeline、Attribute
* 所以不需要担心多个channel中的这些对象相互影响。<br>
* 这里我们使用content这个key,记录这个handler中已经接收到的客户端信息。
*/
private static AttributeKey<StringBuffer> content = AttributeKey.valueOf("content");

/* (non-Javadoc)
* @see io.netty.channel.ChannelInboundHandlerAdapter#channelRegistered(io.netty.channel.ChannelHandlerContext)
*/
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
TCPServerHandler.LOGGER.info("super.channelRegistered(ctx)");
}

/* (non-Javadoc)
* @see io.netty.channel.ChannelInboundHandlerAdapter#channelUnregistered(io.netty.channel.ChannelHandlerContext)
*/
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
TCPServerHandler.LOGGER.info("super.channelUnregistered(ctx)");
}

/* (non-Javadoc)
* @see io.netty.channel.ChannelInboundHandlerAdapter#channelActive(io.netty.channel.ChannelHandlerContext)
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
TCPServerHandler.LOGGER.info("super.channelActive(ctx) = " + ctx.toString());
}

/* (non-Javadoc)
* @see io.netty.channel.ChannelInboundHandlerAdapter#channelInactive(io.netty.channel.ChannelHandlerContext)
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
TCPServerHandler.LOGGER.info("super.channelInactive(ctx)");
}

/* (non-Javadoc)
* @see io.netty.channel.ChannelInboundHandlerAdapter#channelRead(io.netty.channel.ChannelHandlerContext, java.lang.Object)
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
TCPServerHandler.LOGGER.info("channelRead(ChannelHandlerContext ctx, Object msg)");
/*
* 我们使用IDE工具模拟长连接中的数据缓慢提交。
* 由read方法负责接收数据,但只是进行数据累加,不进行任何处理
* */
ByteBuf byteBuf = (ByteBuf)msg;
try {
StringBuffer contextBuffer = new StringBuffer();
while(byteBuf.isReadable()) {
contextBuffer.append((char)byteBuf.readByte());
}

//加入临时区域
StringBuffer content = ctx.attr(TCPServerHandler.content).get();
if(content == null) {
content = new StringBuffer();
ctx.attr(TCPServerHandler.content).set(content);
}
content.append(contextBuffer);
} catch(Exception e) {
throw e;
} finally {
byteBuf.release();
}
}

/* (non-Javadoc)
* @see io.netty.channel.ChannelInboundHandlerAdapter#channelReadComplete(io.netty.channel.ChannelHandlerContext)
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
TCPServerHandler.LOGGER.info("super.channelReadComplete(ChannelHandlerContext ctx)");
/*
* 由readComplete方法负责检查数据是否接收完了。
* 和之前的文章一样,我们检查整个内容中是否有“over”关键字
* */
StringBuffer content = ctx.attr(TCPServerHandler.content).get();
//如果条件成立说明还没有接收到完整客户端信息
if(content.indexOf("over") == -1) {
return;
}

//当接收到信息后,首先要做的的是清空原来的历史信息
ctx.attr(TCPServerHandler.content).set(new StringBuffer());

//准备向客户端发送响应
ByteBuf byteBuf = ctx.alloc().buffer(1024);
byteBuf.writeBytes("回发响应信息!".getBytes());
ctx.writeAndFlush(byteBuf);

/*
* 关闭,正常终止这个通道上下文,就可以关闭通道了
* (如果不关闭,这个通道的回话将一直存在,只要网络是稳定的,服务器就可以随时通过这个回话向客户端发送信息)。
* 关闭通道意味着TCP将正常断开,其中所有的
* handler、ChannelHandlerContext、ChannelPipeline、Attribute等信息都将注销
* */
ctx.close();
}

/* (non-Javadoc)
* @see io.netty.channel.ChannelInboundHandlerAdapter#userEventTriggered(io.netty.channel.ChannelHandlerContext, java.lang.Object)
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
TCPServerHandler.LOGGER.info("super.userEventTriggered(ctx, evt)");
}

/* (non-Javadoc)
* @see io.netty.channel.ChannelInboundHandlerAdapter#channelWritabilityChanged(io.netty.channel.ChannelHandlerContext)
*/
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
TCPServerHandler.LOGGER.info("super.channelWritabilityChanged(ctx)");
}

/* (non-Javadoc)
* @see io.netty.channel.ChannelInboundHandlerAdapter#exceptionCaught(io.netty.channel.ChannelHandlerContext, java.lang.Throwable)
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
TCPServerHandler.LOGGER.info("super.exceptionCaught(ctx, cause)");
}

/* (non-Javadoc)
* @see io.netty.channel.ChannelHandlerAdapter#handlerAdded(io.netty.channel.ChannelHandlerContext)
*/
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
TCPServerHandler.LOGGER.info("super.handlerAdded(ctx)");
}

/* (non-Javadoc)
* @see io.netty.channel.ChannelHandlerAdapter#handlerRemoved(io.netty.channel.ChannelHandlerContext)
*/
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
TCPServerHandler.LOGGER.info("super.handlerRemoved(ctx)");
}
}        这是server端的代码。就像我在前文中提到的一样,客户端是否使用了NIO技术实际上对整个系统架构的性能影响不大。您可以使用任何支持TCP/IP协议技术的代码,作为客户端。可以使用Python、C++、C#、JAVA等等任意的编程语言。

2-2、代码片段讲解

//BOSS线程池

EventLoopGroup bossLoopGroup = new NioEventLoopGroup(1);

BOSS线程池实际上就是JAVA NIO框架中selector工作角色(这个后文会详细讲),针对一个本地IP的端口,BOSS线程池中有一条线程工作,工作内容也相对简单,就是发现新的连接;Netty是支持同时监听多个端口的,所以BOSS线程池的大小按照需要监听的服务器端口数量进行设置就行了。

//Work线程池

int processorsNumber = Runtime.getRuntime().availableProcessors();

EventLoopGroup workLoogGroup = new NioEventLoopGroup(processorsNumber * 2, threadFactory, SelectorProvider.provider());

这段代码主要是确定Netty中工作线程池的大小,这个大小一般是物理机器/虚拟机器 可用内核的个数 * 2。work线程池中的线程(如果封装的是JAVA NIO,那么具体的线程实现类就是NioEventLoop)都固定负责指派给它的网络连接的事件监听,并根据事件状态,调用不同的ChannelHandler事件方法。而最后一个参数SelectorProvider说明了这个EventLoop所使用的多路复用IO模型为操作系统决定。

serverBootstrap.option(ChannelOption.SO_BACKLOG, 128);

serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);

option方法,可以设置这个ServerChannel相应的各种属性(在代码中我们使用的是NioServerSocketChannel);childOption方法用于设置这个ServerChannel收到客户端时间后,所生成的新的Channel的各种属性(代码中,我们生成的是NioSocketChannel)。详细的option参数可以参见ChannelOption类中的注释说明。

3、重要概念
3-1、Netty线程机制

还记的我们在讲解JAVA NIO框架对 多路复用IO技术 的支持时,讲到的Selector选择器吗?它大致的工作方式是:
while(true) {
if(selector.select(100) == 0) {
//================================================
// 这里视业务情况,可以做一些然并卵的事情
//================================================
continue;
}

Iterator<SelectionKey> selecionKeys = selector.selectedKeys().iterator();

while(selecionKeys.hasNext()) {
SelectionKey readyKey = selecionKeys.next();
selecionKeys.remove();

SelectableChannel selectableChannel = readyKey.channel();
if(readyKey.isValid() && readyKey.isAcceptable()) {
。。。
} else if(readyKey.isValid()&&readyKey.isConnectable()) {
。。。
} else if(readyKey.isValid()&&readyKey.isReadable()) {
。。。
}
}
}在前文介绍JAVA对多路复用IO技术的支持中,我们说过,Selector可以是在主线程上面操作,也可以是一个独立的线程进行操作。在Netty中,这里的部分工作就是交给BOSS线程做的。BOSS线程负责发现连接到服务器的新的channel(SocketServerChannel的ACCEPT事件),并且将这个channel经过检查后注册到WORK连接池的某个EventLoop线程中。

而当WORK线程发现操作系统有一个它感兴趣的IO事件时(例如SocketChannel的READ事件)则调用相应的ChannelHandler事件。当某个channel失效后(例如显示调用ctx.close())这个channel将从绑定的EventLoop中被剔除。

在Netty中,如果我们使用的是一个JAVA NIO框架的封装,那么进行这个循环的是NioEventLoop类(实现多路复用的支持时)。参见该类中的processSelectedKeysPlain方法 和 processSelectedKey方法。另外在这个类中Netty解决了之前我们说到的java nio中”Selector.select(timeout) CPU 100%” 的BUG和一个“NullPointerException in Selector.open()”(http://bugs.java.com/view_bug.do?bug_id=6427854)的BUG:

processSelectedKeysPlain方法:

for (;;) {
final SelectionKey k = i.next();
final Object a = k.attachment();
i.remove();

if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}

if (!i.hasNext()) {
break;
}

if (needsToSelectAgain) {
selectAgain();
selectedKeys = selector.selectedKeys();

// Create the iterator again to avoid ConcurrentModificationException
if (selectedKeys.isEmpty()) {
break;
} else {
i = selectedKeys.iterator();
}
}
}processSelectedKey方法:
if (!k.isValid()) {
// close the channel if the key is not valid anymore
unsafe.close(unsafe.voidPromise());
return;
}

try {
int readyOps = k.readyOps();
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
if (!ch.isOpen()) {
// Connection already closed - no need to handle write.
return;
}
}
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924 int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);

unsafe.finishConnect();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
一个Work线程池的线程将按照底层JAVA NIO的Selector的事件状态,决定执行ChannelHandler中的哪一个事件方法(Netty中的事件,包括了channelRegistered、channelUnregistered、channelActive、channelInactive等事件方法)。执行完成后,work线程将一直轮询直到操作系统回复下一个它所管理的channel发生了新的IO事件。

3-2、ByteBuf

Netty重写了JAVA NIO框架中的缓存结构,并将这个结构应用在更上层的封装中。

为什么要重写呢?JBOSS-Netty给出的解释是:我写的缓存比JAVA中的ByteBuffer牛。好吧,作为一个屌丝IT从业人员,我想我不能说什么了。



这里说一说Netty中几个比较特别的ByteBuf实现:

# io.netty.buffer.EmptyByteBuf:这是一个初始容量和最大容量都为0的缓存区。一般我们用这种缓存区描述“没有任何处理结果”,并将其向下一个handler传递。

# io.netty.buffer.ReadOnlyByteBuf:这是一个不允许任何“写请求”的只读缓存区。一般是通过Unpooled.unmodifiableBuffer(ByteBuf)方法将某一个正常可读写的缓存区转变而成。如果我们需要在下一个Handler处理的过程中禁止写入任何数据到缓存区,就可以在这个handler中进行“只读缓存区”的转换。

# io.netty.buffer.UnpooledDirectByteBuf:基本的JAVA NIO框架的ByteBuffer封装。一般我们直接使用这个缓存区实现来处理Handler事件。

# io.netty.buffer.PooledByteBuf:Netty4.X版本的缓存新特性,主要是为了减少之前unpoolByteBuf在创建和销毁时的GC时间。

3-3、Channel

Channel,通道。您可以使用JAVA NIO中的Channel去初次理解它,但实际上它的意义和JAVA NIO中的通道意义还不一样。我们可以理解成:“更抽象、更丰富”。如下图所示:



Netty中的Channel专门代表网络通信,这个和JAVA NIO框架中的Channel不一样,后者中还有类似FileChannel本地文件IO通道。由于前者专门代表网络通信,所以它是由客户端地址 + 服务器地址 + 网络操作状态构成的,请参见io.netty.channel.Channel接口的定义。

每一个Netty中的Channel,比JAVA NIO中的Channel更抽象。这是为什么呢?在Netty中,不止封装了JAVA NIO的IO模型,还封装了JAVA BIO的阻塞同步IO通信模型。将他们在表现上都抽象成Channel了。这就是为什么Netty中有io.netty.channel.oio.AbstractOioChannel这样的抽象类。

其io.netty.channel.oio.AbstractOioChannel抽象类上的注解也说明得比较清楚:Abstract base class for Channel implementations that use Old-Blocking-IO

您可以这样理解:Netty的Channel更具业务抽象性。

3-4、ChannelPipeline和ChannelHandler

Netty中的每一个Channel,都有一个独立的ChannelPipeline,中文称为“通道水管”。只不过这个水管是双向的里面流淌着数据,数据可以通过这个“水管”流入到服务器,也可以通过这个“水管”从服务器流出。

在ChannelPipeline中,有若干的过滤器。我们称之为“ChannelHandler”(处理器或者过滤器)。同“流入”和“流出”的概念向对应:用于处理/过滤 流入数据的ChannelHandler,称之为“ChannelInboundHandler”;用于处理/过滤 流出数据的ChannelHandler,称之为“ChannelOutboundHandler”。

如下图所示:



3-4-1、责任链和适配器的应用

数据在ChannelPipeline中有一个一个的Handler进行处理,并形成一个新的数据状态。这是典型的“责任链”模式。

需要注意,虽然数据管道中的Handler是按照顺序执行的,但不代表某一个Handler会处理任何一种由“上一个handler”发送过来的数据。某些Handler会检查传来的数据是否符合要求,如果不符合自己的处理要求,则不进行处理。

我们可以实现ChannelInboundHandler接口或者ChannelOutboundHandler接口,来实现我们自己业务的“数据流入处理器”或者“数据流出”处理器。

但是这两个接口的事件方法是比较多的,例如ChannelInboundHandler接口一共有11个需要实现的接口方法(包括父级ChannelHandler的,我们在下一节讲解Channel的生命周期时,回专门讲到这些事件的执行顺序和执行状态),一般情况下我们不需要把这些方法全部实现。

所以Netty中增加了两个适配器“ChannelInboundHandlerAdapter”和“ChannelOutboundHandlerAdapter”来帮助我们去实现我们只需要实现的事件方法。其他的事件方法我们就不需要关心了:



在我上文给出的示例代码中,书写的业务处理器TCPServerHandler就是继承了ChannelInboundHandlerAdapter适配器。下面,我们将介绍几个常使用的ChannelInboundHandler处理器和ChannelOutboundHandler处理器

3-4-2、ChannelInboundHandler类举例

HttpRequestDecoder:实现了Http协议的数据输入格式的解析。这个类将数据编码为HttpMessage对象,并交由下一个ChannelHandler进行处理。

ByteArrayDecoder:最基础的数据流输入处理器,将所有的byte转换为ByteBuf对象(一般的实现类是:io.netty.buffer.UnpooledUnsafeDirectByteBuf)。我们进行一般的文本格式信息传输到服务器时,最好使用这个Handler将byte数组转换为ByteBuf对象。

DelimiterBasedFrameDecoder:这个数据流输入处理器,会按照外部传入的数据中给定的某个关键字符/关键字符串,重新将数据组装为新的段落并发送给下一个Handler处理器。后文中,我们将使用这个处理器进行TCP半包的问题。

还有很多直接支持标准数据格式解析的处理器,例如支持Google Protocol Buffers 数据格式解析的ProtobufDecoder和ProtobufVarint32FrameDecoder处理器。

3-4-3、ChannelOutboundHandler类举例

HttpResponseEncoder:这个类和HttpRequestDecoder相对应,是将服务器端HttpReponse对象的描述转换成ByteBuf对象形式,并向外传播。

ByteArrayEncoder:这个类和ByteArrayDecoder,是将服务器端的ByteBuf对象转换成byte数组的形式,并向外传播。一般也和ByteArrayDecoder对象成对使用。

还有支持标准的编码成Google Protocol Buffers格式、JBoss Marshalling 格式、ZIP压缩格式的ProtobufEncoder、ProtobufVarint32LengthFieldPrepender、MarshallingEncoder、JZlibEncoder等

4、Channel的生命周期

上面第3小节,讲到了Netty中的重要概念。我们花很大篇幅讲解了Channel、ChannelPipeline、ChannelHandler,以及他们的联系和工作方式。

在说到ChannelInHandler为什么会使用“适配器”模式的时候,特别指出了原因:因为ChannelInHandler接口中的方法加上父级接口中的方法,总共有11个接口事件方法需要实现。而事实上很多时候我们只会关心其中的一个或者两个接口方法。

那么这些方法是什么时候被触发的呢?这就要说到Netty中一个Channel的生命周期了(这里我们考虑的生命周期是指Netty对JAVA NIO技术框架的封装):



这里有一个channel事件没有在图中说明,就是exceptionCaught(ChannelHandlerContext, Throwable)事件。只要在调用图中的所有事件方法时,有异常抛出,exceptionCaught方法就会被调用。

另外,不是channelReadComplete(ChannelHandlerContext)方法调用后就一定会调用channelInactive事件方法。channelReadComplete和channelRead是可以反复调用的,只要客户端有数据发送过来。

最后补充一句,这个生命周期的事件方法调用顺序只是针对Netty封装使用JAVA NIO框架时,并且在进行TCP/IP协议监听时的事件方法调用顺序。

5、再次审视为什么使用Netty

上面我们讨论了Netty的基本原理,重要概念,并使用java代码描述了Netty的基本使用。当然Netty的技术涵盖点远远不是那一篇基础代码就可以全部概括的,但是至少可以给读者一个切入点。让大家去思考一个我们一直在讨论的问题:为什么有了JAVA NIO框架后我们还需要有Netty这样的框架对底层再次进行封装?

5-1、IO模型的封装
5-1-1、再次总结IO模型

在前文中我们已经提到了,几种典型的IO模型:

# 阻塞和非阻塞:这个概念是针对应用程序而言,是指应用程序中的线程在向操作系统发送IO请求后,是否一直等待操作系统的IO响应。如果是,那么就是阻塞式的;如果不是,那么应用程序一般会以轮询的方式以一定周期询问操作系统,直到某次获得了IO响应为止(轮序间隔应用程序线程可以做一些其他工作)。

# 同步和异步:IO操作都是由操作系统进行的(这里的IO操作是个广泛概念了:磁盘IO、网络IO都算),不同的操作系统对不同设备的IO操作都有不同的模式。同步和异步这两个概念都指代的操作系统级别,同步IO是指操作系统和设备进行交互时,必须等待一次完整的请求-响应完成,才能进行下一次操作(当然操作系统和设备本身也有很多技术加快这个反应过程,例如“磁盘预读”技术、数据缓存技术);异步IO是指操作系统和设备进行交互时,不必等待本次得到响应,就可以直接进行下一次操作请求。设备处理完某次请求后,会主动给操作系统相应的响应通知。

# 多路复用IO:多路复用IO,从本质上看还是一种同步IO,因为它没有100%消除IO_WAIT,操作系统也没有为它提供“主动通知”机制。但是多路复用IO的处理速度已经相当快了,利用设备执行IO操作的时间,操作系统可以继续执行IO请求。并同样采用周期性轮询的方式,获取一批IO操作请求的执行响应。操作系统支持的多路复用IO技术主要有select、poll、epoll、kqueue。

# 阻塞式同步IO模型:这个从字面上就很好理解了,应用程序请求IO操作,并一直等待处理结果;操作系统同时也进行IO操作,并等待设备的处理结果;可以看出,应用程序的请求线程和操作系统的内核线程都是等待状态。

# 非阻塞式同步IO模型:应用程序请求IO,并且不用一直等待返回结果就去做其他事情。隔一定的周期,再去询问操作系统上次IO操作有没有结果,直到某一次询问从操作系统拿到IO结果;操作系统内核线程在进行IO操作时,还是处理一直等待设备返回操作结果的状态。

# 非阻塞式多路复用IO模型:应用程序请求IO的工作采用非阻塞方式进行;操作系统采用多路复用模式工作。

# 非阻塞式异步IO模型:应用程序请求IO的工作采用非阻塞方式进行,但是不需要轮询了,因为操作系统异步IO其中一个主要特性就是:可以在有IO响应结果的时候,主动进行通知。

5-1-2、对IO模型的再次封装

以上这些IO工作模型,在JAVA中都能够找到对应的支持:传统的JAVA Socket套接字支持阻塞/非阻塞模式下的同步IO(有的技术资料里面也称为OIO或者BIO);JAVA NIO框架在不同操作系统下支持不同种类的多路复用IO技术(windows下的select模型、Linux下的poll/epoll模型);JAVA AIO框架支持异步IO(windows下的IOCP和Linux使用epoll的模拟AIO)

实际上Netty是对JAVA BIO 、JAVA NIO框架的再次封装。让我们不再纠结于选用哪种底层实现。您可以理解成Netty/MINA 框架是一个面向上层业务实现进行封装的“业务层”框架。而JAVA Socket框架、JAVA NIO框架、JAVA AIO框架更偏向于对下层技术实现的封装,是面向“技术层”的框架。

5-2、数据信息格式的封装

“技术层”框架本身只对IO模型技术实现进行了封装,并不关心IO模型中流淌的数据格式;“业务层”框架对数据格式也进行了处理,让我们可以抽出精力关注业务本身。

Protobuf数据协议的集成:Netty利用自身的Channelpipeline的设计(在《架构设计:系统间通信(6)——IO通信模型和Netty 上篇》中讲过),对Protobuf数据协议进行了无缝结合。

JBoss Marshalling数据协议的集成:JBoss Marshalling 是一个Java对象的序列化API包,修正了JDK自带的序列化包的很多问题,又保持跟 java.io.Serializable 接口的兼容。Netty通过封装这个协议,可以帮助我们在客户端-服务端简便的进行对象系列化和反序列化。

HTTP Request / HTTP Response 协议的集成:在Netty中,可以方便的接受和发送Http协议。也就是说,我们可以使用Netty搭建自己的WEB服务器,当然您还可以根据自己的业务要求,方便的设计类似于Struts、Spring MVC这样的WEB框架。

5-3、解决了“技术层”框架中的技术问题

通过阅读Netty框架的代码,我们知道了Netty框架至少解决了JAVA NIO框架中的一些Bug:

JDK-6427854 : (se) NullPointerException in Selector.open()。http://bugs.java.com/view_bug.do?bug_id=6427854。这个Bug的官方描述是:

sun.nio.ch.Util contains code which is not thread safe and can throw a NullPointerException:

private static String bugLevel = null;

static boolean atBugLevel(String bl) { // package-private
if (bugLevel == null) {
if (!sun.misc.VM.isBooted())
return false;
java.security.PrivilegedAction pa =
new GetPropertyAction("sun.nio.ch.bugLevel");
// the next line can reset bugLevel to null
bugLevel = (String)AccessController.doPrivileged(pa);
if (bugLevel == null)
bugLevel = "";
}
return (bugLevel != null) && bugLevel.equals(bl);
}Suppose that two threads enter the "if (buglevel == null)" body at the same time. The first one runs until the return line and gets scheduled out right after the (buglevel != null) check. The second one then runs until right after the doPrivileged()
call, sets bugLevel to null and gets scheduled out. The first one continues and hits a NullPointerException while calling bugLevel.equals() with bugLevel being null.

这个问题在Netty框架中,负责进行JAVA NIO Selector的NioEventLoop类中得到了解决。

workaround the infamous epoll 100% CPU bug。http://bugs.java.com/bugdatabase/view_bug.do?bug_id=6403933。这个Bug出现在linux系统环境,大致是说JAVA NIO 框架在实现 Linux内核 kernel 2.6+中的epoll模型时。Selector.select(timeout)方法不能阻塞指定的timeout时间,导致CPU 100%的情况:
A DESCRIPTION OF THE PROBLEM :
Trying to get all bindings from the transient nameserver brings orbd into a state where it consumes 100% CPU. Its interesting to note that the problem only occurs if the client is programmed in c++. I was not able to reproduce the problem with a client programmed in Java.

STEPS TO FOLLOW TO REPRODUCE THE PROBLEM :
1) Get omniORB 4.1 from http://omniorb.sourceforge.net
2) Compile omniORB (requires python devel package installed)
cd /tmp
mkdir omni_local
tar xvzf omniORB-4.1.0.tar.gz
cd omniORB-4.1.0
./configure --prefix=/tmp/omni_local
make
make install

3) Compile the test program (binding_browser, source attached to this report)
g++ -I/tmp/omni_local/include -L/tmp/omni_local/lib -lomniORB4 -lomniDynamic4 -lomnithread -lpthread -lrt BindingBrowser.cc -o binding_browser

4) Start orbd
<JAVA_HOME>/bin/orbd -ORBInitialPort 12345

5) Start binding_browser (from another shell)
5a) export LD_LIBRARY_PATH=/tmp/omni_local/lib:$LD_LIBRARY_PATH
5b) ./binding_browser -ORBInitRef NameService=corbaloc::1.2@localhost:12345/TNameService

Repeat step 5b until orbd consumes 100% cpu.这个问题从官方的Bug Database中的描述看,是在JDK7的版本中被解决的。Netty框架在JDK 6+的环境下在JAVA NIO框架封装之上解决了这个Bug。

5-4、半包和粘包问题

请看我的另一篇文章http://blog.csdn.net/m0_37739193/article/details/78738253

5-5、专注于业务

Netty框架的特性,使我们不需要关心下层所工作的IO模型,利用Netty提供的面向事件驱动的方法结构,使我们更能集中精力关注应用层业务。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: