您的位置:首页 > 理论基础 > 计算机网络

netty TCP server心跳机制

2015-10-09 13:52 537 查看
启动TCP server：NettyTCPServer.java

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.IdleStateHandler;

/**
 * Minimal Netty TCP server that installs an {@link IdleStateHandler} in front of
 * {@code ServerHandler} so that connections which stop sending data can be detected.
 */
public class NettyTCPServer {
    /** TCP port the server binds to. */
    private final int port;

    /**
     * @param port TCP port to listen on
     */
    public NettyTCPServer(int port) {
        this.port = port;
    }

    /**
     * Binds the server socket and blocks until it is closed.
     *
     * <p>Both event loop groups are always shut down on exit, so a failed bind or a closed
     * server socket no longer leaves event loop threads running.
     *
     * @throws Exception if binding the port fails
     */
    public void start() throws Exception {
        // Configure the server.
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 100)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(
                                    // Heartbeat control: reader-idle timeout of 5 (seconds per the
                                    // Netty int-argument constructor); writer/all-idle disabled.
                                    new IdleStateHandler(5, 0, 0),
                                    new ServerHandler());
                        }
                    });

            // Start the server.
            ChannelFuture f = b.bind(port).sync();

            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Preserve the interrupt for the caller instead of swallowing it.
            Thread.currentThread().interrupt();
        } finally {
            // Shut down all event loops to terminate all threads.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    /**
     * Decodes a hexadecimal string (upper or lower case) into bytes.
     *
     * @param hex string with an even number of hex digits; must not be {@code null}
     * @return the decoded bytes, two input characters per output byte
     * @throws IllegalArgumentException if the length is odd or a character is not a hex digit
     */
    public static byte[] hexStringToByte(String hex) {
        if ((hex.length() & 1) != 0) {
            throw new IllegalArgumentException(
                    "Hex string must have an even length, got " + hex.length());
        }
        int len = hex.length() / 2;
        byte[] result = new byte[len];
        for (int i = 0; i < len; i++) {
            int pos = i * 2;
            result[i] = (byte) (toByte(hex.charAt(pos)) << 4 | toByte(hex.charAt(pos + 1)));
        }
        return result;
    }

    /**
     * Converts one hex digit to its numeric value (0-15).
     *
     * @throws IllegalArgumentException if {@code c} is not one of 0-9, a-f, A-F
     */
    private static int toByte(char c) {
        if (c >= '0' && c <= '9') {
            return c - '0';
        }
        if (c >= 'A' && c <= 'F') {
            return c - 'A' + 10;
        }
        if (c >= 'a' && c <= 'f') {
            return c - 'a' + 10;
        }
        throw new IllegalArgumentException("Invalid hex character '" + c + "'");
    }

    /**
     * Starts the server on the port given as the first argument, or on 999 when no
     * argument is supplied.
     */
    public static void main(String args[]) throws Exception {
        int port = args.length > 0 ? Integer.parseInt(args[0]) : 999;
        new NettyTCPServer(port).start();
    }

}


ServerHandler.java 心跳机制

import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;

import java.util.Arrays;
import java.util.Map;

public class ServerHandler extends SimpleChannelInboundHandler<Object> {

/**
 * Empty override: nothing is done in this callback.
 */
@Override
public void handlerAdded(ChannelHandlerContext arg0) throws Exception {
// No work is performed here (left over from an auto-generated stub).

}

/**
 * Empty override: nothing is done in this callback.
 */
@Override
public void handlerRemoved(ChannelHandlerContext arg0) throws Exception {
// No work is performed here (left over from an auto-generated stub).

}

/**
 * Logs the newly active connection, registers its channel in {@code NettyChannelMap}
 * under the textual remote address, and then prints every registered entry.
 */
@Override
public void channelActive(ChannelHandlerContext arg0) throws Exception {
    final Channel connection = arg0.channel();
    final Object peer = connection.remoteAddress();

    System.out.println("channelActive" + peer);
    System.out.println(connection.hashCode());

    // The registry key is the remote address rendered as text.
    NettyChannelMap.add(peer.toString(), connection);

    // Dump the current registry contents for debugging.
    for (Map.Entry<String, Channel> slot : NettyChannelMap.getAllMap().entrySet()) {
        String line = "Key = " + slot.getKey() + ", Value = " + slot.getValue();
        System.out.println(line);
    }
}

/**
 * Logs that the channel went inactive and requests that it be closed.
 */
@Override
public void channelInactive(ChannelHandlerContext arg0) throws Exception {
    final String eventName = "channelInactive";
    System.out.println(eventName);

    // Request close via the handler context.
    arg0.close();
}

/**
 * Renders each byte as a two-character lowercase hexadecimal string.
 *
 * <p>Values below 0x10 are left-padded with {@code '0'} (for example {@code 0x0A} becomes
 * {@code "0a"}); previously the padding was overwritten and single-digit strings were returned.
 *
 * @param src bytes to convert
 * @return one hex string per input byte, or {@code null} when {@code src} is {@code null}
 *         or empty (kept for compatibility with existing callers)
 */
public static String[] bytesToHexStrings(byte[] src) {
    if (src == null || src.length <= 0) {
        return null;
    }
    String[] str = new String[src.length];

    for (int i = 0; i < src.length; i++) {
        // Mask to treat the byte as unsigned before formatting.
        int v = src[i] & 0xFF;
        String hv = Integer.toHexString(v);
        str[i] = (hv.length() < 2) ? "0" + hv : hv;
    }
    return str;
}
/**
 * Decodes a hexadecimal string (upper or lower case) into bytes.
 *
 * <p>Lowercase digits are now accepted, and malformed input is rejected instead of
 * silently producing wrong bytes.
 *
 * @param hex string with an even number of hex digits; must not be {@code null}
 * @return the decoded bytes, two input characters per output byte
 * @throws IllegalArgumentException if the length is odd or a character is not a hex digit
 */
public static byte[] hexStringToByte(String hex) {
    if ((hex.length() & 1) != 0) {
        throw new IllegalArgumentException(
                "Hex string must have an even length, got " + hex.length());
    }
    int len = hex.length() / 2;
    byte[] result = new byte[len];
    for (int i = 0; i < len; i++) {
        int pos = i * 2;
        // High nibble first, then low nibble.
        result[i] = (byte) (toByte(hex.charAt(pos)) << 4 | toByte(hex.charAt(pos + 1)));
    }
    return result;
}

/**
 * Converts one hex digit to its numeric value (0-15).
 *
 * @throws IllegalArgumentException if {@code c} is not one of 0-9, a-f, A-F
 */
private static int toByte(char c) {
    if (c >= '0' && c <= '9') {
        return c - '0';
    }
    if (c >= 'A' && c <= 'F') {
        return c - 'A' + 10;
    }
    if (c >= 'a' && c <= 'f') {
        return c - 'a' + 10;
    }
    throw new IllegalArgumentException("Invalid hex character '" + c + "'");
}
/**
 * Dumps the received bytes as hex and sends a fixed response frame.
 *
 * <p>The response goes to the channel registered under {@code "10.1.5.197"} when one exists;
 * otherwise it is sent back on the channel that delivered the message, so a missing registry
 * entry no longer causes a {@code NullPointerException}.
 *
 * @param arg0 context of the channel the message arrived on
 * @param arg1 inbound message; handled here only when it is a {@link ByteBuf}
 */
@Override
public void channelRead(ChannelHandlerContext arg0, Object arg1)
throws Exception {
    if (!(arg1 instanceof ByteBuf)) {
        // Not something this handler can decode; pass it along the pipeline.
        arg0.fireChannelRead(arg1);
        return;
    }

    ByteBuf buf = (ByteBuf) arg1;
    try {
        byte[] req = new byte[buf.readableBytes()];
        buf.readBytes(req);
        System.out.println("channelRead:" + Arrays.toString(bytesToHexStrings(req)));
    } finally {
        // This override replaces SimpleChannelInboundHandler.channelRead, so the
        // message is not released automatically; release it here to avoid a leak.
        buf.release();
    }

    byte[] responseByteArray = hexStringToByte("6832003200684B15120200010C6100000200E416");

    Channel target = NettyChannelMap.get("10.1.5.197");
    if (target == null) {
        target = arg0.channel();
    }

    // No encoder is installed in the pipeline built by NettyTCPServer, so the payload
    // must be written as a ByteBuf rather than a raw byte[].
    ByteBuf response = target.alloc().buffer(responseByteArray.length)
            .writeBytes(responseByteArray);
    target.writeAndFlush(response);
    System.out.println("send OK");
}

/**
 * Empty override: nothing is done in this callback.
 */
@Override
public void channelReadComplete(ChannelHandlerContext arg0)
throws Exception {
// No work is performed here (left over from an auto-generated stub).

}

/**
 * Empty override: nothing is done in this callback.
 */
@Override
public void channelRegistered(ChannelHandlerContext arg0) throws Exception {
// No work is performed here (left over from an auto-generated stub).

}

/**
 * Logs the event and drops this connection's channel from {@code NettyChannelMap}.
 */
@Override
public void channelUnregistered(ChannelHandlerContext arg0)
throws Exception {
    System.out.println("channelUnregistered");

    // Remove whatever registry entries point at this channel.
    final Channel departing = arg0.channel();
    NettyChannelMap.remove(departing);
}

/**
 * Empty override: nothing is done in this callback.
 */
@Override
public void channelWritabilityChanged(ChannelHandlerContext arg0)
throws Exception {
// No work is performed here (left over from an auto-generated stub).

}

/**
 * Reports an exception raised while processing this channel and closes the connection.
 *
 * <p>Previously the body was empty, so every failure was silently discarded and the
 * connection was left open.
 *
 * @param arg0 context of the channel on which the failure occurred
 * @param arg1 the exception that was raised
 */
@Override
public void exceptionCaught(ChannelHandlerContext arg0, Throwable arg1)
throws Exception {
    System.out.println("exceptionCaught: " + arg1);
    arg1.printStackTrace();

    // The connection state is unknown after a failure; close it.
    arg0.close();
}

/**
 * Reacts to {@link IdleStateEvent}s by disconnecting the channel; any other event is ignored.
 *
 * <p>On a reader-idle event the channel is first removed from {@code NettyChannelMap} and the
 * remaining registry entries are printed. Writer-idle and all-idle events only log and
 * disconnect.
 */
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
throws Exception {
    if (!(evt instanceof IdleStateEvent)) {
        return;
    }

    final IdleStateEvent idleEvent = (IdleStateEvent) evt;

    if (idleEvent.state() == IdleState.READER_IDLE) {
        // Read timeout.
        System.out.println("READER_IDLE 读超时");
        NettyChannelMap.remove(ctx.channel());
        System.out.println(ctx.channel()+"will be removed from map");
        System.out.println("print now map");

        for (Map.Entry<String, Channel> slot : NettyChannelMap.getAllMap().entrySet()) {
            String line = "Key = " + slot.getKey() + ", Value = " + slot.getValue();
            System.out.println(line);
        }

        ctx.disconnect();
        return;
    }

    if (idleEvent.state() == IdleState.WRITER_IDLE) {
        // Write timeout.
        System.out.println("WRITER_IDLE 写超时");
        ctx.disconnect();
        return;
    }

    if (idleEvent.state() == IdleState.ALL_IDLE) {
        // Combined read/write timeout.
        System.out.println("ALL_IDLE 总超时");
        ctx.disconnect();
    }
}

/**
 * Prints the received message.
 *
 * <p>The message is formatted through string concatenation instead of being cast to
 * {@code String}, so non-String messages no longer raise a {@code ClassCastException}.
 * Output for String messages is unchanged.
 *
 * @param ctx context of the channel the message arrived on (unused)
 * @param msg the inbound message
 */
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg)
throws Exception {
    System.out.println("channelRead0: " + msg);
}


NettyChannelMap.java

import io.netty.channel.Channel;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
 * Process-wide registry that maps a client identifier to its Netty {@link Channel}.
 *
 * <p>Backed by a {@link ConcurrentHashMap}. Keys and values must be non-null, because
 * {@code ConcurrentHashMap} rejects nulls with a {@code NullPointerException}.
 *
 * @author yao.b
 */
public class NettyChannelMap {
    /** Live registry; declared with the concrete type so the atomic remove(key, value) is available. */
    private static final ConcurrentHashMap<String, Channel> map =
            new ConcurrentHashMap<String, Channel>();

    /**
     * Registers {@code socketChannel} under {@code clientId}, replacing any previous entry.
     *
     * @param clientId      registry key; must not be {@code null}
     * @param socketChannel channel to store; must not be {@code null}
     */
    public static void add(String clientId, Channel socketChannel) {
        map.put(clientId, socketChannel);
    }

    /**
     * Looks up the channel registered under {@code clientId}.
     *
     * @param clientId registry key; must not be {@code null}
     * @return the registered channel, or {@code null} when there is no entry for the key
     */
    public static Channel get(String clientId) {
        return map.get(clientId);
    }

    /**
     * Returns the live backing map (not a copy); changes made through it affect the registry.
     */
    public static Map<String, Channel> getAllMap() {
        return map;
    }

    /**
     * Removes every entry whose value is the given channel instance.
     *
     * <p>Removal uses the atomic {@code remove(key, value)} so that an entry which was
     * concurrently replaced by a different channel under the same key is left untouched.
     *
     * @param channel channel to unregister; {@code null} is ignored
     */
    public static void remove(Channel channel) {
        if (channel == null) {
            return;
        }
        for (Map.Entry<String, Channel> entry : map.entrySet()) {
            if (entry.getValue() == channel) {
                map.remove(entry.getKey(), channel);
            }
        }
    }
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: