您的位置:首页 > 其它

Netty实现服务端客户端长连接通讯及心跳检测

2015-07-22 14:52 603 查看
通过netty实现服务端与客户端的长连接通讯,及心跳检测。

基本思路:netty服务端通过一个Map保存所有连接上来的客户端SocketChannel,客户端的Id作为Map的key。每次服务器端如果要向某个客户端发送消息,只需根据ClientId取出对应的SocketChannel,往里面写入message即可。心跳检测通过IdleEvent事件,定时向服务端放送Ping消息,检测SocketChannel是否终断。

环境JDK1.8 和netty5

以下是具体的代码实现和介绍:

1公共的Share部分(主要包含消息协议类型的定义)

设计消息类型:

public enum MsgType {

PING,ASK,REPLY,LOGIN

}

Message基类:

//必须实现序列,serialVersionUID 一定要有,否者在netty消息序列化反序列化会有问题,接收不到消息!!!

public abstract class BaseMsg implements Serializable {

private static final long serialVersionUID = 1L;

private MsgType type;

//必须唯一,否者会出现channel调用混乱

private String clientId;

//初始化客户端id

public BaseMsg() {

this.clientId = Constants.getClientId();

}

public String getClientId() {

return clientId;

}

public void setClientId(String clientId) {

this.clientId = clientId;

}

public MsgType getType() {

return type;

}

public void setType(MsgType type) {

this.type = type;

}

}

常量设置:

public class Constants {

private static String clientId;

public static String getClientId() {

return clientId;

}

public static void setClientId(String clientId) {

Constants.clientId = clientId;

}

}

登录类型消息:

public class LoginMsg extends BaseMsg {

private String userName;

private String password;

public LoginMsg() {

super();

setType(MsgType.LOGIN);

}

public String getUserName() {

return userName;

}

public void setUserName(String userName) {

this.userName = userName;

}

public String getPassword() {

return password;

}

public void setPassword(String password) {

this.password = password;

}

}

心跳检测Ping类型消息:

public class PingMsg extends BaseMsg {

public PingMsg() {

super();

setType(MsgType.PING);

}

}

请求类型消息:

public class AskMsg extends BaseMsg {

public AskMsg() {

super();

setType(MsgType.ASK);

}

private AskParams params;

public AskParams getParams() {

return params;

}

public void setParams(AskParams params) {

this.params = params;

}

}

//请求类型参数

//必须实现序列化接口

public class AskParams implements Serializable {

private static final long serialVersionUID = 1L;

private String auth;

public String getAuth() {

return auth;

}

public void setAuth(String auth) {

this.auth = auth;

}

}

响应类型消息:

public class ReplyMsg extends BaseMsg {

public ReplyMsg() {

super();

setType(MsgType.REPLY);

}

private ReplyBody body;

public ReplyBody getBody() {

return body;

}

public void setBody(ReplyBody body) {

this.body = body;

}

}

//相应类型body对像

public class ReplyBody implements Serializable {

private static final long serialVersionUID = 1L;

}

public class ReplyClientBody extends ReplyBody {

private String clientInfo;

public ReplyClientBody(String clientInfo) {

this.clientInfo = clientInfo;

}

public String getClientInfo() {

return clientInfo;

}

public void setClientInfo(String clientInfo) {

this.clientInfo = clientInfo;

}

}

public class ReplyServerBody extends ReplyBody {

private String serverInfo;

public ReplyServerBody(String serverInfo) {

this.serverInfo = serverInfo;

}

public String getServerInfo() {

return serverInfo;

}

public void setServerInfo(String serverInfo) {

this.serverInfo = serverInfo;

}

}

2 Server端:主要包含对SocketChannel引用的Map,ChannelHandler的实现和Bootstrap.

Map:

public class NettyChannelMap {

private static Map<String,SocketChannel> map=new ConcurrentHashMap<String, SocketChannel>();

public static void add(String clientId,SocketChannel socketChannel){

map.put(clientId,socketChannel);

}

public static Channel get(String clientId){

return map.get(clientId);

}

public static void remove(SocketChannel socketChannel){

for (Map.Entry entry:map.entrySet()){

if (entry.getValue()==socketChannel){

map.remove(entry.getKey());

}

}

}

}

Handler

public class NettyServerHandler extends SimpleChannelInboundHandler<BaseMsg> {

@Override

public void channelInactive(ChannelHandlerContext ctx) throws Exception {

//channel失效,从Map中移除

NettyChannelMap.remove((SocketChannel)ctx.channel());

}

@Override

protected void messageReceived(ChannelHandlerContext channelHandlerContext, BaseMsg baseMsg) throws Exception {

if(MsgType.LOGIN.equals(baseMsg.getType())){

LoginMsg loginMsg=(LoginMsg)baseMsg;

if("robin".equals(loginMsg.getUserName())&&"yao".equals(loginMsg.getPassword())){

//登录成功,把channel存到服务端的map中

NettyChannelMap.add(loginMsg.getClientId(),(SocketChannel)channelHandlerContext.channel());

System.out.println("client"+loginMsg.getClientId()+" 登录成功");

}

}else{

if(NettyChannelMap.get(baseMsg.getClientId())==null){

//说明未登录,或者连接断了,服务器向客户端发起登录请求,让客户端重新登录

LoginMsg loginMsg=new LoginMsg();

channelHandlerContext.channel().writeAndFlush(loginMsg);

}

}

switch (baseMsg.getType()){

case PING:{

PingMsg pingMsg=(PingMsg)baseMsg;

PingMsg replyPing=new PingMsg();

NettyChannelMap.get(pingMsg.getClientId()).writeAndFlush(replyPing);

}break;

case ASK:{

//收到客户端的请求

AskMsg askMsg=(AskMsg)baseMsg;

if("authToken".equals(askMsg.getParams().getAuth())){

ReplyServerBody replyBody=new ReplyServerBody("server info $$$$ !!!");

ReplyMsg replyMsg=new ReplyMsg();

replyMsg.setBody(replyBody);

NettyChannelMap.get(askMsg.getClientId()).writeAndFlush(replyMsg);

}

}break;

case REPLY:{

//收到客户端回复

ReplyMsg replyMsg=(ReplyMsg)baseMsg;

ReplyClientBody clientBody=(ReplyClientBody)replyMsg.getBody();

System.out.println("receive client msg: "+clientBody.getClientInfo());

}break;

default:break;

}

ReferenceCountUtil.release(baseMsg);

}

}

ServerBootstrap:

public class NettyServerBootstrap {

private int port;

private SocketChannel socketChannel;

public NettyServerBootstrap(int port) throws InterruptedException {

this.port = port;

bind();

}

private void bind() throws InterruptedException {

EventLoopGroup boss=new NioEventLoopGroup();

EventLoopGroup worker=new NioEventLoopGroup();

ServerBootstrap bootstrap=new ServerBootstrap();

bootstrap.group(boss,worker);

bootstrap.channel(NioServerSocketChannel.class);

bootstrap.option(ChannelOption.SO_BACKLOG, 128);

//通过NoDelay禁用Nagle,使消息立即发出去,不用等待到一定的数据量才发出去

bootstrap.option(ChannelOption.TCP_NODELAY, true);

//保持长连接状态

bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);

bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {

@Override

protected void initChannel(SocketChannel socketChannel) throws Exception {

ChannelPipeline p = socketChannel.pipeline();

p.addLast(new ObjectEncoder());

p.addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null)));

p.addLast(new NettyServerHandler());

}

});

ChannelFuture f= bootstrap.bind(port).sync();

if(f.isSuccess()){

System.out.println("server start---------------");

}

}

public static void main(String []args) throws InterruptedException {

NettyServerBootstrap bootstrap=new NettyServerBootstrap(9999);

while (true){

SocketChannel channel=(SocketChannel)NettyChannelMap.get("001");

if(channel!=null){

AskMsg askMsg=new AskMsg();

channel.writeAndFlush(askMsg);

}

TimeUnit.SECONDS.sleep(5);

}

}

}

3 Client端:包含发起登录,发送心跳,及对应消息处理

handler

public class NettyClientHandler extends SimpleChannelInboundHandler<BaseMsg> {

//利用写空闲发送心跳检测消息

@Override

public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {

if (evt instanceof IdleStateEvent) {

IdleStateEvent e = (IdleStateEvent) evt;

switch (e.state()) {

case WRITER_IDLE:

PingMsg pingMsg=new PingMsg();

ctx.writeAndFlush(pingMsg);

System.out.println("send ping to server----------");

break;

default:

break;

}

}

}

@Override

protected void messageReceived(ChannelHandlerContext channelHandlerContext, BaseMsg baseMsg) throws Exception {

MsgType msgType=baseMsg.getType();

switch (msgType){

case LOGIN:{

//向服务器发起登录

LoginMsg loginMsg=new LoginMsg();

loginMsg.setPassword("yao");

loginMsg.setUserName("robin");

channelHandlerContext.writeAndFlush(loginMsg);

}break;

case PING:{

System.out.println("receive ping from server----------");

}break;

case ASK:{

ReplyClientBody replyClientBody=new ReplyClientBody("client info **** !!!");

ReplyMsg replyMsg=new ReplyMsg();

replyMsg.setBody(replyClientBody);

channelHandlerContext.writeAndFlush(replyMsg);

}break;

case REPLY:{

ReplyMsg replyMsg=(ReplyMsg)baseMsg;

ReplyServerBody replyServerBody=(ReplyServerBody)replyMsg.getBody();

System.out.println("receive client msg: "+replyServerBody.getServerInfo());

}

default:break;

}

ReferenceCountUtil.release(msgType);

}

}

bootstrap

public class NettyClientBootstrap {

private int port;

private String host;

private SocketChannel socketChannel;

private static final EventExecutorGroup group = new DefaultEventExecutorGroup(20);

public NettyClientBootstrap(int port, String host) throws InterruptedException {

this.port = port;

this.host = host;

start();

}

private void start() throws InterruptedException {

EventLoopGroup eventLoopGroup=new NioEventLoopGroup();

Bootstrap bootstrap=new Bootstrap();

bootstrap.channel(NioSocketChannel.class);

bootstrap.option(ChannelOption.SO_KEEPALIVE,true);

bootstrap.group(eventLoopGroup);

bootstrap.remoteAddress(host,port);

bootstrap.handler(new ChannelInitializer<SocketChannel>() {

@Override

protected void initChannel(SocketChannel socketChannel) throws Exception {

socketChannel.pipeline().addLast(new IdleStateHandler(20,10,0));

socketChannel.pipeline().addLast(new ObjectEncoder());

socketChannel.pipeline().addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null)));

socketChannel.pipeline().addLast(new NettyClientHandler());

}

});

ChannelFuture future =bootstrap.connect(host,port).sync();

if (future.isSuccess()) {

socketChannel = (SocketChannel)future.channel();

System.out.println("connect server 成功---------");

}

}

public static void main(String[]args) throws InterruptedException {

Constants.setClientId("001");

NettyClientBootstrap bootstrap=new NettyClientBootstrap(9999,"localhost");

LoginMsg loginMsg=new LoginMsg();

loginMsg.setPassword("yao");

loginMsg.setUserName("robin");

bootstrap.socketChannel.writeAndFlush(loginMsg);

while (true){

TimeUnit.SECONDS.sleep(3);

AskMsg askMsg=new AskMsg();

AskParams askParams=new AskParams();

askParams.setAuth("authToken");

askMsg.setParams(askParams);

bootstrap.socketChannel.writeAndFlush(askMsg);

}

}

}

具体的例子和相应pom.xml 见 https://github.com/WangErXiao/ServerClient
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: