您的位置:首页 > 其它

netty——LengthFieldBasedFrameDecoder实例(解决半包)

2017-09-17 16:42 417 查看


1,使用LengthFieldPrepender编码,LengthFieldBasedFrameDecoder解码的netty传输

可以解决半包粘包
2 代码部分
tcpserver

[java] view
plain copy

print?

package com.bimatrix.revit.nettyTest;

import io.netty.bootstrap.ServerBootstrap;

import io.netty.channel.ChannelFuture;

import io.netty.channel.ChannelInitializer;

import io.netty.channel.ChannelPipeline;

import io.netty.channel.EventLoopGroup;

import io.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.SocketChannel;

import io.netty.channel.socket.nio.NioServerSocketChannel;

import io.netty.handler.codec.LengthFieldBasedFrameDecoder;

import io.netty.handler.codec.LengthFieldPrepender;

import io.netty.handler.codec.string.StringDecoder;

import io.netty.handler.codec.string.StringEncoder;

import io.netty.util.CharsetUtil;

import org.apache.log4j.Logger;

public class TcpServer {

private static final Logger logger = Logger.getLogger(TcpServer.class);

private static final String IP = "127.0.0.1";

private static final int PORT = 9999;

/**用于分配处理业务线程的线程组个数 */

protected static final int BIZGROUPSIZE = Runtime.getRuntime().availableProcessors()*2; //默认

/** 业务出现线程大小*/

protected static final int BIZTHREADSIZE = 4;

/*

* NioEventLoopGroup实际上就是个线程池,

* NioEventLoopGroup在后台启动了n个NioEventLoop来处理Channel事件,

* 每一个NioEventLoop负责处理m个Channel,

* NioEventLoopGroup从NioEventLoop数组里挨个取出NioEventLoop来处理Channel

*/

private static final EventLoopGroup bossGroup = new NioEventLoopGroup(BIZGROUPSIZE);

private static final EventLoopGroup workerGroup = new NioEventLoopGroup(BIZTHREADSIZE);

protected static void run() throws Exception {

ServerBootstrap b = new ServerBootstrap();

b.group(bossGroup, workerGroup);

b.channel(NioServerSocketChannel.class);

b.childHandler(new ChannelInitializer<SocketChannel>() {

@Override

public void initChannel(SocketChannel ch) throws Exception {

ChannelPipeline pipeline = ch.pipeline();

pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));

pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));

pipeline.addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));

pipeline.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));

pipeline.addLast(new TcpServerHandler());

}

});

// b.bind(IP, PORT).sync();

ChannelFuture f = b.bind(PORT).sync(); // (7)

f.channel().closeFuture().sync();

logger.info("TCP服务器已启动");

}

protected static void shutdown() {

workerGroup.shutdownGracefully();

bossGroup.shutdownGracefully();

}

public static void main(String[] args) throws Exception {

logger.info("开始启动TCP服务器...");

TcpServer.run();

// TcpServer.shutdown();

}

}

TcpServerHandler

[java] view
plain copy

print?

package com.bimatrix.revit.nettyTest;

import io.netty.channel.ChannelHandlerAdapter;

import io.netty.channel.ChannelHandlerContext;

public class TcpServerHandler extends ChannelHandlerAdapter {

@Override

public void channelRead(ChannelHandlerContext ctx, Object msg) { // (2)

ctx.write(msg); // (1)

String obj= (String)msg;

System.out.println("访问数据"+obj);

// ctx.write(obj); // (1)

ctx.flush(); // (2)

}

@Override

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)

// Close theconnection when an exception is raised.

cause.printStackTrace();

ctx.close();

}

@Override

public void channelActive(final ChannelHandlerContext ctx) {

ctx.writeAndFlush("有客户端连接"); // (3)

}

}

TcpClientHander

[java] view
plain copy

print?

<span style="font-size:18px;">package com.bimatrix.revit.nettyTest;

import io.netty.channel.ChannelHandlerAdapter;

import io.netty.channel.ChannelHandlerContext;

public class TcpClientHandler extends ChannelHandlerAdapter {

public TcpClientHandler() {

}

//private byte[] req;

/**

* 链路链接成功

*/

@Override

public void channelActive(ChannelHandlerContext ctx) throws Exception {

/*for (int i = 0; i < 1000; i++) {

ctx.writeAndFlush("1ac");

} */

// 链接成功后发送

}

@Override

public void channelRead(ChannelHandlerContext ctx, Object msg)

throws Exception {

System.out.println("client收到数据" +msg);

// ctx.write("收到数据!");

// ctx.write(msg);

// ctx.write("w2d");

}

@Override

public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {

ctx.flush();

}

@Override

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)

throws Exception {

cause.printStackTrace();

ctx.close();

}

} </span>

TcpClient

[java] view
plain copy

print?

package com.bimatrix.revit.nettyTest;

import io.netty.bootstrap.Bootstrap;

import io.netty.channel.Channel;

import io.netty.channel.ChannelInitializer;

import io.netty.channel.ChannelOption;

import io.netty.channel.ChannelPipeline;

import io.netty.channel.EventLoopGroup;

import io.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.nio.NioSocketChannel;

import io.netty.handler.codec.LengthFieldBasedFrameDecoder;

import io.netty.handler.codec.LengthFieldPrepender;

import io.netty.handler.codec.string.StringDecoder;

import io.netty.handler.codec.string.StringEncoder;

import io.netty.util.CharsetUtil;

import org.apache.log4j.Logger;

public class TcpClient {

private static final Logger logger = Logger.getLogger(TcpClient.class);

public static String HOST = "127.0.0.1";

public static int PORT = 9999;

public static Bootstrap bootstrap = getBootstrap();

public static Channel channel = getChannel(HOST,PORT);

/**

* 初始化Bootstrap

* @return

*/

public static final Bootstrap getBootstrap(){

EventLoopGroup group = new NioEventLoopGroup();

Bootstrap b = new Bootstrap();

b.group(group).channel(NioSocketChannel.class);

b.handler(new ChannelInitializer<Channel>() {

@Override

protected void initChannel(Channel ch) throws Exception {

ChannelPipeline pipeline = ch.pipeline();

pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));

pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));

pipeline.addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));

pipeline.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));

pipeline.addLast("handler", new TcpClientHandler());

}

});

b.option(ChannelOption.SO_KEEPALIVE, true);

return b;

}

public static final Channel getChannel(String host,int port){

Channel channel = null;

try {

channel = bootstrap.connect(host, port).sync().channel();

} catch (Exception e) {

logger.error(String.format("连接Server(IP[%s],PORT[%s])失败", host,port),e);

return null;

}

return channel;

}

public static void sendMsg(String msg) throws Exception {

if(channel!=null){

channel.writeAndFlush(msg).sync();

}else{

logger.warn("消息发送失败,连接尚未建立!");

}

}

public static void main(String[] args) throws Exception {

try {

long t0 = System.nanoTime();

for (int i = 0; i < 100000; i++) {

if(i%3==0){

TcpClient.sendMsg(i+"11aaa222aaa");

}else if(i%3==1){

TcpClient.sendMsg(i+"222cccdddbbbbbbbbbbbb");

}else if(i%3==2){

TcpClient.sendMsg(i+"333cccddda12ccccccccccccccccccccccccc");

}

}

long t1 = System.nanoTime();

System.out.println((t1-t0)/1000000.0);

} catch (Exception e) {

e.printStackTrace();

}

}

}

上面代码部分下面这一块 主要测试消息发送是否会造成粘包

for (int i = 0; i < 100000; i++) {

if(i%3==0){

TcpClient.sendMsg(i+"11aaa222aaa");

}else if(i%3==1){

TcpClient.sendMsg(i+"222cccdddbbbbbbbbbbbb");

}else if(i%3==2){

TcpClient.sendMsg(i+"333cccddda12ccccccccccccccccccccccccc");

}

}



1,使用LengthFieldPrepender编码,LengthFieldBasedFrameDecoder解码的netty传输

可以解决半包粘包
2 代码部分
tcpserver

[java] view
plain copy

print?

package com.bimatrix.revit.nettyTest;

import io.netty.bootstrap.ServerBootstrap;

import io.netty.channel.ChannelFuture;

import io.netty.channel.ChannelInitializer;

import io.netty.channel.ChannelPipeline;

import io.netty.channel.EventLoopGroup;

import io.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.SocketChannel;

import io.netty.channel.socket.nio.NioServerSocketChannel;

import io.netty.handler.codec.LengthFieldBasedFrameDecoder;

import io.netty.handler.codec.LengthFieldPrepender;

import io.netty.handler.codec.string.StringDecoder;

import io.netty.handler.codec.string.StringEncoder;

import io.netty.util.CharsetUtil;

import org.apache.log4j.Logger;

public class TcpServer {

private static final Logger logger = Logger.getLogger(TcpServer.class);

private static final String IP = "127.0.0.1";

private static final int PORT = 9999;

/**用于分配处理业务线程的线程组个数 */

protected static final int BIZGROUPSIZE = Runtime.getRuntime().availableProcessors()*2; //默认

/** 业务出现线程大小*/

protected static final int BIZTHREADSIZE = 4;

/*

* NioEventLoopGroup实际上就是个线程池,

* NioEventLoopGroup在后台启动了n个NioEventLoop来处理Channel事件,

* 每一个NioEventLoop负责处理m个Channel,

* NioEventLoopGroup从NioEventLoop数组里挨个取出NioEventLoop来处理Channel

*/

private static final EventLoopGroup bossGroup = new NioEventLoopGroup(BIZGROUPSIZE);

private static final EventLoopGroup workerGroup = new NioEventLoopGroup(BIZTHREADSIZE);

protected static void run() throws Exception {

ServerBootstrap b = new ServerBootstrap();

b.group(bossGroup, workerGroup);

b.channel(NioServerSocketChannel.class);

b.childHandler(new ChannelInitializer<SocketChannel>() {

@Override

public void initChannel(SocketChannel ch) throws Exception {

ChannelPipeline pipeline = ch.pipeline();

pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));

pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));

pipeline.addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));

pipeline.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));

pipeline.addLast(new TcpServerHandler());

}

});

// b.bind(IP, PORT).sync();

ChannelFuture f = b.bind(PORT).sync(); // (7)

f.channel().closeFuture().sync();

logger.info("TCP服务器已启动");

}

protected static void shutdown() {

workerGroup.shutdownGracefully();

bossGroup.shutdownGracefully();

}

public static void main(String[] args) throws Exception {

logger.info("开始启动TCP服务器...");

TcpServer.run();

// TcpServer.shutdown();

}

}

TcpServerHandler

[java] view
plain copy

print?

package com.bimatrix.revit.nettyTest;

import io.netty.channel.ChannelHandlerAdapter;

import io.netty.channel.ChannelHandlerContext;

public class TcpServerHandler extends ChannelHandlerAdapter {

@Override

public void channelRead(ChannelHandlerContext ctx, Object msg) { // (2)

ctx.write(msg); // (1)

String obj= (String)msg;

System.out.println("访问数据"+obj);

// ctx.write(obj); // (1)

ctx.flush(); // (2)

}

@Override

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)

// Close theconnection when an exception is raised.

cause.printStackTrace();

ctx.close();

}

@Override

public void channelActive(final ChannelHandlerContext ctx) {

ctx.writeAndFlush("有客户端连接"); // (3)

}

}

TcpClientHander

[java] view
plain copy

print?

<span style="font-size:18px;">package com.bimatrix.revit.nettyTest;

import io.netty.channel.ChannelHandlerAdapter;

import io.netty.channel.ChannelHandlerContext;

public class TcpClientHandler extends ChannelHandlerAdapter {

public TcpClientHandler() {

}

//private byte[] req;

/**

* 链路链接成功

*/

@Override

public void channelActive(ChannelHandlerContext ctx) throws Exception {

/*for (int i = 0; i < 1000; i++) {

ctx.writeAndFlush("1ac");

} */

// 链接成功后发送

}

@Override

public void channelRead(ChannelHandlerContext ctx, Object msg)

throws Exception {

System.out.println("client收到数据" +msg);

// ctx.write("收到数据!");

// ctx.write(msg);

// ctx.write("w2d");

}

@Override

public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {

ctx.flush();

}

@Override

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)

throws Exception {

cause.printStackTrace();

ctx.close();

}

} </span>

TcpClient

[java] view
plain copy

print?

package com.bimatrix.revit.nettyTest;

import io.netty.bootstrap.Bootstrap;

import io.netty.channel.Channel;

import io.netty.channel.ChannelInitializer;

import io.netty.channel.ChannelOption;

import io.netty.channel.ChannelPipeline;

import io.netty.channel.EventLoopGroup;

import io.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.nio.NioSocketChannel;

import io.netty.handler.codec.LengthFieldBasedFrameDecoder;

import io.netty.handler.codec.LengthFieldPrepender;

import io.netty.handler.codec.string.StringDecoder;

import io.netty.handler.codec.string.StringEncoder;

import io.netty.util.CharsetUtil;

import org.apache.log4j.Logger;

public class TcpClient {

private static final Logger logger = Logger.getLogger(TcpClient.class);

public static String HOST = "127.0.0.1";

public static int PORT = 9999;

public static Bootstrap bootstrap = getBootstrap();

public static Channel channel = getChannel(HOST,PORT);

/**

* 初始化Bootstrap

* @return

*/

public static final Bootstrap getBootstrap(){

EventLoopGroup group = new NioEventLoopGroup();

Bootstrap b = new Bootstrap();

b.group(group).channel(NioSocketChannel.class);

b.handler(new ChannelInitializer<Channel>() {

@Override

protected void initChannel(Channel ch) throws Exception {

ChannelPipeline pipeline = ch.pipeline();

pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));

pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));

pipeline.addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));

pipeline.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));

pipeline.addLast("handler", new TcpClientHandler());

}

});

b.option(ChannelOption.SO_KEEPALIVE, true);

return b;

}

public static final Channel getChannel(String host,int port){

Channel channel = null;

try {

channel = bootstrap.connect(host, port).sync().channel();

} catch (Exception e) {

logger.error(String.format("连接Server(IP[%s],PORT[%s])失败", host,port),e);

return null;

}

return channel;

}

public static void sendMsg(String msg) throws Exception {

if(channel!=null){

channel.writeAndFlush(msg).sync();

}else{

logger.warn("消息发送失败,连接尚未建立!");

}

}

public static void main(String[] args) throws Exception {

try {

long t0 = System.nanoTime();

for (int i = 0; i < 100000; i++) {

if(i%3==0){

TcpClient.sendMsg(i+"11aaa222aaa");

}else if(i%3==1){

TcpClient.sendMsg(i+"222cccdddbbbbbbbbbbbb");

}else if(i%3==2){

TcpClient.sendMsg(i+"333cccddda12ccccccccccccccccccccccccc");

}

}

long t1 = System.nanoTime();

System.out.println((t1-t0)/1000000.0);

} catch (Exception e) {

e.printStackTrace();

}

}

}

上面代码部分下面这一块 主要测试消息发送是否会造成粘包

for (int i = 0; i < 100000; i++) {

if(i%3==0){

TcpClient.sendMsg(i+"11aaa222aaa");

}else if(i%3==1){

TcpClient.sendMsg(i+"222cccdddbbbbbbbbbbbb");

}else if(i%3==2){

TcpClient.sendMsg(i+"333cccddda12ccccccccccccccccccccccccc");

}

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: