您的位置:首页 > 其它

NETTY基础教程(1):基本概念

2018-02-11 16:22 267 查看

参考文献

https://www.jianshu.com/p/b9f3f6a16911

概述

如果没有netty,远古时代我们会使用java.net + java.io,在近代,我们会使用java.nio,感谢这个时代,我们最终有了netty。



而NIO中,当一个Socket建立好之后,Thread并不会阻塞去接受这个Socket,而是将这个请求交给Selector,Selector会不断的去遍历所有的Socket,一旦有一个Socket建立完成,他会通知Thread,然后Thread处理完数据再返回给客户端——这个过程是阻塞的,这样就能让一个Thread处理更多的请求了。

cat的用途很广,从阿里的jstorm到大众点评的cat,都使用netty作为底层通信。

线程模型

Boss线程:

每个server服务器都会有一个boss线程,每绑定一个InetSocketAddress都会产生一个boss线程,比如:我们开启了两个服务器端口80和443,则我们会有两个boss线程。一个boss线程在端口绑定后,会接收传进来的连接,一旦连接接收成功,boss线程会指派一个worker线程处理连接。在boss线程接受了socket连接求后,会产生一个channel(一个打开的socket对应一个打开的channel),并把这个channel交给ServerBootstrap初始化时指定的ServerSocketChannelFactory来处理,boss线程则继续处理socket的请求。

worker线程:

ServerSocketChannelFactory则会从worker线程池中找出一个worker线程来继续处理这个请求。一般而言,我们使用NioServerSocketChannelFactory,每个worker可以服务不同的socket或者说channel,worker线程和channel不再有一一对应的关系。当然netty也支持一个worker线程对应一个channel的模式。

Channel

Channel,表示一个连接,可以理解为每一个请求,就是一个Channel。

ChannelHandler,核心处理业务就在这里,用于处理业务请求。

ChannelHandlerContext,用于传输业务数据。

ChannelPipeline,用于保存处理过程需要用到的ChannelHandler和ChannelHandlerContext。

bytebuf

Netty中的UnpooledHeapByteBuf的byte[],可以由JVM自动GC,

UnpooledDirectByteBuf的底层是DirectByteBuffer,只有在没有堆内对象引用的情况下才会被回收。设想一种危险的情况,堆内内存非常充足,但是堆外内存则已经濒临枯竭,所以堆内对象一直存活,堆外内存就不会被回收。所以,这种内存也建议手动回收。

PooledHeapByteBuf 和 PooledDirectByteBuf,则必须要主动将用完的byte[]/ByteBuffer放回池里,否则内存就要爆掉。

简单的IM DEMO

SERVER端核心代码。ServerBootstrap负责初始化netty服务器。

EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup);
b.channel(NioServerSocketChannel.class);
b.childHandler(new HelloServerInitializer());

// 服务器绑定端口监听
ChannelFuture f = b.bind(portNumber).sync();
// 监听服务器关闭监听
f.channel().closeFuture().sync();

// 可以简写为
/* b.bind(portNumber).sync().channel().closeFuture().sync(); */
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}


NioEventLoopGroup可以理解为一个线程池,内部维护了一组线程,每个线程负责处理多个Channel上的事件,而一个Channel只对应于一个线程。

public class HelloServerInitializer extends ChannelInitializer<SocketChannel> {
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();

// 以("\n")为结尾分割的 解码器
pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));

// 字符串解码 和 编码
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());

// 自己的逻辑Handler
pipeline.addLast("handler", new HelloServerHandler());
}
}


每个Channel都有自己的pipeline,用于存储处理该Channel的Handler

public class HelloServerHandler extends SimpleChannelInboundHandler<String> {

@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
// 收到消息直接打印输出
System.out.println(ctx.channel().remoteAddress() + " Say : " + msg);

// 返回客户端消息 - 我已经接收到了你的消息
ctx.writeAndFlush("Received your message !\n");
}

//首次连接
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {

System.out.println("RamoteAddress : " + ctx.channel().remoteAddress() + " active !");

ctx.writeAndFlush("Welcome to " + InetAddress.getLocalHost().getHostName() + " service!\n");

super.channelActive(ctx);
}
}


实际进行业务处理的Handler。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: