您的位置:首页 > 其它

Netty实现消息推送以及内部心跳机制

2015-10-26 09:49 666 查看
准备说明:引入 java-websocket、netty-all-5.0 等 jar 包。内部心跳机制通过 userEventTriggered 事件实现:客户端心跳用于实现客户端的断线重连,服务端心跳用于实现服务端清除多余(失效)连接的功能。

以下为一些实现的代码:

1.
package base;

/**
 * Request-type message.
 *
 * <p>A {@link BaseMsg} whose type is fixed to {@link MsgType#ASK} at
 * construction time and which carries an {@link AskParams} payload.
 */
public class AskMsg extends BaseMsg {

    /** Request payload; remains {@code null} until {@link #setParams(AskParams)} is called. */
    private AskParams params;

    /** Creates a request message tagged with {@link MsgType#ASK}. */
    public AskMsg() {
        super();
        setType(MsgType.ASK);
    }

    /**
     * Replaces the request payload.
     *
     * @param params the new payload
     */
    public void setParams(AskParams params) {
        this.params = params;
    }

    /**
     * @return the request payload, or {@code null} if none has been set
     */
    public AskParams getParams() {
        return this.params;
    }
}


2.
package base;

import java.io.Serializable;

/**
 * Serializable payload of an ask (request) message: an auth token plus
 * free-form content. Both values are {@code null} until set.
 */
public class AskParams implements Serializable {

    private static final long serialVersionUID = 1L;

    /** Authorization token sent along with the request. */
    private String auth;

    /** Text content of the request. */
    private String content;

    /**
     * @return the authorization token, or {@code null} if unset
     */
    public String getAuth() {
        return this.auth;
    }

    /**
     * @param auth the authorization token to send
     */
    public void setAuth(String auth) {
        this.auth = auth;
    }

    /**
     * @return the request content, or {@code null} if unset
     */
    public String getContent() {
        return this.content;
    }

    /**
     * @param content the request content to send
     */
    public void setContent(String content) {
        this.content = content;
    }
}


3.
package base;

import java.io.Serializable;

/**
 * Common base of every message exchanged between client and server.
 *
 * <p>Messages must be serializable, and a {@code serialVersionUID} must
 * always be declared.
 */
public abstract class BaseMsg implements Serializable {

    private static final long serialVersionUID = 1L;

    /** Kind of message; assigned by each concrete subclass. */
    private MsgType type;

    /** Must be unique per client, otherwise channel lookups get mixed up. */
    private String clientId;

    /** Initializes the client id from {@link Constants#getClientId()}. */
    public BaseMsg() {
        this.clientId = Constants.getClientId();
    }

    /**
     * @return the kind of this message
     */
    public MsgType getType() {
        return this.type;
    }

    /**
     * @param type the kind of this message
     */
    public void setType(MsgType type) {
        this.type = type;
    }

    /**
     * @return the id of the client this message belongs to
     */
    public String getClientId() {
        return this.clientId;
    }

    /**
     * @param clientId the id of the client this message belongs to
     */
    public void setClientId(String clientId) {
        this.clientId = clientId;
    }
}


4.
package base;

/**
 * Process-wide holder of the current client id. The id is {@code null}
 * until {@link #setClientId(String)} has been called.
 */
public class Constants {

    /** Client id shared by the whole process. */
    private static String clientId;

    /**
     * Stores the client id for the whole process.
     *
     * @param clientId the id to remember
     */
    public static void setClientId(String clientId) {
        Constants.clientId = clientId;
    }

    /**
     * @return the stored client id, or {@code null} if none was set
     */
    public static String getClientId() {
        return Constants.clientId;
    }
}


5.
package base;

/**
 * Login / authentication message.
 *
 * <p>A {@link BaseMsg} whose type is fixed to {@link MsgType#LOGIN} and
 * which carries the user name and password to verify.
 */
public class LoginMsg extends BaseMsg {

    /** Name of the user logging in. */
    private String userName;

    /** Password of the user logging in. */
    private String password;

    /** Creates a login message tagged with {@link MsgType#LOGIN}. */
    public LoginMsg() {
        super();
        setType(MsgType.LOGIN);
    }

    /**
     * @return the user name, or {@code null} if unset
     */
    public String getUserName() {
        return this.userName;
    }

    /**
     * @param userName the user name to log in with
     */
    public void setUserName(String userName) {
        this.userName = userName;
    }

    /**
     * @return the password, or {@code null} if unset
     */
    public String getPassword() {
        return this.password;
    }

    /**
     * @param password the password to log in with
     */
    public void setPassword(String password) {
        this.password = password;
    }
}


6.
package base;

/**
 * Kinds of message exchanged between client and server.
 */
public enum MsgType {
    /** Heartbeat probe. */
    PING,
    /** Request. */
    ASK,
    /** Reply to a request or to a heartbeat. */
    REPLY,
    /** Login / authentication. */
    LOGIN
}


7.
package base;

/**
 * Heartbeat message.
 *
 * <p>A {@link BaseMsg} with no payload whose type is fixed to
 * {@link MsgType#PING}.
 */
public class PingMsg extends BaseMsg {

    /** Creates a heartbeat message tagged with {@link MsgType#PING}. */
    public PingMsg() {
        super();
        setType(MsgType.PING);
    }
}


8.
package base;

import java.io.Serializable;

/**
 * Serializable base type for the body of a reply message. It declares no
 * state of its own; concrete bodies extend it.
 */
public class ReplyBody implements Serializable {

    private static final long serialVersionUID = 1L;

}


9.
package base;

/**
 * Reply body produced by the client; wraps a single informational string.
 */
public class ReplyClientBody extends ReplyBody {

    /** Text the client sends back. */
    private String clientInfo;

    /**
     * @param clientInfo text the client sends back
     */
    public ReplyClientBody(String clientInfo) {
        this.clientInfo = clientInfo;
    }

    /**
     * @param clientInfo replacement text
     */
    public void setClientInfo(String clientInfo) {
        this.clientInfo = clientInfo;
    }

    /**
     * @return the text the client sends back
     */
    public String getClientInfo() {
        return this.clientInfo;
    }
}


10.
package base;

/**
 * Reply message.
 *
 * <p>A {@link BaseMsg} whose type is fixed to {@link MsgType#REPLY} and
 * which carries a {@link ReplyBody}.
 */
public class ReplyMsg extends BaseMsg {

    /** Reply payload; remains {@code null} until {@link #setBody(ReplyBody)} is called. */
    private ReplyBody body;

    /** Creates a reply message tagged with {@link MsgType#REPLY}. */
    public ReplyMsg() {
        super();
        setType(MsgType.REPLY);
    }

    /**
     * @param body the reply payload
     */
    public void setBody(ReplyBody body) {
        this.body = body;
    }

    /**
     * @return the reply payload, or {@code null} if none has been set
     */
    public ReplyBody getBody() {
        return this.body;
    }
}


11.
package base;

/**
 * Reply body produced by the server; wraps a single informational string.
 */
public class ReplyServerBody extends ReplyBody {

    /** Text the server sends back. */
    private String serverInfo;

    /**
     * @param serverInfo text the server sends back
     */
    public ReplyServerBody(String serverInfo) {
        this.serverInfo = serverInfo;
    }

    /**
     * @param serverInfo replacement text
     */
    public void setServerInfo(String serverInfo) {
        this.serverInfo = serverInfo;
    }

    /**
     * @return the text the server sends back
     */
    public String getServerInfo() {
        return this.serverInfo;
    }
}


12. netty客户端启动类

package client;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.concurrent.TimeUnit;

import base.AskMsg;
import base.AskParams;
import base.Constants;
import base.LoginMsg;

/**
 * Netty client entry point.
 *
 * <p>Connects to the server, sends a login message, then forwards every
 * console line as an {@link AskMsg} until the user types {@code bye} or the
 * console reaches end of input. On exit the channel and the event loop are
 * shut down so that the JVM can terminate.
 */
public class NettyClientBootstrap {

    private final int port;
    private final String host;

    /** Event loop serving the connection; kept as a field so it can be shut down on exit. */
    private final EventLoopGroup eventLoopGroup = new NioEventLoopGroup();

    /** Connected channel; assigned once {@link #start()} has succeeded. */
    private SocketChannel socketChannel;

    /**
     * Creates the client and connects immediately.
     *
     * @param port server port
     * @param host server host name
     * @throws InterruptedException if the thread is interrupted while connecting
     */
    public NettyClientBootstrap(int port, String host) throws InterruptedException {
        this.port = port;
        this.host = host;
        start();
    }

    /**
     * Configures the pipeline (idle detection, object codec, business handler)
     * and blocks until the connection attempt has finished. If the connection
     * is not established, the event loop is shut down before the failure
     * propagates, so no threads are left behind.
     */
    private void start() throws InterruptedException {
        boolean connected = false;
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
            bootstrap.group(eventLoopGroup);
            bootstrap.remoteAddress(host, port);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    // Idle timeouts in seconds: reader 20, writer 10, all-idle disabled.
                    socketChannel.pipeline().addLast(new IdleStateHandler(20, 10, 0));
                    socketChannel.pipeline().addLast(new ObjectEncoder());
                    socketChannel.pipeline().addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null)));
                    socketChannel.pipeline().addLast(new NettyClientHandler());
                }
            });
            ChannelFuture future = bootstrap.connect(host, port).sync();
            if (future.isSuccess()) {
                socketChannel = (SocketChannel) future.channel();
                System.out.println("connect server  成功---------");
                connected = true;
            }
        } finally {
            if (!connected) {
                eventLoopGroup.shutdownGracefully();
            }
        }
    }

    /** Closes the channel (if connected) and stops the event loop threads. */
    private void shutdown() {
        if (socketChannel != null) {
            socketChannel.close();
        }
        eventLoopGroup.shutdownGracefully();
    }

    /**
     * Runs the interactive client against {@code localhost:9999}.
     *
     * @param args unused
     * @throws InterruptedException if interrupted while connecting
     * @throws IOException if reading from the console fails
     */
    public static void main(String[] args) throws InterruptedException, IOException {
        Constants.setClientId("002");
        NettyClientBootstrap bootstrap = new NettyClientBootstrap(9999, "localhost");
        try {
            LoginMsg loginMsg = new LoginMsg();
            loginMsg.setPassword("yao");
            loginMsg.setUserName("robin");
            bootstrap.socketChannel.writeAndFlush(loginMsg);

            BufferedReader console = new BufferedReader(new InputStreamReader(System.in));
            while (true) {
                String msg = console.readLine();
                // equalsIgnoreCase avoids the default-locale dependence of toLowerCase().
                if (msg == null || "bye".equalsIgnoreCase(msg)) {
                    break;
                }
                if ("ping".equalsIgnoreCase(msg)) {
                    // Deliberately nothing to send: "ping" input is ignored here.
                    continue;
                }
                AskMsg askMsg = new AskMsg();
                AskParams askParams = new AskParams();
                askParams.setAuth("authToken");
                askParams.setContent(msg);
                askMsg.setParams(askParams);
                bootstrap.socketChannel.writeAndFlush(askMsg);
            }
        } finally {
            // Without this the event loop threads keep the JVM running after "bye".
            bootstrap.shutdown();
        }
    }
}


13. netty客户端操作实现类

package client;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.ReferenceCountUtil;

import java.util.Date;

import server.NettyChannelMap;
import base.AskMsg;
import base.BaseMsg;
import base.LoginMsg;
import base.MsgType;
import base.PingMsg;
import base.ReplyClientBody;
import base.ReplyMsg;
import base.ReplyServerBody;

/**
 * Client-side business handler.
 *
 * <p>Sends a heartbeat ({@link PingMsg}) when the channel has been write-idle,
 * counts idle events that went by without a server reply, closes the
 * connection once that count reaches {@link #MAX_IDLE_EVENTS}, and answers
 * the messages pushed by the server.
 */
public class NettyClientHandler extends SimpleChannelInboundHandler<BaseMsg> {

    /** Number of idle events after which the connection is considered dead. */
    private static final int MAX_IDLE_EVENTS = 4;

    /** Idle events seen since the last server reply; reset by every REPLY. */
    private int unconnectNum = 0;

    /**
     * Handles idle-state events; any other user event is passed on unchanged.
     *
     * @param ctx the channel context
     * @param evt the user event
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (!(evt instanceof IdleStateEvent)) {
            super.userEventTriggered(ctx, evt);
            return;
        }
        if (unconnectNum >= MAX_IDLE_EVENTS) {
            System.err.println("connect status is disconnect.");
            // Limit reached: close this connection. A reconnect followed by a
            // fresh login request can be issued from here.
            ctx.close();
            return;
        }

        IdleStateEvent e = (IdleStateEvent) evt;
        switch (e.state()) {
            case WRITER_IDLE:
                System.out.println("send ping to server---date=" + new Date());
                ctx.writeAndFlush(new PingMsg());
                unconnectNum++;
                System.err.println("writer_idle over. and UNCONNECT_NUM=" + unconnectNum);
                break;
            case READER_IDLE:
                // Nothing received from the server within the read timeout.
                System.err.println("reader_idle over.");
                unconnectNum++;
                // The break keeps one event from being counted twice by
                // falling through into ALL_IDLE.
                break;
            case ALL_IDLE:
                System.err.println("all_idle over.");
                unconnectNum++;
                break;
            default:
                break;
        }
    }

    /**
     * Dispatches a message received from the server according to its type.
     *
     * @param channelHandlerContext the channel context used for replying
     * @param baseMsg the received message
     */
    @Override
    protected void messageReceived(ChannelHandlerContext channelHandlerContext, BaseMsg baseMsg) throws Exception {
        MsgType msgType = baseMsg.getType();
        if (msgType == null) {
            // Untyped message: nothing to dispatch on.
            return;
        }
        switch (msgType) {
            case LOGIN: {
                // The server asked for a login: send the credentials.
                LoginMsg loginMsg = new LoginMsg();
                loginMsg.setPassword("alan");
                loginMsg.setUserName("lin");
                channelHandlerContext.writeAndFlush(loginMsg);
                break;
            }
            case PING: {
                System.out.println("receive server ping ---date=" + new Date());
                ReplyMsg replyPing = new ReplyMsg();
                replyPing.setBody(new ReplyClientBody("send client msg."));
                channelHandlerContext.writeAndFlush(replyPing);
                break;
            }
            case ASK: {
                AskMsg askMsg = (AskMsg) baseMsg;
                // An AskMsg may arrive without params; avoid the NPE.
                String content = askMsg.getParams() == null ? null : askMsg.getParams().getContent();
                ReplyMsg replyMsg = new ReplyMsg();
                replyMsg.setBody(new ReplyClientBody("receive server askmsg:" + content));
                channelHandlerContext.writeAndFlush(replyMsg);
                break;
            }
            case REPLY: {
                ReplyMsg replyMsg = (ReplyMsg) baseMsg;
                // Any reply resets the idle counter.
                unconnectNum = 0;
                String serverInfo = null;
                if (replyMsg.getBody() instanceof ReplyServerBody) {
                    serverInfo = ((ReplyServerBody) replyMsg.getBody()).getServerInfo();
                }
                System.out.println("UNCONNECT_NUM=" + unconnectNum + ",receive server replymsg: " + serverInfo);
                break;
            }
            default:
                break;
        }
        // No manual release: the original released the MsgType enum constant,
        // which is not a reference-counted object.
    }

    /**
     * Logs the failure and forwards it down the pipeline.
     *
     * @param ctx the channel context
     * @param cause the exception that was raised
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.err.println("in client exceptionCaught.");
        // Logging or reporting can be added here; afterwards the connection can
        // be closed and re-established with a new login request.
        super.exceptionCaught(ctx, cause);
    }

}


14.
package server;

import io.netty.channel.Channel;
import io.netty.channel.socket.SocketChannel;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Static registry that maps a client id to the channel of that client.
 * Backed by a {@link ConcurrentHashMap}.
 */
public class NettyChannelMap {

    private static final Map<String, SocketChannel> map = new ConcurrentHashMap<String, SocketChannel>();

    /**
     * Registers (or replaces) the channel of a client.
     *
     * @param clientId id of the client; must not be {@code null}
     * @param socketChannel channel of the client; must not be {@code null}
     * @throws NullPointerException if either argument is {@code null}
     *         (the backing {@link ConcurrentHashMap} rejects nulls)
     */
    public static void add(String clientId, SocketChannel socketChannel) {
        map.put(clientId, socketChannel);
    }

    /**
     * Looks up the channel registered for a client.
     *
     * @param clientId id of the client; may be {@code null}
     * @return the registered channel, or {@code null} if there is none or
     *         {@code clientId} is {@code null}
     */
    public static Channel get(String clientId) {
        if (clientId == null) {
            // ConcurrentHashMap.get(null) would throw; treat it as "not registered".
            return null;
        }
        return map.get(clientId);
    }

    /**
     * Removes every registration that points at the given channel instance.
     *
     * @param socketChannel the channel to unregister (compared by identity)
     */
    public static void remove(SocketChannel socketChannel) {
        for (Map.Entry<String, SocketChannel> entry : map.entrySet()) {
            if (entry.getValue() == socketChannel) {
                map.remove(entry.getKey());
            }
        }
    }

}


15. netty服务端启动类

package server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import io.netty.handler.timeout.IdleStateHandler;

import java.util.concurrent.TimeUnit;

import base.AskMsg;

/**
 * Netty server entry point.
 *
 * <p>Binds the listening socket and installs, on every accepted connection,
 * idle detection, the object codec and {@link NettyServerHandler}.
 */
public class NettyServerBootstrap {

    private final int port;

    /**
     * Creates the server and binds immediately.
     *
     * @param port port to listen on
     * @throws InterruptedException if the thread is interrupted while binding
     */
    public NettyServerBootstrap(int port) throws InterruptedException {
        this.port = port;
        bind();
    }

    /**
     * Configures the server and blocks until the bind attempt has finished.
     * If the port is not bound, both event loop groups are shut down before
     * the failure propagates, so no threads are left behind.
     */
    private void bind() throws InterruptedException {
        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup worker = new NioEventLoopGroup();
        boolean bound = false;
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(boss, worker);
            bootstrap.channel(NioServerSocketChannel.class);
            bootstrap.option(ChannelOption.SO_BACKLOG, 128);
            // TCP_NODELAY is a per-connection option, so it belongs on the
            // accepted (child) channels rather than on the listening channel.
            bootstrap.childOption(ChannelOption.TCP_NODELAY, true);
            bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
            bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    ChannelPipeline p = socketChannel.pipeline();
                    // Idle timeouts in seconds: reader 10, writer 5, all-idle disabled.
                    p.addLast(new IdleStateHandler(10, 5, 0));
                    p.addLast(new ObjectEncoder());
                    p.addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null)));
                    p.addLast(new NettyServerHandler());
                }
            });
            ChannelFuture f = bootstrap.bind(port).sync();
            if (f.isSuccess()) {
                System.out.println("server start---------------");
                bound = true;
            }
        } finally {
            if (!bound) {
                boss.shutdownGracefully();
                worker.shutdownGracefully();
            }
        }
    }

    /**
     * Starts the server on port 9999 and keeps the main thread alive.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted
     */
    public static void main(String[] args) throws InterruptedException {
        new NettyServerBootstrap(9999);
        while (true) {
            // Messages can be pushed to a logged-in client from here by looking
            // up its channel with NettyChannelMap.get(clientId) and writing to it.
            TimeUnit.SECONDS.sleep(10);
        }
    }
}


16. netty服务端操作实现类

package server;

import java.util.Date;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.ReferenceCountUtil;
import base.AskMsg;
import base.BaseMsg;
import base.LoginMsg;
import base.MsgType;
import base.PingMsg;
import base.ReplyBody;
import base.ReplyClientBody;
import base.ReplyMsg;
import base.ReplyServerBody;

/**
 * Server-side business handler.
 *
 * <p>Sends a heartbeat ({@link PingMsg}) when the channel has been write-idle,
 * counts idle events that went by without a client reply, closes the
 * connection once that count reaches {@link #MAX_IDLE_EVENTS}, keeps
 * {@link NettyChannelMap} in sync with logins and disconnects, and answers
 * client messages.
 */
public class NettyServerHandler extends SimpleChannelInboundHandler<BaseMsg> {

    /** Number of idle events after which the client is considered gone. */
    private static final int MAX_IDLE_EVENTS = 4;

    /** Idle events seen since the last client reply; reset by every REPLY. */
    private int unconnectNumS = 0;

    /**
     * Handles idle-state events; any other user event is passed on unchanged.
     *
     * @param ctx the channel context
     * @param evt the user event
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (!(evt instanceof IdleStateEvent)) {
            super.userEventTriggered(ctx, evt);
            return;
        }
        if (unconnectNumS >= MAX_IDLE_EVENTS) {
            System.err.println("client connect status is disconnect.");
            // Limit reached: close this connection. channelInactive then removes
            // the channel from the server-side registry.
            ctx.close();
            return;
        }

        IdleStateEvent e = (IdleStateEvent) evt;
        switch (e.state()) {
            case WRITER_IDLE:
                System.out.println("send ping to client---date=" + new Date());
                ctx.writeAndFlush(new PingMsg());
                unconnectNumS++;
                System.err.println("writer_idle over. and UNCONNECT_NUM_S=" + unconnectNumS);
                break;
            case READER_IDLE:
                // Nothing received from the client within the read timeout.
                System.err.println("reader_idle over.");
                unconnectNumS++;
                // The break keeps one event from being counted twice by
                // falling through into ALL_IDLE.
                break;
            case ALL_IDLE:
                System.err.println("all_idle over.");
                unconnectNumS++;
                break;
            default:
                break;
        }
    }

    /**
     * Unregisters the channel once the connection is gone.
     *
     * @param ctx the channel context
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.err.println("in channelInactive.");
        NettyChannelMap.remove((SocketChannel) ctx.channel());
    }

    /**
     * Dispatches a message received from a client.
     *
     * <p>LOGIN messages are verified and, on success, register the channel.
     * Any other message from a client id that is not registered only triggers
     * a login request back to the sender and is otherwise ignored; previously
     * PING and ASK in that state failed with a NullPointerException.
     *
     * @param channelHandlerContext the channel context of the sender
     * @param baseMsg the received message
     */
    @Override
    protected void messageReceived(ChannelHandlerContext channelHandlerContext, BaseMsg baseMsg) throws Exception {
        MsgType type = baseMsg.getType();
        if (type == null) {
            // Untyped message: nothing to dispatch on.
            return;
        }
        if (type == MsgType.LOGIN) {
            handleLogin(channelHandlerContext, (LoginMsg) baseMsg);
            return;
        }

        String clientId = baseMsg.getClientId();
        // The registry only stores SocketChannel instances, so the cast is safe.
        SocketChannel registered = clientId == null ? null : (SocketChannel) NettyChannelMap.get(clientId);
        if (registered == null) {
            // Not logged in, or the connection was dropped: ask the client to log in again.
            channelHandlerContext.channel().writeAndFlush(new LoginMsg());
            return;
        }

        switch (type) {
            case PING: {
                ReplyMsg replyPing = new ReplyMsg();
                replyPing.setBody(new ReplyServerBody("send server msg."));
                registered.writeAndFlush(replyPing);
                System.err.println("ping over.");
                break;
            }
            case ASK: {
                // Request from the client; answered only when the auth token matches.
                AskMsg askMsg = (AskMsg) baseMsg;
                if (askMsg.getParams() != null && "authToken".equals(askMsg.getParams().getAuth())) {
                    ReplyMsg replyMsg = new ReplyMsg();
                    replyMsg.setBody(new ReplyServerBody("receive client askmsg:" + askMsg.getParams().getContent()));
                    registered.writeAndFlush(replyMsg);
                }
                break;
            }
            case REPLY: {
                // Reply from the client: it is alive, so reset the idle counter.
                ReplyMsg replyMsg = (ReplyMsg) baseMsg;
                unconnectNumS = 0;
                String clientInfo = null;
                if (replyMsg.getBody() instanceof ReplyClientBody) {
                    clientInfo = ((ReplyClientBody) replyMsg.getBody()).getClientInfo();
                }
                System.out.println("UNCONNECT_NUM_S=" + unconnectNumS + ",receive client replymsg: " + clientInfo);
                break;
            }
            default:
                break;
        }
        // No manual release: the original called ReferenceCountUtil.release on
        // the message itself, and BaseMsg is not a reference-counted type.
    }

    /** Verifies the credentials and, on success, registers the sender's channel under its client id. */
    private void handleLogin(ChannelHandlerContext ctx, LoginMsg loginMsg) {
        boolean credentialsOk = "lin".equals(loginMsg.getUserName()) && "alan".equals(loginMsg.getPassword());
        // A null client id cannot be used as a registry key, so such a login is not registered.
        if (credentialsOk && loginMsg.getClientId() != null) {
            NettyChannelMap.add(loginMsg.getClientId(), (SocketChannel) ctx.channel());
            System.out.println("client" + loginMsg.getClientId() + " 登录成功");
        }
    }

    /**
     * Unregisters the channel, then forwards the failure down the pipeline.
     *
     * @param ctx the channel context
     * @param cause the exception that was raised
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.err.println("in here has an error.");
        NettyChannelMap.remove((SocketChannel) ctx.channel());
        super.exceptionCaught(ctx, cause);
        System.err.println("channel is exception over. (SocketChannel)ctx.channel()=" + (SocketChannel) ctx.channel());
    }

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: