基于netty的网络聊天室(二)——心跳检测及断线重连
2016-05-21 23:59
375 查看
前面介绍了Netty服务端客户端基本通信框架的搭建过程。下面将介绍Netty如何进行心跳检测以及处理客户端的断线重连。
为了适应恶劣的网络环境,比如网络超时、闪断,客户端进程僵死,需要机制来保证双方的通信能正常工作或者自动恢复。对于服务端来说,当客户端由于某些原因导致无法与服务端通信的,服务端需要主动注销与客户端的连接,减少无效链接的资源消耗。对于客户端来说,当服务进程宕机后进行重启,客户端应该自动能发起重连操作。
(一)心跳监测客户端
采用心跳机制,来确保服务端能及时发现无效的客户端链接。这里有个问题,心跳机制的发起方应该由服务端还是客户端。假设服务端出现宕机,客户端唯一能做的就是保证能及时发现服务端重启后能进行重连。因此,可以只由服务端来发起心跳检测。一旦服务端发现客户端连接超时多次,则 立即关闭链路。
心跳检测具体的设计思路如下:
1.服务端定时查看客户端链路是否空闲,一旦持续时间T没有收到客户端的请求包,则主动发送Ping包给客户端,同时心跳超时次数加1。
2.客户端收到服务的Ping请求,则立即发送一个Pong应答包。
3.服务端每次收到客户端的数据包,则重置超时次数。若连续N次未收到心跳应答包,则关闭链接。
心跳检测示例代码如下:
1.服务端NettyChatServer类的ChannelPipeline增加空闲状态处理器(IdleStateHandler)。该类用于检测通信Channel的读写状态超时,以此来实现心跳检测。IdleStateHandler的构造函数有三个参数,依次为读超时秒数,写超时秒数,读写超时秒数。我们只需要用到第一个参数。
2.ChatServerHandler类必须覆写userEventTriggered()方法处理超时逻辑。当超时次数少于指定次数时,向客户端发送Ping包;当超时次数大于指定次数时,注销客户端链接。
心跳调试技巧:如果需要演示心跳超时,只需在客户端启动后在任意代码里加个断点,这样服务端就会检测到客户端读超时。
(二)客户端断线重连
当服务端宕机后,客户端需要定时检测服务端开启状态,重新连接。实现逻辑也比较简单,只要在NettyChatClient类断开链接的逻辑后加上重连逻辑即可(reConnectServer()方法)。每次重连检测不必过于频繁,可以让线程休眠一段时间。
调试技巧:启动服务端与客户端后,单方面关闭服务端,即可看见客户端定时重连了。需要保证客户端重连成功后,能够与服务端收发数据,同时客户端也不无须继续检测重连。
为了适应恶劣的网络环境,比如网络超时、闪断,客户端进程僵死,需要机制来保证双方的通信能正常工作或者自动恢复。对于服务端来说,当客户端由于某些原因导致无法与服务端通信的,服务端需要主动注销与客户端的连接,减少无效链接的资源消耗。对于客户端来说,当服务进程宕机后进行重启,客户端应该自动能发起重连操作。
(一)心跳监测客户端
采用心跳机制,来确保服务端能及时发现无效的客户端链接。这里有个问题,心跳机制的发起方应该由服务端还是客户端。假设服务端出现宕机,客户端唯一能做的就是保证能及时发现服务端重启后能进行重连。因此,可以只由服务端来发起心跳检测。一旦服务端发现客户端连接超时多次,则 立即关闭链路。
心跳检测具体的设计思路如下:
1.服务端定时查看客户端链路是否空闲,一旦持续时间T没有收到客户端的请求包,则主动发送Ping包给客户端,同时心跳超时次数加1。
2.客户端收到服务的Ping请求,则立即发送一个Pong应答包。
3.服务端每次收到客户端的数据包,则重置超时次数。若连续N次未收到心跳应答包,则关闭链接。
心跳检测示例代码如下:
1.服务端NettyChatServer类的ChannelPipeline增加空闲状态处理器(IdleStateHandler)。该类用于检测通信Channel的读写状态超时,以此来实现心跳检测。IdleStateHandler的构造函数有三个参数,依次为读超时秒数,写超时秒数,读写超时秒数。我们只需要用到第一个参数。
2.ChatServerHandler类必须覆写userEventTriggered()方法处理超时逻辑。当超时次数少于指定次数时,向客户端发送Ping包;当超时次数大于指定次数时,注销客户端链接。
package com.kingston.netty; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import java.io.IOException; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import com.kingston.base.ServerManager; import com.kingston.net.Packet; import com.kingston.net.PacketManager; import com.kingston.net.PacketType; import com.kingston.service.login.ClientHeartBeat; import com.kingston.service.login.LoginManagerProxy; import com.kingston.service.login.ServerLogin; public class ChatServerHandler extends ChannelHandlerAdapter{ //客户端超时次数 private Map<ChannelHandlerContext,Integer> clientOvertimeMap = new ConcurrentHashMap<>(); private final int MAX_OVERTIME = 3; //超时次数超过该值则注销连接 @Override public void channelRead(ChannelHandlerContext context,Object msg) throws Exception{ Packet packet = (Packet)msg; if(packet.getPacketType() == PacketType.ServerLogin ){ ServerLogin loginPact = (ServerLogin)packet; LoginManagerProxy.getManager().validateLogin(context,loginPact.getUserId(), loginPact.getUserPwd()); return ; }else{ if(validateSession(packet)){ PacketManager.execPacket(packet); } } clientOvertimeMap.remove(context);//只要接受到数据包,则清空超时次数 } private boolean validateSession(Packet loginPact){ return true; } @Override public void close(ChannelHandlerContext ctx,ChannelPromise promise){ System.err.println("TCP closed..."); ctx.close(promise); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.err.println("客户端关闭1"); } @Override public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { ctx.disconnect(promise); System.err.println("客户端关闭2"); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.err.println("业务逻辑出错"); cause.printStackTrace(); // ctx.fireExceptionCaught(cause); Channel channel = ctx.channel(); if(cause instanceof IOException && channel.isActive()){ System.err.println("simpleclient"+channel.remoteAddress()+"异常"); ctx.close(); } } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { //心跳包检测读超时 if (evt instanceof IdleStateEvent) { IdleStateEvent e = (IdleStateEvent) evt; if (e.state() == IdleState.READER_IDLE) { System.err.println("客户端读超时"); int overtimeTimes = clientOvertimeMap.getOrDefault(ctx, 0); if(overtimeTimes < MAX_OVERTIME){ ServerManager.sendPacketTo(new ClientHeartBeat(), ctx); addUserOvertime(ctx); }else{ ServerManager.ungisterUserContext(ctx); } } } } private void addUserOvertime(ChannelHandlerContext ctx){ int oldTimes = 0; if(clientOvertimeMap.containsKey(ctx)){ oldTimes = clientOvertimeMap.get(ctx); } clientOvertimeMap.put(ctx, (int)(oldTimes+1)); } }3.增加下发包ClientHeartBeat类定义。客户端在收到该包的时候,需要向服务端发送一个应答包。
package com.kingston.service.login; import io.netty.buffer.ByteBuf; import com.kingston.base.ServerManager; import com.kingston.net.Packet; import com.kingston.net.PacketType; public class ClientHeartBeat extends Packet{ @Override public void writePacketMsg(ByteBuf buf) { // TODO Auto-generated method stub } @Override public void readFromBuff(ByteBuf buf) { // TODO Auto-generated method stub } @Override public PacketType getPacketType() { return PacketType.ClientHeartBeat; } @Override public void execPacket() { System.err.println("收到服务端的ping请求后,回复一个pong响应"); ServerManager.sendServerRequest(new ServerHeartBeat()); } }4.服务端在收到应答包后,重置超时次数为0
心跳调试技巧:如果需要演示心跳超时,只需在客户端启动后在任意代码里加个断点,这样服务端就会检测到客户端读超时。
(二)客户端断线重连
当服务端宕机后,客户端需要定时检测服务端开启状态,重新连接。实现逻辑也比较简单,只要在NettyChatClient类断开链接的逻辑后加上重连逻辑即可(reConnectServer()方法)。每次重连检测不必过于频繁,可以让线程休眠一段时间。
package com.kingston.netty; import java.net.InetSocketAddress; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.LengthFieldPrepender; import com.kingston.net.codec.PacketDecoder; import com.kingston.net.codec.PacketEncoder; public class NettyChatClient { public void connect(String host,int port) throws Exception{ EventLoopGroup group = new NioEventLoopGroup(); try{ Bootstrap b = new Bootstrap(); b.group(group).channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>(){ @Override protected void initChannel(SocketChannel arg0) throws Exception { ChannelPipeline pipeline = arg0.pipeline(); pipeline.addLast(new PacketDecoder(1024*1, 0,2,0,2)); pipeline.addLast(new LengthFieldPrepender(2)); pipeline.addLast(new PacketEncoder()); // pipeline.addLast(new HeartBeatReqHandler()); pipeline.addLast(new NettyClientHandler()); } }); ChannelFuture f = b.connect(new InetSocketAddress(host, port), new InetSocketAddress(NettyContants.LOCAL_SERVER_IP, NettyContants.LOCAL_SERVER_PORT)) .sync(); f.channel().closeFuture().sync(); }catch(Exception e){ e.printStackTrace(); }finally{ // group.shutdownGracefully(); //这里不再是优雅关闭了 reConnectServer(); } } /** * 断线重连 */ private void reConnectServer(){ try { Thread.sleep(5000); System.err.println("客户端进行断线重连"); connect(NettyContants.REMOTE_SERVER_IP, NettyContants.REMOTE_SERVER_PORT); } catch (Exception e) { e.printStackTrace(); } } }
调试技巧:启动服务端与客户端后,单方面关闭服务端,即可看见客户端定时重连了。需要保证客户端重连成功后,能够与服务端收发数据,同时客户端也不无须继续检测重连。
相关文章推荐
- 游戏网络测试基础
- java 网络编程(Socket) TCP/UDP 总结案例
- https应用安装详解
- 计算机病毒实践汇总五:搭建虚拟网络环境
- nginx https ssl 设置受信任证书[原创]
- 计算机网络
- 套接字编程——基于TCP协议
- 《图解TCP/IP》第一章 网络基础知识
- C#网络程序设计1-5:方法回调、委托、线程的综合运用
- C#网络程序设计1-4:多线程并发执行
- C#网络程序设计1-3:线程支持
- Linux常用命令--网络命令、关机重启命令
- C#网络程序设计1-2:委托实现单线程
- TCP与UDP的区别与选择
- C#网络程序设计1-1:委托
- 4GLTE网络语音三大解决方案
- HttpClient使用详解
- 4G网络架构
- Google深度学习笔记 从线性分类器到深度神经网络
- 3G网络结构