您的位置:首页 > 理论基础 > 计算机网络

Netty学习——TCP粘包/拆包的问题

2018-01-26 12:16 471 查看

                      TCP粘包/拆包的问题

TCP粘包的概念 

       TCP是一个流协议,是通过输入输出流来输入输出数据的,因而底层是不知道业务逻辑。TCP的装包是根据TCP缓冲区的实际情况进行包的划分的,也就是TCP底层如果发现缓冲区的数据已经足够,才会把数据发送出来。这样,我们写到缓冲区中的数据其实是没有任何逻辑的,比如说可能我们写到缓冲区中的数据是1个半的消息(这个时候叫粘包,也就是多个业务逻辑包粘在一起了),这时候TCP把数据发送出来了,通信的另一方接收到了,解析出来发现不是完整的消息,就无法进行相应的业务处理。拆包也同理,就是无法根据完整的消息来进行数据的业务包拆分。
       下面就是一个粘包的例子,客户端向服务器端不断地发送100个消息,服务器每接收一次就打印一次,并打印出当前时候的毫秒数
package study.netty;

import java.util.Date;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class NettyEchoServer {
public static void main(String[] args) {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024).childHandler(new ChildChannelHandler());
ChannelFuture future = bootstrap.bind(8888);
try {
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}

private static class EchoChannelHandler extends ChannelHandlerAdapter {

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buffer = (ByteBuf) msg;
byte[] bts = new byte[buffer.readableBytes()];
buffer.readBytes(bts);
String str = new String(bts, "utf-8");
System.out.println(new Date().getTime() + ":" + str);
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}

}

private static class ChildChannelHandler extends ChannelInitializer<SocketChannel> {

@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new EchoChannelHandler());
}

}
}

package study.netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class NettyEchoClient {
public static void main(String[] args) {
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<S
f7a5
ocketChannel>() {

@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new EchoClientHandler());
}
});
ChannelFuture future = bootstrap.connect("localhost", 8888);
try {
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
}

}

private static class EchoClientHandler extends ChannelHandlerAdapter {
private ByteBuf msgBuf;
private byte[] msgBts;

public EchoClientHandler() {
msgBts = "please connect".getBytes();
}

public void channelActive(ChannelHandlerContext ctx) throws Exception {
for (int i = 0; i < 100; i++) {
msgBuf = Unpooled.buffer(msgBts.length);
msgBuf.writeBytes(msgBts);
ctx.writeAndFlush(msgBuf);
}
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

}
}
}
运行结果是服务器端打印了两次
      但我们确实是发送了100个消息,而且从服务器端打印出来的消息体可以发送,TCP通信是多个消息合并成了一个再一次性发送到另一边。

粘包的解决方案

       既然粘包的原因是业务逻辑消息的长度和缓冲区的长度不一致造成的,因此就可以通过消息定长来解决,把每个消息的长度固定成缓冲区的长度,如果不够就用空格补齐,也可以在在真实消息的末尾在特定字符来标注,在拆包的时候相应取得真实数据即可。也可以像HTTP协议一样,把消息分成消息头和消息体,在消息头中用一个字段来存储这个消息的长度,在拆包的时候就取出相应长度的数据就刚好可以构成一个完整的消息了。

Netty对粘包的解决
     Netty既然作为一个优秀的网络通信库,那么一定就考虑到了TCP粘包的问题,肯定就有一定的解决方法。在Netty中提供了LineBasedFrameDecoder和StringDecoder等来处理粘包的问题。Netty采用了一种类似责任链模式的方法来设计,Netty会把处理器连成一条链,当数据到来或写出时,会通过这条链来处理,从头到尾地让多个处理器去匹配是否可以处理数据,如果可以处理,就把数据处理后传到下一个处理器,这样设计的好处很明显就是解开了数据和处理器的高度耦合,我们可以随时实现一个新的处理器,然后添加到处理器的链中去,这样,一个新的处理器就可以产生效果了。
    下面的代码是对上面的粘包代码的改进来解决粘包的问题。

package study.netty;

import java.util.Date;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

public class NettyEchoServer {
public static void main(String[] args) {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024).childHandler(new ChildChannelHandler());
ChannelFuture future = bootstrap.bind(8888);
try {
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}

private static class EchoChannelHandler extends ChannelHandlerAdapter {

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// ByteBuf buffer = (ByteBuf) msg;
// byte[] bts = new byte[buffer.readableBytes()];
// buffer.readBytes(bts);
String str = (String) msg;// new String(bts, "utf-8");
System.out.println(new Date().getTime() + ":" + str);
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}

}

private static class ChildChannelHandler extends ChannelInitializer<SocketChannel> {

@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new EchoChannelHandler());
}

}
}

package study.netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

public class NettyEchoClient {
public static void main(String[] args) {
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {

@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new EchoClientHandler());
}
});
ChannelFuture future = bootstrap.connect("localhost", 8888);
try {
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
}

}

private static class EchoClientHandler extends ChannelHandlerAdapter {
private ByteBuf msgBuf;
private byte[] msgBts;

public EchoClientHandler() {
msgBts = ("please connect" + System.getProperty("line.separator")).getBytes();
}

public void channelActive(ChannelHandlerContext ctx) throws Exception {
for (int i = 0; i < 100; i++) {
msgBuf = Unpooled.buffer(msgBts.length);
msgBuf.writeBytes(msgBts);
ctx.writeAndFlush(msgBuf);
}
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

}
}
}


改进的代码就是在原代码的基础上添加了两个解码器,一个LineBasedFrameDecoder和一个StringDecoder,然后将channelRead中的处理改成了直接把msg强转成String,这是因为StringDecoder把字节已经转成了字符了。
运行结果如我们预想的一样,打印出了100条消息。
       Netty提供的这个粘包解决方法的原理是LineBasedFrameDecoder会遍历ByteBuf中的可读字节,判断是否有换行符,如果有的话,就以此为结束位置,就相当于一个包已经到达,可以处理了。如果读取到了设置的最大长度还没有见到换行符,就会抛出异常。StringDecoder的功能更简单,就是将接收到的字节数据转换成字符数据,所以在channelRead中我们可以直接把msg强转成String类型。
      通过查看Netty源码,发现LineBasedFrameDecoder继承于ByteToMessageDecoder,而ByteToMessageDecoder又继承于ChannelHandlerAdapter,因此,也可以通过自己实现ChannelHandlerAdapter来实现解决粘包。最关键的是如何解决粘包的方法和策略,到如何实现解决方案时,可以用Netty或第三方提供的,如果都不能满足业务要求,也可以自己来实现以达到更好的业务扩展性。

     针对前面所说的解决方案,Netty还提供了分隔符作为结束标志的消息解码器DelimiterBasedFrameDecoder和定长包的FixedLengthFrameDecoder
     下面是使用DelimiterBasedFrameDecoder来解决粘包的代码,这里是用$来作为一个业务消息包的结束符
package study.netty;

import java.util.Date;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

public class NettyEchoServer {
public static void main(String[] args) {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024).childHandler(new ChildChannelHandler());
ChannelFuture future = bootstrap.bind(8888);
try {
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}

private static class EchoChannelHandler extends ChannelHandlerAdapter {

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// ByteBuf buffer = (ByteBuf) msg;
// byte[] bts = new byte[buffer.readableBytes()];
// buffer.readBytes(bts);
String str = (String) msg;// new String(bts, "utf-8");
System.out.println(new Date().getTime() + ":" + str);
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}

}

private static class ChildChannelHandler extends ChannelInitializer<SocketChannel> {

@Override
protected void initChannel(SocketChannel ch) throws Exception {
// ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
ByteBuf delimiter = Unpooled.copiedBuffer("$".getBytes());
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new EchoChannelHandler());
}

}
}
package study.netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

public class NettyEchoClient {
public static void main(String[] args) {
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {

@Override
protected void initChannel(SocketChannel ch) throws Exception {
ByteBuf delimiter = Unpooled.copiedBuffer("$".getBytes());
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new EchoClientHandler());
}
});
ChannelFuture future = bootstrap.connect("localhost", 8888);
try {
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
}

}

private static class EchoClientHandler extends ChannelHandlerAdapter {
private ByteBuf msgBuf;
private byte[] msgBts;

public EchoClientHandler() {
msgBts = ("please connect" + "$").getBytes();
}

public void channelActive(ChannelHandlerContext ctx) throws Exception {
for (int i = 0; i < 100; i++) {
msgBuf = Unpooled.buffer(msgBts.length);
msgBuf.writeBytes(msgBts);
ctx.writeAndFlush(msgBuf);
}
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

}
}
}


程序的运行结果和上面一样,也达到了预期效果

下面是使用FixedLengthFrameDecoder来实现解决粘包的效果的代码,也非常简单,也是添加相应的解码器就行了。
package study.netty;

import java.util.Date;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

public class NettyEchoServer {
public static void main(String[] args) {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024).childHandler(new ChildChannelHandler());
ChannelFuture future = bootstrap.bind(8888);
try {
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}

private static class EchoChannelHandler extends ChannelHandlerAdapter {

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// ByteBuf buffer = (ByteBuf) msg;
// byte[] bts = new byte[buffer.readableBytes()];
// buffer.readBytes(bts);
String str = (String) msg;// new String(bts, "utf-8");
System.out.println(new Date().getTime() + ":" + str);
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}

}

private static class ChildChannelHandler extends ChannelInitializer<SocketChannel> {

@Override
protected void initChannel(SocketChannel ch) throws Exception {
// ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
// ByteBuf delimiter = Unpooled.copiedBuffer("$".getBytes());
// ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
ch.pipeline().addLast(new FixedLengthFrameDecoder(20));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new EchoChannelHandler());
}

}
}


package study.netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

public class NettyEchoClient {
public static void main(String[] args) {
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {

@Override
protected void initChannel(SocketChannel ch) throws Exception {
// ByteBuf delimiter = Unpooled.copiedBuffer("$".getBytes());
// ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
ch.pipeline().addLast(new FixedLengthFrameDecoder(20));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new EchoClientHandler());
}
});
ChannelFuture future = bootstrap.connect("localhost", 8888);
try {
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
}

}

private static class EchoClientHandler extends ChannelHandlerAdapter {
private ByteBuf msgBuf;
private byte[] msgBts;

public EchoClientHandler() {
msgBts = ("please connect").getBytes();
if (msgBts.length < 20) {
byte[] tem = new byte[20];
for (int i = 0; i < msgBts.length; i++) {
tem[i] = msgBts[i];
}
for (int i = msgBts.length; i < 20; i++) {
tem[i] = 0;
}
msgBts = tem;
} else if (msgBts.length > 20) {
byte[] tem = new byte[20];
for (int i = 0; i < 20; i++) {
tem[i] = msgBts[i];
}
msgBts = tem;
}
}

public void channelActive(ChannelHandlerContext ctx) throws Exception {
for (int i = 0; i < 100; i++) {
msgBuf = Unpooled.buffer(msgBts.length);
msgBuf.writeBytes(msgBts);
ctx.writeAndFlush(msgBuf);
}
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

}
}
}


 
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: