您的位置:首页 > 其它

基于Netty UDP实现的简单心跳机制

2015-09-26 00:00 721 查看
摘要: 基于Netty UDP实现的简单心跳机制package com.simple.netty;import java.util.Hashtable;import java.util.Iterator;import java.util.Set;import io.netty.bootstrap.Bootstrap;import io.netty.channel.Channel;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInitializer;import io.netty.channel.EventLoopGroup;import io.netty.channel.SimpleChannelInboundHandler;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.DatagramPacket;import io.netty.channel.socket.nio.NioDatagramChannel;import io.netty.handler.codec.string.StringDecoder;import io.netty.handler.codec.string.StringEncoder;import io.netty.handler.logging.LogLevel;import io.netty.handler.logging.LoggingHandler;public class UDPServer {public static void main(String[] args) {Bootstrap strap = new Bootstrap();EventLoopGroup workerGroup = new NioEventLoopGroup();strap.channel(NioDatagramChannel.class).group(workerGroup).handler(new ChannelInitializer<Channel>() {@Overrideprotected void initChannel(Channel channel)throws Exception {channel.pipeline().addLast(new LoggingHandler(LogLevel.INFO),new StringEncoder(), new StringDecoder());channel.pipeline().addLast(new MyHeartbeatChannelHandler());}});try {ChannelFuture future = strap.bind(7777).sync();future.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();} finally {workerGroup.shutdownGracefully();}}}class MyHeartbeatChannelHandler extendsSimpleChannelInboundHandler<DatagramPacket> implements Runnable {private Hashtable<String, Long> observe = new Hashtable<String, Long>();public MyHeartbeatChannelHandler() {new Thread(this).start();}@Overrideprotected void channelRead0(ChannelHandlerContext paramChannelHandlerContext,DatagramPacket paramI) throws Exception {}private int flag = 0;@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg)throws Exception {synchronized (Object.class) {if (msg instanceof DatagramPacket) {observe.put(((DatagramPacket) msg).sender().toString(),System.currentTimeMillis());}System.out.println(++flag);}}@Overridepublic boolean acceptInboundMessage(Object msg) throws Exception {return super.acceptInboundMessage(msg);}@Overridepublic void run() {while (true) {synchronized (Object.class) {Set<String> keys = observe.keySet();Iterator<String> mIterator = keys.iterator();while (mIterator.hasNext()) {String k = mIterator.next();long l = observe.get(k);if (l > 0 && System.currentTimeMillis() - l > 3 * 1000) {mIterator.remove();System.out.println(k + "掉线");}}}}}}package com.simple.netty;import java.util.Random;import io.netty.bootstrap.Bootstrap;import io.netty.channel.Channel;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInitializer;import io.netty.channel.SimpleChannelInboundHandler;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.DatagramPacket;import io.netty.channel.socket.nio.NioDatagramChannel;import io.netty.handler.codec.string.StringDecoder;import io.netty.handler.codec.string.StringEncoder;import io.netty.handler.logging.LogLevel;import io.netty.handler.logging.LoggingHandler;import io.netty.util.concurrent.Future;import io.netty.util.concurrent.GenericFutureListener;public class UDPClient {public static void main(String[] args) {for (int i = 0; i < 1000; i++) {new Thread() {public void run() {Bootstrap strap = new Bootstrap();strap.channel(NioDatagramChannel.class).group(new NioEventLoopGroup()).handler(new ChannelInitializer<Channel>() {@Overrideprotected void initChannel(Channel channel)throws Exception {channel.pipeline().addLast(new LoggingHandler(LogLevel.INFO),new StringEncoder(),new StringDecoder());channel.pipeline().addLast("heartbeat",new HeartbeatChannelHandler());}});try {ChannelFuture future = strap.connect("127.0.0.1", 7777).sync();int number = 3 + new Random().nextInt(5);while (true) {future.channel().writeAndFlush("88888888").addListener(new GenericFutureListener<Future<? super Void>>() {@Overridepublic void operationComplete(Future<? super Void> paramF)throws Exception {System.out.println(paramF.isSuccess());}});Thread.sleep(1000);--number;if (number == 0)break;}} catch (InterruptedException e) {e.printStackTrace();}};}.start();}}}class HeartbeatChannelHandler extendsSimpleChannelInboundHandler<DatagramPacket> {@Overrideprotected void channelRead0(ChannelHandlerContext paramChannelHandlerContext,DatagramPacket paramI) throws Exception {}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg)throws Exception {ctx.writeAndFlush("8888888888888");}}

                                            
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  Netty 心跳