您的位置:首页 > 理论基础 > 计算机网络

使用netty进行客户端网络编程及实现断线重连功能

2018-02-25 18:42 1116 查看
最近使用netty搭建了一个服务端和一个客户端网络通信的demo,记录一下,不多说,先上项目结构图



当时maven有点问题,所以直接引入的jar包,反正也只有一个。(ClientHandler和ServerHandler类分别用HeartBeatClientHandler和HeartBeatServerHandler代替)
搭建服务端之前还有一些事情要做,对,就是自定义协议,还有编码解码
这部分是参考了网上的一些资料
A  首先是协议部分
    1 新建LuckHeader.java package luck;

public class LuckHeader {
// 协议版本
private int version;
// 消息内容长度
private int contentLength;
// 服务名称
private String sessionId;

public LuckHeader(int version, int contentLength, String sessionId) {
this.version = version;
this.contentLength = contentLength;
this.sessionId = sessionId;
}

public int getVersion() {
return version;
}
public void setVersion(int version) {
this.version = version;
}
public int getContentLength() {
return contentLength;
}
public void setContentLength(int contentLength) {
this.contentLength = contentLength;
}
public String getSessionId() {
return sessionId;
}
public void setSessionId(String sessionId) {
this.sessionId = sessionId;
}
}这个属于协议头部分,下面是消息内容部分
    2 新建LuckMessage.java
package luck;

public class LuckMessage {
private LuckHeader luckHeader;
private String content;

public LuckMessage(LuckHeader luckHeader, String content) {
this.luckHeader = luckHeader;
this.content = content;
}

public LuckHeader getLuckHeader() {
return luckHeader;
}

public void setLuckHeader(LuckHeader luckHeader) {
this.luckHeader = luckHeader;
}

public String getContent() {
return content;
}

public void setContent(String content) {
this.content = content;
}

@Override
public String toString() {
return String.format("[version=%d,contentLength=%d,sessionId=%s,content=%s]",
luckHeader.getVersion(),
luckHeader.getContentLength(),
luckHeader.getSessionId(),
content);
}
}到这里一个简单的luck协议的消息的模板就做好了.
B 接下来是编码解码部分
    1 新建LuckDecoder.javapackage encode;

import java.util.List;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import luck.LuckHeader;
import luck.LuckMessage;

public class LuckDecoder extends ByteToMessageDecoder{

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
//获取协议版本
int version = in.readInt();
//获取消息长度
int contentLength = in.readInt();
//获取sessionid
byte[] sessionByte = new byte[36];
in.readBytes(sessionByte);
String sessionid = new String(sessionByte);

//组装协议头
LuckHeader hearder = new LuckHeader(version, contentLength, sessionid);

//读取消息内容
byte[] content = in.readBytes(in.readableBytes()).array();

LuckMessage message = new LuckMessage(hearder, new String(content));

out.add(message);
}

}
2 新建LuckEncoder.javapackage encode;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import luck.LuckHeader;
import luck.LuckMessage;

public class LuckEncoder extends MessageToByteEncoder<LuckMessage>{

@Override
protected void encode(ChannelHandlerContext ctx, LuckMessage message, ByteBuf in) throws Exception {

//将message转换成二进制数据
LuckHeader header = message.getLuckHeader();

//写入的顺序就是协议的顺序
//标题头信息
in.writeInt(header.getVersion());
in.writeInt(message.getContent().length());
in.writeBytes(header.getSessionId().getBytes());

//主题信息
in.writeBytes(message.getContent().getBytes());
}

}注意一下 ,这2个类的编码解码的顺序要一致才可以,不然会报错
  好,准备工作做完了,接下来重头戏就是搭建服务端了
  新建Server.javapackage server;

import java.util.concurrent.TimeUnit;

import encode.LuckDecoder;
import encode.LuckEncoder;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.timeout.IdleStateHandler;

/**
* Netty通信的步骤:
①创建两个NIO线程组,一个专门用于网络事件处理(接受客户端的连接),另一个则进行网络通信的读写。
②创建一个ServerBootstrap对象,配置Netty的一系列参数,例如接受传出数据的缓存大小等。
③创建一个用于实际处理数据的类ChannelInitializer,进行初始化的准备工作,比如设置接受传出数据的字符集、格式以及实际处理数据的接口。
④绑定端口,执行同步阻塞方法等待服务器端启动即可。
* @author Administrator
*
*/
public class Server {
private int port;

public Server(int port){
this.port = port;
}

public void run(){
EventLoopGroup bossGroup = new NioEventLoopGroup(); //用于处理服务器端接受客户端
EventLoopGroup workerGroup = new NioEventLoopGroup(); //进行网络通信(读写)
try{
ServerBootstrap bootstrap = new ServerBootstrap();//辅助工具类,用于服务器通道的一系列配置
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() { //配置具体的数据处理方式

                     @Override
             protected void initChannel(SocketChannel socketChannel) throws Exception {
             ChannelPipeline pipeline = socketChannel.pipeline();
         pipeline.addLast(new IdleStateHandler(5, 0, 0,TimeUnit.SECONDS));
         pipeline.addLast(new LuckEncoder());
         pipeline.addLast(new LuckDecoder());

         //处理事件的类
         //pipeline.addLast(new ServerHandler());
         pipeline.addLast(new HeartBeatServerHandler());
                 }
                 })
.option(ChannelOption.SO_BACKLOG, 128)//设置tcp缓冲区(// 保持连接数 )
.option(ChannelOption.SO_SNDBUF, 32*1024)//设置发送数据缓冲大小
.option(ChannelOption.SO_RCVBUF, 32*1024)//设置接受数据缓冲大小
.childOption(ChannelOption.SO_KEEPALIVE , true)//保持连接
.option(ChannelOption.TCP_NODELAY, true);//有数据立即发送
ChannelFuture future = bootstrap.bind(port).sync();
future.channel().closeFuture().sync();

}catch(Exception e){
e.printStackTrace();
}finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}

}

public static void main(String[] args) {
new Server(8077).run();
}
}服务端有2个线程池,一个处理客户端的连接,一个处理io任务,ServerBootstrap对象用于启动NIO服务端的辅助启动类(目的是降低服务端的开发复杂度),IdleStateHandler类是服务端监听客户端心跳需要配置的handler,处理事件的类由HeartBeatServerHandler处理,然后由bootstrap绑定监听一个端口,监听客户端的连接
实现断线的类HeartBeatServerHandler.java,瞧好咯
package server;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.ReferenceCountUtil;
import luck.LuckMessage;

public class HeartBeatServerHandler extends SimpleChannelInboundHandler<LuckMessage>{

//连接失败次数
private int connectTime = 0;

//定义最大未连接次数
private static final int MAX_UN_CON_TIME = 3;
@Override
protected void messageReceived(ChannelHandlerContext ctx, LuckMessage msg) throws Exception {
try{
LuckMessage lmsg = (LuckMessage) msg;
if (lmsg.getContent().equals("h")) {
//心跳消息
//吧连接失败次数清0
System.out.println("Server接收到心跳消息 ,失败次数清0:" + msg.toString());
connectTime = 0 ;
} else {
System.out.println("Server接收到普通消息 :" + msg.toString());
}
}catch(Exception e){
e.printStackTrace();
}finally {
ReferenceCountUtil.release(msg);
}
}

public void userEventTriggered(ChannelHandlerContext ctx, Object evt){
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state() == IdleState.READER_IDLE) {
//读超时
System.out.println("服务端读超时=====连接失败次数" + connectTime);
if (connectTime >= MAX_UN_CON_TIME) {
System.out.println("服务端关闭channel");

ce6a
ctx.channel().close();
connectTime = 0;
} else {
connectTime ++;
}

}else if (event.state() == IdleState.WRITER_IDLE) {
/*写超时*/
System.out.println("服务端写超时");
} else if (event.state() == IdleState.ALL_IDLE) {
/*总超时*/
System.out.println("服务端全部超时");
}

}
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//		 	cause.printStackTrace();
System.out.println("服务端发生错误:" + cause.getMessage());
ctx.channel().close();
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("有客户端连接");
super.channelActive(ctx);
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
// 关闭,等待重连
ctx.close();
System.out.println("===服务端===(客户端失效)");
}
}
messageReceived方法是接受到消息的时候调用的,这个里面我搞了一些骚操作,可以看可不看,主要是接受到的消息是否为心跳消息,是的话,把连接失败次数connectTime清0;其他方法 里面都有注释。
重点是userEventTriggered方法,IdleState定义了几个事件
读事件 READER_IDLE

写事件 WRITER_IDLE
全部事件 ALL_IDLE
这个很好理解  就是字面意思,这里用读事件举得例子,也就是服务端没有收到客户端的消息(读到客户端的消息)的时候就触发该事件,当超过三次没有收到客户端的消息的时候 就断开与改客户端的连接ctx.channel().close();一个连接相当于一个channel。

服务端配置好了  接下来配置客户端

新建Client.javapackage client;

import java.net.InetSocketAddress;
import java.util.Random;
import java.util.Scanner;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import encode.LuckDecoder;
import encode.LuckEncoder;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
import luck.LuckHeader;
import luck.LuckMessage;

public class Client {

private static Channel channel;

private static Bootstrap bootstrap;

private static ChannelFutureListener channelFutureListener = null;

public static void main(String[] args) throws Exception {
new Client();
}

public Client() throws Exception{
System.out.println("构造方法");
init();
sendData();
}

static Scanner in = new Scanner(System.in);
public static void sendData() throws Exception {
System.out.println("senddata");

//组装协议信息
int version = 1;
String sessionId = UUID.randomUUID().toString();
String content = "";
LuckHeader header = new LuckHeader(version, content.length(), sessionId);
LuckMessage message = null;

do {
content = in.nextLine();
message = new LuckMessage(header, content);
channel.writeAndFlush(message);
} while (!content.equals("q"));
}

public void init() throws InterruptedException{
EventLoopGroup workerGroup = new NioEventLoopGroup();
try{
bootstrap = new Bootstrap();

bootstrap.group(workerGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new IdleStateHandler(0, 0, 5, TimeUnit.SECONDS));
pipeline.addLast("encoder",new LuckEncoder());
pipeline.addLast("decoder",new LuckDecoder());

// pipeline.addLast(new ClientHandler());
pipeline.addLast(new HeartBeatClientHanlder());
}
});
//设置tcp协议的属性
bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
bootstrap.option(ChannelOption.TCP_NODELAY, true);
bootstrap.option(ChannelOption.SO_TIMEOUT, 5000);

connect();
}catch(Exception e){
e.printStackTrace();
}finally {
// workerGroup.shutdownGracefully();
}
}

/**
* 重新连接服务器
* @throws InterruptedException
*/
public static void connect(){

if (channel != null && channel.isActive()) {
return;
}

System.out.println("连接中");
ChannelFuture channelFuture = null;
try {
channelFuture = bootstrap.connect("192.168.1.12", 8077).sync();
channelFuture.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture futureListener) throws Exception {
if (futureListener.isSuccess()) {
channel = futureListener.channel();
System.out.println("连接成功");
} else {
System.out.println("连接失败");
}
}
});

} catch (Exception e) {
e.printStackTrace();
}
}
}
与服务端不同的是,客户端只有一个NioEventLoopGroup,其他配置都是大同小异,
说明一下 static Scanner in = new Scanner(System.in);这句话定义为类的静态变量是为了保证,如果在服务器切断了改客户端的连接,并且重连了以后,再进入到sendData方法中只有一个Scanner ;如果定义在sendData方法里面,重连的时候就要发送2次消息才能让服务端接收到,而且是作为一条消息发过去的  ,2条消息中间还会有乱码。重连几次就需要几条消息一起 才能发过去

ok,现在都写好了   运行试一下  
先启动服务端Server.java然后启动客户端Client.java





然后客户端不发送消息知道服务端断开连接



此时客户端的channelInactive方法会调用connect方法,重新连接


到这里断线重连的简单例子就写好了  欢迎各位大牛指点
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  netty
相关文章推荐