您的位置:首页 > 其它

3.netty解码器LineBaseFrameDecoder、StringDecoder

2017-12-24 15:10 246 查看

1.介绍

什么是粘包、拆包

TCP是个流协议,发送的数据是连成一片的没有分界线。TCP底层会根据缓冲区的实际情况进行包的划分,所以在业务上面认为一个完整的包被TCP拆分后发送即拆包,也可能将多个小的数据包整合成一个大的数据包发送即粘包问题。

产生粘包、拆包问题的原因

应用程序write写入的字节大小大于socket发送缓冲区的大小

进行了MSS大小的TCP分段

以太网帧的payload大于MTU进行ip分片

解决方案

由于底层tcp不理解上层业务,所以底层无法保证不进行报的拆分和重组的只能在上层的应用协议栈设计解决方式。业界注解方案如下:

消息定长

包尾增加回车换行符进行分割

消息分为消息体和消息头

更复杂的应用层协议

2.netty实现

netty提供了半包解码器来解决粘包和拆包问题

(1)粘包问题演示
server

package com.tyf.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

@Sharable
public class TimeServerHandler extends ChannelHandlerAdapter {

private int count;

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//获取request的msg
ByteBuf buf = (ByteBuf) msg;
byte [] bytes = new byte[buf.readableBytes()];
buf.readBytes(bytes);
//去掉后面的回车换行符留下消息体并转换成string
String body = new String(bytes, "UTF-8").
substring(0, bytes.length-System.getProperty("line.separator").length());
System.out.println("客户端msg消息体:"+body+" , 收到消息次数:"+ ++count);

}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
}

//创建起步程序
public static void  main(String [] argsStrings) throws Exception {
//配置服务端NIO线程组(boss线程、worker线程)
EventLoopGroup bGroup = new NioEventLoopGroup();
EventLoopGroup wGroup = new NioEventLoopGroup();
//创建启动辅助类
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bGroup, wGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new TimeServerHandler());//绑定handker

try {
//监听本地端口,同步等待监听结果
ChannelFuture future = bootstrap.bind(11111).sync();
//等待服务端监听端口关闭,优雅退出
future.channel().closeFuture().sync();
}finally {
bGroup.shutdownGracefully();
wGroup.shutdownGracefully();
}

}

}
每收到一次消息就打印一下第几次收到消息

client

package com.tyf.netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;

public class TimeClientHandler extends ChannelHandlerAdapter {

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf mes = null;
//创建请求体,最后是回车换行符
byte[] req = new String("client message"+System.getProperty("line.separator")).getBytes();
//连续50次发送数据
for(int i=0;i<15;i++){
mes = Unpooled.buffer(req.length);
mes.writeBytes(req);
ctx.writeAndFlush(mes);
}
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
}

//创建起步程序
public static void  main(String [] argsStrings) throws Exception {
//配置客户端端NIO线程组
EventLoopGroup bGroup = new NioEventLoopGroup();
//创建客户端启动辅助类
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(bGroup).
channel(NioSocketChannel.class).
option(ChannelOption.TCP_NODELAY, true).
handler(new TimeClientHandler());//设置handler

//发起异步连接
ChannelFuture future = bootstrap.connect("127.0.0.1", 11111).syn
b87d
c();
try {
//等待客户端链路关闭
future.channel().closeFuture().sync();
} finally {
//优雅退出,释放资源
bGroup.shutdownGracefully();
}

}

}
循环15次发送消息,查看server端输出:



实际上客户端发送的15条数据被tcp整合粘包之后成一个数据包发送,server显示是一次性收到全部消息。很显然发生了数据粘包

(2)粘包问题解决
netty默认提供了多种解码器解决半包读写问题。LinBasedFrameDecoder就是其中一种

server

package com.tyf.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

@Sharable
public class TimeServerHandler extends ChannelHandlerAdapter {

private int count;

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//直接获取request的msg
String body = (String) msg;
System.out.println("客户端msg消息体:"+body+" , 收到消息次数:"+ ++count);

}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
}

//创建起步程序
public static void  main(String [] argsStrings) throws Exception {
//配置服务端NIO线程组(boss线程、worker线程)
EventLoopGroup bGroup = new NioEventLoopGroup();
EventLoopGroup wGroup = new NioEventLoopGroup();
//创建启动辅助类
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bGroup, wGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
//添加netty解码器,和用户handler
channel.pipeline().addLast(new LineBasedFrameDecoder(1024));
channel.pipeline().addLast(new StringDecoder());
channel.pipeline().addLast(new TimeServerHandler());
}

});

try {
//监听本地端口,同步等待监听结果
ChannelFuture future = bootstrap.bind(11111).sync();
//等待服务端监听端口关闭,优雅退出
future.channel().closeFuture().sync();
}finally {
bGroup.shutdownGracefully();
wGroup.shutdownGracefully();
}

}

}


添加两个解码器。其中linebaseframedecoder是以换行符为结束标记的解码器

client

package com.tyf.netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

public class TimeClientHandler extends ChannelHandlerAdapter {

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf mes = null;
//创建请求体,最后是回车换行符
byte[] req = ("client message"+System.getProperty("line.separator")).getBytes();
//连续50次发送数据
for(int i=0;i<15;i++){
mes = Unpooled.buffer(req.length);
mes.writeBytes(req);
ctx.writeAndFlush(mes);
}
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
}

//创建起步程序
public static void  main(String [] argsStrings) throws Exception {
//配置客户端端NIO线程组
EventLoopGroup bGroup = new NioEventLoopGroup();
//创建客户端启动辅助类
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(bGroup).
channel(NioSocketChannel.class).
option(ChannelOption.TCP_NODELAY, true).
handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
//添加netty解码器,和用户handler
channel.pipeline().addLast(new LineBasedFrameDecoder(1024));
channel.pipeline().addLast(new StringDecoder());
channel.pipeline().addLast(new TimeClientHandler());
}

});

//发起异步连接
ChannelFuture future = bootstrap.connect("127.0.0.1", 11111).sync();
try {
//等待客户端链路关闭
future.channel().closeFuture().sync();
} finally {
//优雅退出,释放资源
bGroup.shutdownGracefully();
}

}

}
也是添加两个解码器

查看server端结果:



3.LineBaseFrameDecoder解码器

LineBaseFrameDecoder是以换行符为结束标记的解码器

StringDecoder解码器就是将对象转成字符串,可以看到上面msg对象直接转成string对象
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: