您的位置:首页 > 其它

Netty框架的简单使用,实现socket通讯

2018-02-12 11:15 597 查看
个人博客:haichenyi.com。感谢关注

  题外话,很多人都把JDK1.4提供的NIO称之为异步非阻塞I/O;其实,并不然,从严格意义上面讲,它只能称为非阻塞I/O。在JDK1.7提供的NIO 2.0,新增了异步的套接字通道Channel,它才是真正的异步非阻塞I/O。下表是不同I/O模型的对比:

表1-1 几种I/O模型和同能对比

同步阻塞I/O(BIO)伪异步I/O非阻塞I/O(NIO)异步I/O(AIO)
客户端个数:I/O线程1:1M:N(M可以大于N)M:1(1个I/O线程处理多个客户端连接)
I/O类型(阻塞)阻塞I/O阻塞I/O非阻塞I/O
I/O类型(同步)同步I/O同步I/O同步I/O(I/O多路复用)
API使用难度简单简单非常复杂
调试难度简单简单复杂
可靠性非常差
吞吐量

简介

  Netty是由JBOSS提供的一个java开源框架。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。

  也就是说,Netty 是一个基于NIO的客户、服务器端编程框架,使用Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户,服务端应用。Netty相当简化和流线化了网络应用的编程开发过程,例如,TCP和UDP的socket服务开发。

  “快速”和“简单”并不用产生维护性或性能上的问题。Netty 是一个吸收了多种协议的实现经验,这些协议包括FTP,SMTP,HTTP,各种二进制,文本协议,并经过相当精心设计的项目,最终,Netty 成功的找到了一种方式,在保证易于开发的同时还保证了其应用的性能,稳定性和伸缩性。——百度百科

优点

API使用简单,开发门槛低

功能强大,预置了多种编解码功能,支持多种主流协议

性能高,通过与业界其他主流NIO框架相比,netty的综合性能最高

成熟,稳定,Netty已经修复了已经发现的所有的JDK NIO BUG,业务开发人员不用再为NIO的bug而烦恼

社区活跃,版本迭代周期短,发现bug可以及时被修复,同时有更多的新功能加入

经历了大规模的商界业务考验,只能得到了验证

粘包、拆包

概念

  TCP是一个流协议,所谓的流,就是没有界限的一串数据。可以考虑河里的流水,他们并没有界限。tcp底层并不了解业务层数据的具体含义,他会根据tcp缓冲区的实际情况进行包的划分,所以在业务上认为,一个完整的包可能会被tcp拆分成多个包进行发送,也可能把多个小包封装成一个大数据一起发送,这就是所谓的tcp粘包,拆包问题

产生原因

应用程序write写入的字节大小大于套接口发送缓冲区的大小

进行MSS大小的tcp分段

以太网帧的payload大于MTU进行IP分片

解决办法

消息定长,每个报文大小固定长度,不够的补0

包尾增加回车换行符进行分割。例如:FTP协议

将消息分为消息头和消息体。消息头中包含消息的总长度字段

更复杂的应用层协议

Netty框架的解决办法

  LineBasedFrameDecoder和StringDecoder两个类

LineBasedFrameDecoder

  LineBasedFrameDecoder的工作原理是依次遍历ByteBuf中的可读字节,判断看是否有”\r”或者”\r\n”,如果有就以此为结束位置,从可读索引位置到结束区间的字节就组成了一行。他是以换行符为结束标志的解码器,支持携带结束符和不带结束符两种解码方式。同时支持配置单行的最大长度。如果连续读取到最大长度后仍然没有发现换行符,就会抛出异常,同时忽略掉之前读取的异常流

StringDecoder

  StringDecoder的功能就非常简单,就是将接收到的对象换成字符串,然后继续调用后面的handler,LineBasedFrameDecoder+StringDecoder组合就是按换行符切换文本解码器,他被设计用来支持TCP粘包和拆包。Netty支持其他其他符号的解码器(DelimiterBasedFrameDecode)

  说了这么多,代码来了,就是用Netty实现的心跳。对于懒癌晚期,已经风装好,可以直接拿过去用,注释也写的很清楚

import android.util.Log;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;

/**
* @author 海晨忆
* @date 2018/2/6
* @desc
*/
public class SocketTcp {
private static SocketTcp socketTcp = new SocketTcp();
private Channel channel = null;
private EventLoopGroup group;
private int port;
private String host;

public static SocketTcp getInstance() {
return socketTcp;
}

public SocketTcp setPort(int port) {
this.port = port;
return this;
}

public SocketTcp setHost(String host) {
this.host = host;
return this;
}

public void connect() {
if (channel != null) return;
if (group == null) {
//NIO线程组
group = new NioEventLoopGroup();
}
try {//配置Bootstrap
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//以换行符为结束标记
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
pipeline.addLast(new LineBasedFrameDecoder(Integer.MAX_VALUE));
pipeline.addLast(new StringDecoder());
pipeline.addLast(new MyHeartSocket());

//以"$_"作为分隔符
/*ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
String s = "$_";
ByteBuf byteBuf = Unpooled.copiedBuffer(s.getBytes());
pipeline.addLast(new DelimiterBasedFrameDecoder(Integer.MAX_VALUE,byteBuf));
pipeline.addLast(new StringDecoder());
pipeline.addLast(new MyHeartSocket());*/

}
});
//发起异步连接操作
ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
channel = channelFuture.channel();
//等待服务端监听端口关闭
channel.closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
disConnect();
}
}

/**
* 断开tcp连接.
*/
private void disConnect() {
if (null != group) {
group.shutdownGracefully();
}
group = null;
channel = null;
}

public void sendMessage(String msg) {//连接成功后,通过Channel提供的接口进行IO操作
try {
if (channel != null && channel.isOpen()) {
channel.writeAndFlush(msg).sync();
Log.d("wz", "send succeed " + msg);
} else {
reConnect();
throw new Exception("channel is null | closed");
}
} catch (Exception e) {
reConnect();
e.printStackTrace();
}
}

/**
* 重连.
*/
private void reConnect() {
new Thread(this::connect);
}
}


package com.example.zwang.myapplication.socket;

import android.os.SystemClock;
import android.util.Log;

import java.util.concurrent.TimeUnit;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoop;
import io.netty.channel.SimpleChannelInboundHandler;

public class MyHeartSocket extends SimpleChannelInboundHandler<Object> {
private ChannelHandlerContext ctx;
private boolean isConnect = false;

@Override
protected void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
Log.v("WZ", "连接正常messageReceived");
ByteBuf msg1 = (ByteBuf) msg;
byte[] bytes = new byte[msg1.readableBytes()];
msg1.readBytes(bytes);
String s = new String(bytes, "UTF-8");
Log.v("WZ", "接收到的消息:" + s);
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
Log.v("WZ", "连接正常channelActive");
isConnect = true;
if (this.ctx == null) {
synchronized (MyHeartSocket.class) {
if (this.ctx == null) {
this.ctx = ctx;
MyAppHeart();
}
}
}
}

private void MyAppHeart() {
new Thread(() -> {
while (ctx != null && isConnect) {
String data = "123";
byte[] bytes = data.getBytes();
if (isConnect) {
ctx.writeAndFlush(Unpooled.buffer(bytes.length).writeBytes(bytes));
SystemClock.sleep(3000);
}
}
}).start();
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
EventLoop loop = ctx.channel().eventLoop();
loop.schedule(() -> SocketTcp.getInstance().connect(), 5, TimeUnit.SECONDS);
super.channelInactive(ctx);
Log.v("WZ", "重新连接socket服务器");
isConnect = false;
}

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
super.userEventTriggered(ctx, evt);
Log.v("WZ", "发送数据包");
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
Log.v("WZ", "连接出现异常");
this.ctx.close();
this.ctx = null;
}
}


结束
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  Netty 心跳机制 Socket
相关文章推荐