您的位置:首页 > 理论基础 > 计算机网络

Netty实践(二):TCP拆包、粘包问题

2017-01-09 21:56 405 查看
什么是TCP拆包、粘包?

在网络通信中,数据在底层都是以字节流形式在流动,那么发送方和接受方理应有一个约定(协议),只有这样接受方才知道需要接受多少数据,哪些数据需要在一起处理;如果没有这个约定,就会出现本应该一起处理的数据,被TCP划分为多个包发给接收方进行处理,如下图:




看一个TCP拆包、粘包的实例
客户端Handler:




服务端Handler:



运行结果:



上面的程序本意是CLIENT发送3次消息给SERVER,SERVER端理应处理3次,可是结果SERVER却将3条消息一次处理了。

那么如何解决TCP拆包、粘包问题呢?其实思路不外乎有3种:
第一种:发定长数据
接收方拿固定长度的数据,发送方发送固定长度的数据即可。但是这样的缺点也是显而易见的:如果发送方的数据长度不足,需要补位,浪费空间。
第二种:在包尾部增加特殊字符进行分割
发送方发送数据时,增加特殊字符;在接收方以特殊字符为准进行分割
第三种:自定义协议
类似于HTTP协议中的HEAD信息,比如我们也可以在HEAD中,告诉接收方数据的元信息(数据类型、数据长度等)

Netty如何解决TCP拆包、粘包问题?
《Java通信实战:编写自定义通信协议实现FTP服务》中,涉及到了JAVA SOCKET这方面的处理,大家可以参考。接下来,我们来看Netty这个框架是如何帮助我们解决这个问题的。本篇博客的代码在《Netty实践(一):轻松入门》基础上进行。

方式一:定长消息

Server启动类:




Client Handler:




运行结果:





利用FixedLengthFrameDecoder,加入到管道流处理中,长度够了接收方才能收到。

方式二:自定义分隔符
Server启动类:





Client Handler:




运行结果:







方式三:自定义协议
下面我们将简单实现一个自定义协议:
HEAD信息中包含:数据长度、数据版本
数据内容

MyHead

public class MyHead {

//数据长度
private int length;

//数据版本
private int version;

public MyHead(int length, int version) {
this.length = length;
this.version = version;
}

public int getLength() {
return length;
}

public void setLength(int length) {
this.length = length;
}

public int getVersion() {
return version;
}

public void setVersion(int version) {
this.version = version;
}

}


MyMessage
public class MyMessage {
//消息head
private MyHead head;
//消息body
private String content;

public MyMessage(MyHead head, String content) {
this.head = head;
this.content = content;
}

public MyHead getHead() {
return head;
}

public void setHead(MyHead head) {
this.head = head;
}

public String getContent() {
return content;
}

public void setContent(String content) {
this.content = content;
}

@Override
public String toString() {
return String.format("[length=%d,version=%d,content=%s]",head.getLength(),head.getVersion(),content);
}
}


编码器
/**
* Created by Administrator on 17-1-9.
* 编码器 将自定义消息转化成ByteBuff
*/
public class MyEncoder extends MessageToByteEncoder<MyMessage> {

@Override
protected void encode(ChannelHandlerContext channelHandlerContext, MyMessage myMessage, ByteBuf byteBuf) throws Exception {

int length = myMessage.getHead().getLength();
int version = myMessage.getHead().getVersion();
String content = myMessage.getContent();

byteBuf.writeInt(length);
byteBuf.writeInt(version);
byteBuf.writeBytes(content.getBytes(Charset.forName("UTF-8")));

}
}


解码器
/**
* Created by Administrator on 17-1-9.
* 解码器  将ByteBuf数据转化成自定义消息
*/
public class MyDecoder extends ByteToMessageDecoder {

@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {

int length = byteBuf.readInt();
int version = byteBuf.readInt();

byte[] body = new byte[length];
byteBuf.readBytes(body);

String content = new String(body, Charset.forName("UTF-8"));

MyMessage myMessage = new MyMessage(new MyHead(length,version),content);

list.add(myMessage);
}
}


Server启动类

public class Main {

public static void main(String[] args) {

EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
EventLoopGroup workerGroup = new NioEventLoopGroup(); // (2)
int port = 8867;
try {
ServerBootstrap b = new ServerBootstrap(); // (3)
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) // (4)
.childHandler(new ChannelInitializer<SocketChannel>() { // (5)
@Override
public void initChannel(SocketChannel ch) throws Exception {

ch.pipeline().addLast(new MyEncoder())
.addLast(new MyDecoder())
.addLast(new ServerHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 128)          // (6)
.childOption(ChannelOption.SO_KEEPALIVE, true); // (7)

// Bind and start to accept incoming connections.
ChannelFuture f = b.bind(port).sync(); // (8)

// Wait until the server socket is closed.
// In this example, this does not happen, but you can do that to gracefully
// shut down your server.
System.out.println("start server....");
f.channel().closeFuture().sync();
System.out.println("stop server....");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
System.out.println("exit server....");
}

}
}


Server Handler
public class ServerHandler  extends ChannelHandlerAdapter {

//每当从客户端收到新的数据时,这个方法会在收到消息时被调用
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

MyMessage in = (MyMessage) msg;
try {
// Do something with msg
System.out.println("server get :" + in);

} finally {
//ByteBuf是一个引用计数对象,这个对象必须显示地调用release()方法来释放
//or ((ByteBuf)msg).release();
ReferenceCountUtil.release(msg);
}

}

//exceptionCaught()事件处理方法是当出现Throwable对象才会被调用
//当Netty由于IO错误或者处理器在处理事件时抛出的异常时
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();

}

}


Client启动类
public class Client {

public static void main(String[] args) {

EventLoopGroup group = new NioEventLoopGroup();

try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new MyDecoder());
p.addLast(new MyEncoder());
p.addLast(new ClientHandler());
}
});

// Start the client.
ChannelFuture f = b.connect("127.0.0.1", 8867).sync();

// Wait until the connection is closed.
f.channel().closeFuture().sync();

} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// Shut down the event loop to terminate all threads.
group.shutdownGracefully();
}

}

}


Client Handler
public class ClientHandler extends ChannelHandlerAdapter {

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {

ctx.writeAndFlush(new MyMessage(new MyHead("abcd".getBytes("UTF-8").length,1),"abcd"));

}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

ByteBuf in = (ByteBuf) msg;
try {
// Do something with msg
System.out.println("client get :" + in.toString(CharsetUtil.UTF_8));

ctx.close();
} finally {
//ByteBuf是一个引用计数对象,这个对象必须显示地调用release()方法来释放
//or ((ByteBuf)msg).release();
ReferenceCountUtil.release(msg);
}
}
}


运行结果




到这里,你会发现Netty处理TCP拆包、粘包问题很简单,通过编解码技术支持,让我们编写自定义协议也很方便,在后续的Netty博客中,我将继续为大家介绍Netty在实际中的一些应用(比如实现心跳检测),See You~
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  Netty