您的位置:首页 > 其它

Netty实现按字节解析的socket协议

2016-08-23 17:22 330 查看
Netty内部实现了很多通用协议的编码和解码。如果要实现自定义的协议,则需要自己实现编码或解码的功能。
继承ChannelInboundHandlerAdapter类,就可以实现一个自定义的解码器。但如果发送比较长的内容,则会出现内容读取不完整的问题。
其实比较简单的一个实现,就是设定协议头的几个字节为消息的长度即可,并在发送消息和处理消息时,处理消息的长度即可。
Server端的代码如下:
TcpServer.java

import org.apache.log4j.Logger;
import org.apache.log4j.PropertyConfigurator;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.bytes.ByteArrayDecoder;
import io.netty.handler.codec.bytes.ByteArrayEncoder;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;

public class TcpServer {

private static final Logger logger = Logger.getLogger(TcpServer.class);
private static final String IP = "127.0.0.1";
private static final int PORT = 9999;

protected static final int BIZGROUPSIZE = Runtime.getRuntime().availableProcessors()*2; //默认

protected static final int BIZTHREADSIZE = 4;

private static final EventLoopGroup bossGroup = new NioEventLoopGroup(BIZGROUPSIZE);
private static final EventLoopGroup workerGroup = new NioEventLoopGroup(BIZTHREADSIZE);

protected static void run() throws Exception {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup);
b.channel(NioServerSocketChannel.class);
b.childHandler(new ChannelInitializer() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
//                pipeline.addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));
//                pipeline.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
pipeline.addLast(new SocketByteHandler());
}
});

b.bind(IP, PORT).sync();
logger.info("TCP服务器已启动");
}

protected static void shutdown() {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}

public static void main(String[] args) throws Exception {
PropertyConfigurator.configure("log/log4j.properties");
logger.info("开始启动TCP服务器...");
TcpServer.run();
//      TcpServer.shutdown();
}
}

LengthFieldBasedFrameDecoder和LengthFieldPrepender就是设定协议头长度的,我这里设定协议头的长度为4个字节。

协议处理类:
SocketByteHandler.java

import java.nio.ByteBuffer;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class SocketByteHandler extends ChannelInboundHandlerAdapter {

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
// super.channelRead(ctx, msg);
ByteBuf result = (ByteBuf) msg;
byte[] result1 = new byte[result.readableBytes()];
// msg中存储的是ByteBuf类型的数据,把数据读取到byte[]中
result.readBytes(result1);
String resultStr = new String(result1);
System.out.println("Client said:" + resultStr);
// 释放资源,这行很关键
result.release();
String response = "I am ok!";
// 在当前场景下,发送的数据必须转换成ByteBuf数组
ByteBuf encoded = ctx.alloc().buffer(4 * response.length());
encoded.writeBytes(response.getBytes());
ctx.write(encoded);
ctx.flush();
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// TODO Auto-generated method stub
super.channelReadComplete(ctx);
ctx.flush();
}

}

客户端可以使用最简单的socket来实现即可,如:

public static void sendMsgBySocket(byte[] msg){
try {
Socket socket = new Socket();
socket.connect(new InetSocketAddress(HOST,PORT));
socket.setKeepAlive(true);
OutputStream out = socket.getOutputStream();
ByteBuffer header = ByteBuffer.allocate(4);
header.putInt(msg.length);
out.write(header.array());
out.write(msg);
out.flush();
InputStream in = socket.getInputStream();
byte[] buff = new byte[4096];
int readed = in.read(buff);
if(readed > 0){
String str = new String(buff,4,readed);
logger.info("client received msg from server:"+str);
}
out.close();
} catch (UnknownHostException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

}

可以看到,header中就是给出的要发送的消息内容的总长度,但不包含协议头。这样,server端收到后,会自动忽略掉协议头的内容,这就是Netty的好处。

客户端如果读取server端的返回内容,而且用基本的socket(非Netty),则需要自己处理协议头:
String str = new String(buff,4,readed);
跳过协议头的4个字节长度后,就是Server端返回的真正的内容。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: