
Several articles on Netty short connections

2016-06-23 23:16


Netty 4: oversized HttpRequest/HttpResponse messages get split on send (partial packets)

Netty is a wrapper around sockets, and underneath it is still plain TCP: an HTTP message that is too large gets split into several segments when sent, which makes receiving it a real pain. Several solutions have been proposed online; I tried them myself, with the following results:

1. For HttpRequest, some suggest declaring the body length in a header. Don't believe it, folks: this one is a trap. Even with the header set, TCP blindly chops the payload up and sends the pieces anyway. Big red flag here!

2. Don't let TCP do the chopping; chop the message up yourself and send the pieces. Whether this actually cures the problem, I did not test.

3. Fixed-length messages. This does not feel like a fit for things like HttpRequest and HttpResponse, so I did not try it.

4. Everything else, such as tuning the send and receive buffers, made no difference.

So, having climbed out of all those pits, I present Netty's own solution: use HttpObjectAggregator!

First, add the following to the pipeline of both the client and the server channel before startup:

ch.pipeline().addLast("aggregator", new HttpObjectAggregator(1024 * 1024 * 64));
ch.pipeline().addLast("chunkedWriter", new ChunkedWriteHandler());
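One caveat worth making explicit: HttpObjectAggregator only aggregates the HttpMessage/HttpContent parts produced by an HTTP codec sitting in front of it, so the pipeline needs that codec as well. A minimal sketch of a full server-side ordering (HttpServerCodec, HttpObjectAggregator and ChunkedWriteHandler are real Netty classes; the handler name MyServerHandler is illustrative, not from the original project):

ch.pipeline().addLast("codec", new HttpServerCodec());                            // decodes bytes into HttpRequest/HttpContent parts
ch.pipeline().addLast("aggregator", new HttpObjectAggregator(1024 * 1024 * 64)); // merges the parts into one full message
ch.pipeline().addLast("chunkedWriter", new ChunkedWriteHandler());               // supports writing large/chunked content
ch.pipeline().addLast("handler", new MyServerHandler());                         // illustrative business handler

On the client side, HttpClientCodec takes the codec's place.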
On the receiving side, it looks like this:

/**
 * Read the incoming message from the channel.
 */
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
        throws Exception {
    // request, messageOfClient and messageToSend are fields of the handler class
    // (their declarations are not shown in the original post).
    if (msg instanceof DefaultHttpRequest) {
        request = (DefaultHttpRequest) msg;
    }
    if (msg instanceof HttpContent) {
        HttpContent content = (HttpContent) msg;
        ByteBuf buf = content.content();
        messageOfClient = buf.toString(io.netty.util.CharsetUtil.UTF_8);
        if (messageOfClient != null && !messageOfClient.contentEquals("") && !messageOfClient.contains("ping")) {
            messageToSend += messageOfClient;
        } else {
            System.out.println("Number of channels: " + ServerDataSyncServer.channels.size() + " channelGroupSize");
            System.out.println(ctx.channel().remoteAddress().toString());
        }
        buf.release();
        if (content instanceof LastHttpContent) {
            // Heartbeat packets from other clients are ignored here: after the server goes
            // down, clients keep probing the connection in a loop, and once a client drops,
            // the server stops printing output for it.
            if (messageToSend.length() > 12 && messageToSend.substring(0, 2).contentEquals("DB")) {
                // If the length and prefix match, this is a persisted database message that
                // must be forwarded to the other clients; call the broadcast method.
                System.out.println("Server has read the persisted database message; broadcasting to all clients");
                ServerDataSyncServer.channels.remove(ctx.channel());
                System.out.println("messageToSend   " + messageToSend);
                String messageContent = messageToSend;
                ServerDataSyncServerSendGroup.sendToAllChannel(messageContent);
                messageToSend = "";
            }
        }
    }
}

Of course, my sending client uses short connections, hence the
ServerDataSyncServer.channels.remove(ctx.channel());
For long-connection setups this line can simply be omitted.

That's all there is to it: after receiving the segmented large TCP packets, Netty automatically assembles them into a complete HttpRequest or HttpResponse. You then read the body chunks one by one; the last one is of type LastHttpContent, and once you have read it, the whole message is complete and you can carry on with your own processing.
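Since the aggregator merges everything anyway, the handler can alternatively match on FullHttpRequest, which implements both HttpRequest and LastHttpContent. A minimal sketch of that variant (not the original project's handler):

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (msg instanceof FullHttpRequest) {
        FullHttpRequest request = (FullHttpRequest) msg;
        // The aggregator has already glued every HttpContent chunk together,
        // so content() holds the complete body.
        String body = request.content().toString(io.netty.util.CharsetUtil.UTF_8);
        System.out.println("Complete body received: " + body);
        request.release(); // FullHttpRequest is reference-counted
    }
}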

Done.

---------------------------


Investigating a problem in Netty 4 where data is not sent correctly over TCP short connections

Recently, while developing a server with Netty, I ran into a short-connection business scenario in which, after calling

ChannelHandlerContext.flush()

ChannelHandlerContext.close()

the data was not sent out correctly.

 

Capturing local loopback traffic with Wireshark confirmed that a small portion of the data was never actually sent.

After verifying the business code was correct, I found one configuration item in the Netty setup: ChannelOption.SO_SNDBUF was set to 128, while the data I was sending was 144 bytes long. The client received exactly 128 bytes; the remaining 16 bytes never arrived.

 

Changing the setting to 1024 makes the problem go away, but it is a workaround, not a root-cause fix.
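For reference, such an option is set on the bootstrap. A minimal sketch assuming a ServerBootstrap named b, as in the listings later in this post (ChannelOption.SO_SNDBUF is the real Netty option; the values are the ones described above):

// Problematic: a 128-byte socket send buffer, smaller than the 144-byte response.
b.childOption(ChannelOption.SO_SNDBUF, 128);
// Workaround described above (not a root-cause fix):
// b.childOption(ChannelOption.SO_SNDBUF, 1024);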

 

Reading the Netty source showed that Netty writes the remaining data asynchronously. The final solution was therefore to add an asynchronous completion callback:

ChannelFuture channelFuture = ctx.writeAndFlush(responsePacket);
channelFuture.addListener(new ChannelFutureListener() {
    /*
     * @see io.netty.util.concurrent.GenericFutureListener#
     * operationComplete(io.netty.util.concurrent.Future)
     */
    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
        if (future.isSuccess()) {
            ctx.close();
        } else {
            throw new Exception("Failed to send data to the client", future.cause());
        }
    }
});

 

After full testing, this solved the problem.
---------------------------------------------------

future.addListener(ChannelFutureListener.CLOSE) means: once the write has completed, close the connection.
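In other words, when no custom failure handling is needed, the explicit listener from the listing above collapses to one line (responsePacket as before):

ctx.writeAndFlush(responsePacket).addListener(ChannelFutureListener.CLOSE);

The difference: ChannelFutureListener.CLOSE closes the channel whether or not the write succeeded, while the hand-written listener closes only on success and surfaces the failure cause otherwise.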





A TCP short-connection test based on Netty 4

The code in this article has been somewhat optimized, though not completely; corrections are welcome.

[Figure: project structure diagram omitted]

TcpServer.java

package com.lin.netty4.tcp;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;

import org.apache.log4j.Logger;

public class TcpServer {
    private static final Logger logger = Logger.getLogger(TcpServer.class);
    private static final String IP = "127.0.0.1";
    private static final int PORT = 9999;
    /** Size of the event-loop group that accepts connections. */
    protected static final int BIZGROUPSIZE = Runtime.getRuntime().availableProcessors() * 2; // default
    /** Size of the business-thread pool. */
    protected static final int BIZTHREADSIZE = 1000;
    private static final EventLoopGroup bossGroup = new NioEventLoopGroup(BIZGROUPSIZE);
    private static final EventLoopGroup workerGroup = new NioEventLoopGroup(BIZTHREADSIZE);

    protected static void run() throws Exception {
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup);
        b.channel(NioServerSocketChannel.class);
        b.childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();
                pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
                pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
                // Run the business handler on the worker group so it does not block the I/O threads.
                pipeline.addLast(workerGroup, new TcpServerHandler());
            }
        });
        // b.childOption(ChannelOption.SO_KEEPALIVE, true);
        // b.option(ChannelOption.SO_BACKLOG, 10000);
        b.bind(IP, PORT).sync();
        logger.info("TCP server started");
    }

    protected static void shutdown() {
        workerGroup.shutdownGracefully();
        bossGroup.shutdownGracefully();
    }

    public static void main(String[] args) throws Exception {
        logger.info("Starting TCP server...");
        TcpServer.run();
        // TcpServer.shutdown();
    }
}
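A quick note on the two framing handlers registered above: LengthFieldPrepender(4) prepends a 4-byte big-endian length to every outgoing message, and LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4) reads that length at offset 0 and strips it before passing the payload on. A worked example of the resulting wire format:

// Sending the 2-byte payload "hi" (0x68 0x69) produces these wire bytes:
//
//   00 00 00 02 68 69
//   \__________/\____/
//    length = 2  "hi"
//
// Decoder parameters: maxFrameLength = Integer.MAX_VALUE, lengthFieldOffset = 0,
// lengthFieldLength = 4, lengthAdjustment = 0, initialBytesToStrip = 4.
// Because initialBytesToStrip = 4, the decoder removes the length header,
// so TcpServerHandler sees only the bare 2-byte "hi" payload.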

TcpServerHandler.java

package com.lin.netty4.tcp;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;

import org.apache.log4j.Logger;

public class TcpServerHandler extends ChannelInboundHandlerAdapter {
    private static final Logger logger = Logger.getLogger(TcpServerHandler.class);

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        if (msg instanceof ByteBuf) {
            ByteBuf buf = (ByteBuf) msg;
            // Use readableBytes(), not capacity(): the buffer's capacity can exceed
            // the number of bytes actually received.
            byte[] dst = new byte[buf.readableBytes()];
            buf.readBytes(dst);
            logger.info("SERVER received message: " + new String(dst));

            byte[] dest = (new String(dst) + ". yes, the server accepted you, nice!").getBytes();
            ByteBuf destBuf = ctx.alloc().buffer(dest.length);
            destBuf.writeBytes(dest);
            // Short connection: close the channel once the reply has been written.
            ctx.channel().writeAndFlush(destBuf).addListener(ChannelFutureListener.CLOSE);

            ReferenceCountUtil.release(msg);
        } else {
            logger.warn("error object !");
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx,
            Throwable cause) throws Exception {
        logger.warn("Unexpected exception from downstream.", cause);
        ctx.close();
    }
}

TcpClient.java

package com.lin.netty4.tcp;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;

import org.apache.log4j.Logger;

public class TcpClient {
    private static final Logger logger = Logger.getLogger(TcpClient.class);
    public static String HOST = "127.0.0.1";
    public static int PORT = 9999;

    public static Bootstrap bootstrap = getBootstrap();

    /**
     * Initialize the Bootstrap.
     * @return a configured Bootstrap
     */
    public static final Bootstrap getBootstrap() {
        EventLoopGroup group = new NioEventLoopGroup();
        Bootstrap b = new Bootstrap();
        b.group(group).channel(NioSocketChannel.class);
        b.handler(new ChannelInitializer<Channel>() {
            @Override
            protected void initChannel(Channel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();
                pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
                pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
                pipeline.addLast("handler", new TcpClientHandler());
            }
        });
        // b.option(ChannelOption.SO_KEEPALIVE, true);
        return b;
    }

    public static final Channel getChannel(String host, int port) {
        Channel channel = null;
        try {
            channel = bootstrap.connect(host, port).sync().channel();
        } catch (Exception e) {
            logger.error(String.format("Failed to connect to server (IP[%s], PORT[%s])", host, port), e);
            return null;
        }
        return channel;
    }

    public static void sendMsg(Channel channel, Object msg) throws Exception {
        if (channel != null) {
            channel.writeAndFlush(msg).sync();
        } else {
            logger.warn("Message not sent: the connection has not been established!");
        }
    }

    public static void main(String[] args) throws Exception {
        try {
            long t0 = System.nanoTime();
            byte[] value = null;
            Channel channel = null;
            for (int i = 0; i < 50000; i++) {
                // Short connection: open a fresh channel for every message.
                channel = getChannel(HOST, PORT);
                if (channel == null) {
                    continue; // connection failed; skip this message
                }
                value = (i + ",hello").getBytes();
                ByteBufAllocator alloc = channel.alloc();
                ByteBuf buf = alloc.buffer(value.length);
                buf.writeBytes(value);
                TcpClient.sendMsg(channel, buf);
            }
            long t1 = System.nanoTime();
            System.out.println((t1 - t0) / 1000000.0);
            Thread.sleep(5000);
            System.exit(0);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

TcpClientHandler.java

package com.lin.netty4.tcp;

import org.apache.log4j.Logger;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;

public class TcpClientHandler extends ChannelInboundHandlerAdapter {
    private static final Logger logger = Logger.getLogger(TcpClientHandler.class);

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        if (msg instanceof ByteBuf) {
            ByteBuf buf = (ByteBuf) msg;
            // As on the server side, size the array by readableBytes(), not capacity().
            byte[] dst = new byte[buf.readableBytes()];
            buf.readBytes(dst);
            logger.info("Client received the server's reply: " + new String(dst));
            ReferenceCountUtil.release(msg);
        } else {
            logger.warn("error object");
        }
    }
}

log4j.properties

# Console appender
log4j.rootLogger=DEBUG, CONSOLE,DAILY_ROLLING_FILE

log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
log4j.appender.CONSOLE.Threshold=WARN
log4j.appender.CONSOLE.Encoding=UTF-8
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.ConversionPattern=[%t] %d [%p] - %l:%m%n

# Daily rolling file appender
log4j.appender.DAILY_ROLLING_FILE=org.apache.log4j.DailyRollingFileAppender
log4j.appender.DAILY_ROLLING_FILE.Encoding=UTF-8
log4j.appender.DAILY_ROLLING_FILE.Threshold=WARN
log4j.appender.DAILY_ROLLING_FILE.File=E\:\\log4j\\nettyDemo.log
log4j.appender.DAILY_ROLLING_FILE.DatePattern='.'yyyy-MM-dd
log4j.appender.DAILY_ROLLING_FILE.layout=org.apache.log4j.PatternLayout
log4j.appender.DAILY_ROLLING_FILE.layout.ConversionPattern=[%t]%d [%p] - %l:%m%n

log4j.logger.org.apache.commons = WARN
log4j.logger.io.netty = WARN

With this setup, sending 50,000 messages takes about 37 seconds (37870.31592 ms). Eclipse was left untuned; the test PC has a 4-core CPU and 3 GB of RAM.
