Netty实现消息推送以及内部心跳机制
2015-10-26 09:49
666 查看
准备说明:引入java-websocket,netty-all-5.0等的jar包。其中内部心跳机制使用userEventTriggered事件方式实现,客户端的心跳实现客户端的断点重连工作,服务端的心跳
实现服务端清除多余链接的功能。
以下为一些实现的代码:
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12. netty客户端启动类
13. netty客户端操作实现类
14.
15. netty服务端启动类
16. netty服务端操作实现类
实现服务端清除多余链接的功能。
以下为一些实现的代码:
1.
package base; /** * * 请求类型的消息 */ public class AskMsg extends BaseMsg { public AskMsg() { super(); setType(MsgType.ASK); } private AskParams params; public AskParams getParams() { return params; } public void setParams(AskParams params) { this.params = params; } }
2.
package base; import java.io.Serializable; /** * */ public class AskParams implements Serializable { private static final long serialVersionUID = 1L; private String auth; private String content; public String getContent() { return content; } public void setContent(String content) { this.content = content; } public String getAuth() { return auth; } public void setAuth(String auth) { this.auth = auth; } }
3.
package base; import java.io.Serializable; /** * * 必须实现序列,serialVersionUID 一定要有 */ public abstract class BaseMsg implements Serializable { private static final long serialVersionUID = 1L; private MsgType type; //必须唯一,否者会出现channel调用混乱 private String clientId; //初始化客户端id public BaseMsg() { this.clientId = Constants.getClientId(); } public String getClientId() { return clientId; } public void setClientId(String clientId) { this.clientId = clientId; } public MsgType getType() { return type; } public void setType(MsgType type) { this.type = type; } }
4.
package base; /** * */ public class Constants { private static String clientId; public static String getClientId() { return clientId; } public static void setClientId(String clientId) { Constants.clientId = clientId; } }
5.
package base; /** * * 登录验证类型的消息 */ public class LoginMsg extends BaseMsg { private String userName; private String password; public LoginMsg() { super(); setType(MsgType.LOGIN); } public String getUserName() { return userName; } public void setUserName(String userName) { this.userName = userName; } public String getPassword() { return password; } public void setPassword(String password) { this.password = password; } }
6.
package base; /** * */ public enum MsgType { PING,ASK,REPLY,LOGIN }
7.
package base; /** * * 心跳检测的消息类型 */ public class PingMsg extends BaseMsg { public PingMsg() { super(); setType(MsgType.PING); } }
8.
package base; import java.io.Serializable; /** * */ public class ReplyBody implements Serializable { private static final long serialVersionUID = 1L; }
9.
package base; /** * */ public class ReplyClientBody extends ReplyBody { private String clientInfo; public ReplyClientBody(String clientInfo) { this.clientInfo = clientInfo; } public String getClientInfo() { return clientInfo; } public void setClientInfo(String clientInfo) { this.clientInfo = clientInfo; } }
10.
package base; /** * */ public class ReplyMsg extends BaseMsg { public ReplyMsg() { super(); setType(MsgType.REPLY); } private ReplyBody body; public ReplyBody getBody() { return body; } public void setBody(ReplyBody body) { this.body = body; } }
11.
package base; /** * */ public class ReplyServerBody extends ReplyBody { private String serverInfo; public ReplyServerBody(String serverInfo) { this.serverInfo = serverInfo; } public String getServerInfo() { return serverInfo; } public void setServerInfo(String serverInfo) { this.serverInfo = serverInfo; } }
12. netty客户端启动类
package client; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame; import io.netty.handler.codec.http.websocketx.PingWebSocketFrame; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketFrame; import io.netty.handler.codec.serialization.ClassResolvers; import io.netty.handler.codec.serialization.ObjectDecoder; import io.netty.handler.codec.serialization.ObjectEncoder; import io.netty.handler.timeout.IdleStateHandler; import io.netty.util.concurrent.DefaultEventExecutorGroup; import io.netty.util.concurrent.EventExecutorGroup; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.util.concurrent.TimeUnit; import base.AskMsg; import base.AskParams; import base.Constants; import base.LoginMsg; /** * */ public class NettyClientBootstrap { private int port; private String host; private SocketChannel socketChannel; private static final EventExecutorGroup group = new DefaultEventExecutorGroup(20); public NettyClientBootstrap(int port, String host) throws InterruptedException { this.port = port; this.host = host; start(); } private void start() throws InterruptedException { EventLoopGroup eventLoopGroup=new NioEventLoopGroup(); Bootstrap bootstrap=new Bootstrap(); bootstrap.channel(NioSocketChannel.class); bootstrap.option(ChannelOption.SO_KEEPALIVE,true); bootstrap.group(eventLoopGroup); bootstrap.remoteAddress(host,port); bootstrap.handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new IdleStateHandler(20,10,0)); socketChannel.pipeline().addLast(new ObjectEncoder()); socketChannel.pipeline().addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null))); socketChannel.pipeline().addLast(new NettyClientHandler()); } }); ChannelFuture future =bootstrap.connect(host,port).sync(); if (future.isSuccess()) { socketChannel = (SocketChannel)future.channel(); System.out.println("connect server 成功---------"); } } public static void main(String[]args) throws InterruptedException, IOException { Constants.setClientId("002"); NettyClientBootstrap bootstrap=new NettyClientBootstrap(9999,"localhost"); LoginMsg loginMsg=new LoginMsg(); loginMsg.setPassword("yao"); loginMsg.setUserName("robin"); bootstrap.socketChannel.writeAndFlush(loginMsg); // while (true){ // TimeUnit.SECONDS.sleep(3); // AskMsg askMsg=new AskMsg(); // AskParams askParams=new AskParams(); // askParams.setAuth("authToken"); // askMsg.setParams(askParams); // bootstrap.socketChannel.writeAndFlush(askMsg); // } BufferedReader console = new BufferedReader(new InputStreamReader(System.in)); while (true) { String msg = console.readLine(); if (msg == null) { break; } else if ("bye".equals(msg.toLowerCase())) { break; } else if ("ping".equals(msg.toLowerCase())) { } else { AskMsg askMsg=new AskMsg(); AskParams askParams=new AskParams(); askParams.setAuth("authToken"); askParams.setContent(msg); askMsg.setParams(askParams); bootstrap.socketChannel.writeAndFlush(askMsg); } } } }
13. netty客户端操作实现类
package client; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.timeout.IdleStateEvent; import io.netty.util.ReferenceCountUtil; import java.util.Date; import server.NettyChannelMap; import base.AskMsg; import base.BaseMsg; import base.LoginMsg; import base.MsgType; import base.PingMsg; import base.ReplyClientBody; import base.ReplyMsg; import base.ReplyServerBody; /** * */ public class NettyClientHandler extends SimpleChannelInboundHandler<BaseMsg> { private int UNCONNECT_NUM = 0; @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { if(UNCONNECT_NUM >= 4) { System.err.println("connect status is disconnect."); ctx.close(); //此处当重启次数达到4次之后,关闭此链接后,并重新请求进行一次登录请求 return; } IdleStateEvent e = (IdleStateEvent) evt; switch (e.state()) { case WRITER_IDLE: System.out.println("send ping to server---date=" + new Date()); PingMsg pingMsg=new PingMsg(); ctx.writeAndFlush(pingMsg); UNCONNECT_NUM++; System.err.println("writer_idle over. and UNCONNECT_NUM=" + UNCONNECT_NUM); break; case READER_IDLE: System.err.println("reader_idle over."); UNCONNECT_NUM++; //读取服务端消息超时时,直接断开该链接,并重新登录请求,建立通道 case ALL_IDLE: System.err.println("all_idle over."); UNCONNECT_NUM++; //读取服务端消息超时时,直接断开该链接,并重新登录请求,建立通道 default: break; } } } @Override protected void messageReceived(ChannelHandlerContext channelHandlerContext, BaseMsg baseMsg) throws Exception { MsgType msgType=baseMsg.getType(); switch (msgType){ case LOGIN:{ //向服务器发起登录 LoginMsg loginMsg=new LoginMsg(); loginMsg.setPassword("alan"); loginMsg.setUserName("lin"); channelHandlerContext.writeAndFlush(loginMsg); }break; case PING:{ System.out.println("receive server ping ---date=" + new Date()); ReplyMsg replyPing=new ReplyMsg(); ReplyClientBody body = new ReplyClientBody("send client msg."); replyPing.setBody(body); channelHandlerContext.writeAndFlush(replyPing); }break; case ASK:{ AskMsg askMsg=(AskMsg)baseMsg; ReplyClientBody replyClientBody=new ReplyClientBody("receive server askmsg:" + askMsg.getParams().getContent()); ReplyMsg replyMsg=new ReplyMsg(); replyMsg.setBody(replyClientBody); channelHandlerContext.writeAndFlush(replyMsg); }break; case REPLY:{ ReplyMsg replyMsg=(ReplyMsg)baseMsg; ReplyServerBody replyServerBody=(ReplyServerBody)replyMsg.getBody(); UNCONNECT_NUM = 0; System.out.println("UNCONNECT_NUM="+ UNCONNECT_NUM + ",receive server replymsg: "+replyServerBody.getServerInfo()); } default:break; } ReferenceCountUtil.release(msgType); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.err.println("in client exceptionCaught."); super.exceptionCaught(ctx, cause); //出现异常时,可以发送或者记录相关日志信息,之后,直接断开该链接,并重新登录请求,建立通道 } }
14.
package server; import io.netty.channel.Channel; import io.netty.channel.socket.SocketChannel; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; /** * */ public class NettyChannelMap { private static Map<String,SocketChannel> map=new ConcurrentHashMap<String, SocketChannel>(); public static void add(String clientId,SocketChannel socketChannel){ map.put(clientId,socketChannel); } public static Channel get(String clientId){ return map.get(clientId); } public static void remove(SocketChannel socketChannel){ for (Map.Entry entry:map.entrySet()){ if (entry.getValue()==socketChannel){ map.remove(entry.getKey()); } } } }
15. netty服务端启动类
package server; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.serialization.ClassResolvers; import io.netty.handler.codec.serialization.ObjectDecoder; import io.netty.handler.codec.serialization.ObjectEncoder; import io.netty.handler.timeout.IdleStateHandler; import java.util.concurrent.TimeUnit; import base.AskMsg; /** * */ public class NettyServerBootstrap { private int port; private SocketChannel socketChannel; public NettyServerBootstrap(int port) throws InterruptedException { this.port = port; bind(); } private void bind() throws InterruptedException { EventLoopGroup boss=new NioEventLoopGroup(); EventLoopGroup worker=new NioEventLoopGroup(); ServerBootstrap bootstrap=new ServerBootstrap(); bootstrap.group(boss,worker); bootstrap.channel(NioServerSocketChannel.class); bootstrap.option(ChannelOption.SO_BACKLOG, 128); bootstrap.option(ChannelOption.TCP_NODELAY, true); bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true); bootstrap.childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline p = socketChannel.pipeline(); p.addLast(new IdleStateHandler(10,5,0)); p.addLast(new ObjectEncoder()); p.addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null))); p.addLast(new NettyServerHandler()); } }); ChannelFuture f= bootstrap.bind(port).sync(); if(f.isSuccess()){ System.out.println("server start---------------"); } } public static void main(String []args) throws InterruptedException { NettyServerBootstrap bootstrap=new NettyServerBootstrap(9999); while (true){ // SocketChannel channel=(SocketChannel)NettyChannelMap.get("001"); // if(channel!=null){ // AskMsg askMsg=new AskMsg(); // channel.writeAndFlush(askMsg); // } TimeUnit.SECONDS.sleep(10); } } }
16. netty服务端操作实现类
package server; import java.util.Date; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.socket.SocketChannel; import io.netty.handler.timeout.IdleStateEvent; import io.netty.util.ReferenceCountUtil; import base.AskMsg; import base.BaseMsg; import base.LoginMsg; import base.MsgType; import base.PingMsg; import base.ReplyBody; import base.ReplyClientBody; import base.ReplyMsg; import base.ReplyServerBody; /** * */ public class NettyServerHandler extends SimpleChannelInboundHandler<BaseMsg> { private int UNCONNECT_NUM_S = 0; @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { if(UNCONNECT_NUM_S >= 4) { System.err.println("client connect status is disconnect."); ctx.close(); //此处当重启次数达到4次之后,关闭此链接后,清除服务端相关的记录信息 return; } IdleStateEvent e = (IdleStateEvent) evt; switch (e.state()) { case WRITER_IDLE: System.out.println("send ping to client---date=" + new Date()); PingMsg pingMsg=new PingMsg(); ctx.writeAndFlush(pingMsg); UNCONNECT_NUM_S++; System.err.println("writer_idle over. and UNCONNECT_NUM_S=" + UNCONNECT_NUM_S); break; case READER_IDLE: System.err.println("reader_idle over."); UNCONNECT_NUM_S++; //读取服务端消息超时时,直接断开该链接,并重新登录请求,建立通道 case ALL_IDLE: System.err.println("all_idle over."); UNCONNECT_NUM_S++; //读取服务端消息超时时,直接断开该链接,并重新登录请求,建立通道 default: break; } } } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.err.println("in channelInactive."); NettyChannelMap.remove((SocketChannel)ctx.channel()); } @Override protected void messageReceived(ChannelHandlerContext channelHandlerContext, BaseMsg baseMsg) throws Exception { if(MsgType.LOGIN.equals(baseMsg.getType())){ LoginMsg loginMsg=(LoginMsg)baseMsg; if("lin".equals(loginMsg.getUserName())&&"alan".equals(loginMsg.getPassword())){ //登录成功,把channel存到服务端的map中 NettyChannelMap.add(loginMsg.getClientId(),(SocketChannel)channelHandlerContext.channel()); System.out.println("client"+loginMsg.getClientId()+" 登录成功"); } }else{ if(NettyChannelMap.get(baseMsg.getClientId())==null){ //说明未登录,或者连接断了,服务器向客户端发起登录请求,让客户端重新登录 LoginMsg loginMsg=new LoginMsg(); channelHandlerContext.channel().writeAndFlush(loginMsg); } } switch (baseMsg.getType()){ case PING:{ PingMsg pingMsg=(PingMsg)baseMsg; ReplyMsg replyPing=new ReplyMsg(); ReplyServerBody body = new ReplyServerBody("send server msg."); replyPing.setBody(body); NettyChannelMap.get(pingMsg.getClientId()).writeAndFlush(replyPing); System.err.println("ping over."); }break; case ASK:{ //收到客户端的请求 AskMsg askMsg=(AskMsg)baseMsg; if("authToken".equals(askMsg.getParams().getAuth())){ ReplyServerBody replyBody=new ReplyServerBody("receive client askmsg:" + askMsg.getParams().getContent()); ReplyMsg replyMsg=new ReplyMsg(); replyMsg.setBody(replyBody); NettyChannelMap.get(askMsg.getClientId()).writeAndFlush(replyMsg); } }break; case REPLY:{ //收到客户端回复 ReplyMsg replyMsg=(ReplyMsg)baseMsg; ReplyClientBody clientBody=(ReplyClientBody)replyMsg.getBody(); UNCONNECT_NUM_S = 0; System.out.println("UNCONNECT_NUM_S=" + UNCONNECT_NUM_S +",receive client replymsg: "+clientBody.getClientInfo()); }break; default:break; } ReferenceCountUtil.release(baseMsg); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.err.println("in here has an error."); NettyChannelMap.remove((SocketChannel)ctx.channel()); super.exceptionCaught(ctx, cause); System.err.println("channel is exception over. (SocketChannel)ctx.channel()=" + (SocketChannel)ctx.channel()); } }
相关文章推荐
- FairScheduler源码资源抢占
- Java transient关键字
- MyEclipse 开机时联网卡机的解决办法
- MYSQL基础: INT、DATETIME、TIMESTAMP如何选择?
- 九度OJ 1168:字符串的查找删除 (查找)
- 海量数据的插入时间对比
- Codeforces Round #327 (Div. 2) B. Rebranding
- jquery&easyui
- Tomcat报45秒无法启动错误修改方法
- 九度OJ 1168:字符串的查找删除 (查找)
- iOS开发 -GameKit蓝牙开发
- JAVA之代码混淆proguard基础(三)从异常堆栈中还原 ProGuard 混淆过的代码
- PHP操作MySQL数据库
- 九度OJ 1167:数组排序 (排序)
- PHP字符串操作之trim/rtrim/ltrim
- 设计模式-工厂模式
- ios 网络字节顺序的转换HTOS
- 九度OJ 1167:数组排序 (排序)
- 苏州Oracle社招面经
- eclipse 异常 Subversion Native Library Not Available解决方案