您的位置:首页 > 理论基础 > 计算机网络

Netty权威指南 第2版学习笔记4——TCP粘包/拆包问题的解决之道

2017-01-02 11:41 633 查看

TCP粘包/拆包

假设客户端分别发送了两个数据包D1 和D2给服务商厦 ,由于服务端一次读取到的字节数是不确定的,故可能存在4种情况:

服务端分两次读取到了两个独立的数据包,分别是D1 和 D2,没有粘包和拆包

服务端一次接收到了两个数据包,D1和D2粘合在一起,被称为TCP粘包

服务端分两次读取到了两个数据包,第一次读取到了完整的D1包和D2包的部分内容,第二次读取到了D2包的剩余内容,这称为TCP拆包

服务端分两次读取到了两个数据包,第一次读取到了D1包的部分内容D1_1,第二次读取到了D1包的剩余内容D1_2和D2包的整包。

如果此时服务端TCP接收滑窗非常小,而数据包D1和D2比较大,很可能会发生第5种可能,即服务端分多次才能将D1和D2包接收完全,期间发生多次拆包

TCP粘包/拆包发生的原因

问题产生的原因有三个,分别如下:

应用程序write写入的字节大小大于套接口发送缓冲区大小

进行MSS大小的TCP分段

以太网帧的payload大于MTU进行IP分片

图解:



MSS理解

首先理解几个概念:

MTU:Maxitum Transmission Unit最大传输单元。这个最大传输单元实际上和链路层协议有着密切的关系,EthernetII 帧的结构DMAC+SMAC+Type+Data+CRC。由于以太网传输限制,每个以太网帧都有最小的大小64bytes,最大不能超过1518bytes,对于小于或大于这个限制的以太网帧我们都可以视之为错误的数据帧,一般的以太网转发设备会丢弃这些数据帧。

MSS:Maxitum Segment Size最大分段大小。为了达到最佳的传输效能TCP协议在建立连接的时候通常要协商双方的MSS值,这个值TCP协议在实现的时候往往用MTU值代替,值往往为1460.IPV6中通常是1440

PPPoE:PPP Over Ethernet(在以太网上承载PPP协议),就是因为这个协议的出现我们才有必要修改我们的MSS或者是MTU值

粘包问题的解决策略

底层的TCP无法理解上层的业务数据,需要在上层的应用协议栈调来来解决。

消息定义,不够的空位补空格

在包尾增加回车换行符,如FTP协议

将消息分成消息头和消息体,消息头中包含表示消息总长度(或者消息体长度)的字段,通常设计思路为消息头的第一个字段使用int32来表示消息的总长度

更复杂的应用层协议

Netty提供了半包解码器来解决TCP粘包/拆包问题。

未考虑TCP粘包的异常案例

TimeServer.java

package com.phei.netty.bio;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class TimeServer {
public void bind(int port)throws Exception{
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup=new NioEventLoopGroup();
try{
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup,workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG,1024)
.childHandler(new ChildChannelHandler());
ChannelFuture f=b.bind(port).sync();

f.channel().closeFuture().sync();
}finally{
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}

}
private class ChildChannelHandler extends ChannelInitializer<SocketChannel>{
@Override
protected void initChannel(SocketChannel arg0) throws Exception {
arg0.pipeline().addLast(new TimeServerHandler());
}
}
public static void main(String[] args) throws Exception{
int port=8080;
new TimeServer().bind(port);
}
}


TimeServerHandler.java

package com.phei.netty.bio;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

public class TimeServerHandler extends ChannelHandlerAdapter{
private int counter;
@Override
public void channelRead(ChannelHandlerContext ctx,Object msg)throws Exception{
ByteBuf buf=(ByteBuf)msg;
byte[] req=new byte[buf.readableBytes()];
buf.readBytes(req);
String body=new String(req,"UTF-8");
System.out.println("The time server receive order : "+body
+" ; the counter is : " + ++counter);
String currentTime="QUERY TIME ORDER".equalsIgnoreCase(body)?new java.util.Date(System.currentTimeMillis()).toString():"BAD ORDER";
currentTime=currentTime + System.getProperty("line.separator");

ByteBuf resp =Unpooled.copiedBuffer(currentTime.getBytes());
ctx.writeAndFlush(resp);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause){
ctx.close();
}
}


TimeClient.java

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
public class TimeClient {
public void connect(int port,String host) throws Exception{
EventLoopGroup group=new NioEventLoopGroup();
try{
Bootstrap b=new Bootstrap();
//Channel需要设置为NioSocketChannel,然后为其添加Handler
b.group(group).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY,true)
.handler(new ChannelInitializer<SocketChannel>(){
//为了简单直接创建匿名内部类,实现initChannel方法
//其作用是当创建NioSocketChannel成功之后,在进行初始化时,
//将它的ChannelHandler设置到ChannelPipeline中,用于处理网络I/O事件
@Override
public void initChannel(SocketChannel ch) throws Exception{
ch.pipeline().addLast(new TimeClientHandler());
}
});
//发起异步连接,然后调用同步方法等待连接成功
ChannelFuture f=b.connect(host,port).sync();
//当客户端连接关闭之后,客户端主函数退出,退出前释放NIO线程组的资源
f.channel().closeFuture().sync();
}finally{

}
}
public static void main(String[] args) throws Exception {
int port=8080;
new TimeClient().connect(port, "127.0.0.1");
}
}


TimeClientHandler.java

import java.util.logging.Logger;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

public class TimeClientHandler extends ChannelHandlerAdapter{
private static final Logger logger=Logger.getLogger(TimeClientHandler.class.getName());
private int counter;
private byte[] req;

public TimeClientHandler(){
req=("QUERY TIME ORDER" + System.getProperty("line.separator")).getBytes();
}
/**
* 当客户端和服务器TCP链路建立成功后,NIO线程会调用channelActive方法
*/
@Override
public void channelActive(ChannelHandlerContext ctx){
//发送查询时间的指令给服务端
ByteBuf message=null;
//循环发送100条消息,每发送一条就刷新一次
for(int i=0;i<100;i++){
message=Unpooled.buffer(req.length);
message.writeBytes(req);
ctx.writeAndFlush(message);
}
}
/**
* 当服务端返回应答消息时调用
*/
@Override
public void channelRead(ChannelHandlerContext ctx,Object msg)throws Exception{
ByteBuf buf=(ByteBuf)msg;
byte[] req=new byte[buf.readableBytes()];
buf.readBytes(req);
String body=new String(req,"UTF-8");
System.out.println("Now is : " + body + " ; the counter is : " + ++counter);
}
/**
* 当发生异常时
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause){
logger.warning("Unexpected exception from downstrea : " + cause.getMessage());
ctx.close();
}
}




运行结果达不到想要的效果。

利用LineBasedFrameDecoder 解决TCP粘包问题

服务端代码:

TimeServer.java

package com.phei.netty.bio;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

public class TimeServer {
public void bind(int port)throws Exception{
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup=new NioEventLoopGroup();
try{
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup,workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG,1024)
.childHandler(new ChildChannelHandler());
ChannelFuture f=b.bind(port).sync();

f.channel().closeFuture().sync();
}finally{
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}

}
private class ChildChannelHandler extends ChannelInitializer<SocketChannel>{
@Override
protected void initChannel(SocketChannel arg0) throws Exception {
//增加的两个解码器
arg0.pipeline().addLast(new LineBasedFrameDecoder(1024));
arg0.pipeline().addLast(new StringDecoder());
arg0.pipeline().addLast(new TimeServerHandler());
}
}
public static void main(String[] args) throws Exception{
int port=8080;
new TimeServer().bind(port);
}
}


TimeServerHandler.java

package com.phei.netty.bio;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

public class TimeServerHandler extends ChannelHandlerAdapter{
private int counter;
@Override
public void channelRead(ChannelHandlerContext ctx,Object msg)throws Exception{
String body=(String)msg;
System.out.println("The time server receive order : "+body
+" ; the counter is : " + ++counter);
String currentTime="QUERY TIME ORDER".equalsIgnoreCase(body)?new java.util.Date(System.currentTimeMillis()).toString():"BAD ORDER";
currentTime=currentTime + System.getProperty("line.separator");

ByteBuf resp =Unpooled.copiedBuffer(currentTime.getBytes());
ctx.writeAndFlush(resp);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause){
ctx.close();
}
}


TimeClient.java

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
public class TimeClient {
public void connect(int port,String host) throws Exception{
EventLoopGroup group=new NioEventLoopGroup();
try{
Bootstrap b=new Bootstrap();
//Channel需要设置为NioSocketChannel,然后为其添加Handler
b.group(group).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY,true)
.handler(new ChannelInitializer<SocketChannel>(){
//为了简单直接创建匿名内部类,实现initChannel方法
//其作用是当创建NioSocketChannel成功之后,在进行初始化时,
//将它的ChannelHandler设置到ChannelPipeline中,用于处理网络I/O事件
@Override
public void initChannel(SocketChannel ch) throws Exception{
ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new TimeClientHandler());
}
});
//发起异步连接,然后调用同步方法等待连接成功
ChannelFuture f=b.connect(host,port).sync();
//当客户端连接关闭之后,客户端主函数退出,退出前释放NIO线程组的资源
f.channel().closeFuture().sync();
}finally{

}
}
public static void main(String[] args) throws Exception {
int port=8080;
new TimeClient().connect(port, "127.0.0.1");
}
}


TimeClientHandler.java

import java.util.logging.Logger;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

public class TimeClientHandler extends ChannelHandlerAdapter{
private static final Logger logger=Logger.getLogger(TimeClientHandler.class.getName());
private int counter;
private byte[] req;

public TimeClientHandler(){
req=("QUERY TIME ORDER" + System.getProperty("line.separator")).getBytes();
}
/**
* 当客户端和服务器TCP链路建立成功后,NIO线程会调用channelActive方法
*/
@Override
public void channelActive(ChannelHandlerContext ctx){
//发送查询时间的指令给服务端
ByteBuf message=null;
//循环发送100条消息,每发送一条就刷新一次
for(int i=0;i<100;i++){
message=Unpooled.buffer(req.length);
message.writeBytes(req);
ctx.writeAndFlush(message);
}
}
/**
* 当服务端返回应答消息时调用
*/
@Override
public void channelRead(ChannelHandlerContext ctx,Object msg)throws Exception{
String body=(String)msg;
System.out.println("Now is : " + body + " ; the counter is : " + ++counter);
}
/**
* 当发生异常时
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause){
logger.warning("Unexpected exception from downstrea : " + cause.getMessage());
ctx.close();
}
}


运行结果:



LineBasedFrameDecoder 是依次遍历ByteBuf中的可读字节,判断看是否有\n 或 \r\n,如果有,就以此位置为结束位置,以换行符为结束标志的解码器。它支持携带结束符或者不携带结束符两种解码方式,同时支持配置单行的最大长度。如果连续读取到最大长度后仍然没有发现换行符,就会抛出异常,同时忽略掉之前读到的异常码流。

LineBasedFrameDecoder+StringDecoder组合就是按行切换的文本解码器,它被设计用来支持TCP的粘包和拆包。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: