您的位置:首页 > 其它

Netty心跳功能的实现

2015-07-11 00:00 309 查看
摘要: Netty心跳功能

/**
 * Demo client: connects to localhost:8000, installs string codecs plus
 * read-timeout / idle-state detection, and blocks until the channel closes.
 */
public class NettyClient {

    /**
     * Starts the client, waits for the connection to close (or for the connect
     * attempt to fail), then releases every resource so the JVM can exit.
     *
     * @param args unused
     */
    public static void main(String[] args) {

        ClientBootstrap bootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(
                Executors.newCachedThreadPool(),
                Executors.newCachedThreadPool()));

        // A single timer is shared by both timeout handlers. It owns its own worker
        // thread, which is NOT released by bootstrap.releaseExternalResources(), so it
        // has to be stopped explicitly at shutdown (see the finally block below).
        Timer trigger = new HashedWheelTimer();
        // Raises a ReadTimeoutException when nothing has been read for 60 s.
        final ChannelHandler timeOutHandler = new ReadTimeoutHandler(trigger, 60);
        // Reader idle after 60 s, writer idle after 5 s, all-idle detection disabled (0).
        final ChannelHandler idleStateHandler = new IdleStateHandler(trigger, 60, 5, 0);

        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            public ChannelPipeline getPipeline() throws Exception {
                ChannelPipeline pipeline = Channels.pipeline();
                pipeline.addLast("decoder", new StringDecoder());
                pipeline.addLast("encoder", new StringEncoder());
                pipeline.addLast("timeOutHandler", timeOutHandler);
                pipeline.addLast("idleStateHandler", idleStateHandler);
                pipeline.addLast("SocketLinkState", new SocketLinkState());
                pipeline.addLast("handler", new ClientHandler());
                return pipeline;
            }
        });

        try {
            ChannelFuture future = bootstrap.connect(new InetSocketAddress("localhost", 8000));

            // Wait for the connect attempt itself so a failure can be reported
            // instead of main() silently falling through to shutdown.
            future.awaitUninterruptibly();
            if (!future.isSuccess()) {
                System.err.println("Connect to localhost:8000 failed: " + future.getCause());
            }

            // Wait until the connection is closed or the connection attempt fails.
            future.getChannel().getCloseFuture().awaitUninterruptibly();
        } finally {
            // Shut down thread pools to exit.
            bootstrap.releaseExternalResources();
            // Stop the timer thread as well; otherwise it keeps the JVM alive.
            trigger.stop();
        }
    }
}

SocketLinkState.java的实现
/**
 * Pipeline handler that watches the health of the client connection: it closes
 * the channel on read timeouts and I/O errors, and sends a heartbeat whenever
 * the channel has been write-idle.
 */
public class SocketLinkState extends IdleStateAwareChannelHandler {

    /**
     * Prints the stack trace of the failure, then closes the channel if the cause
     * is a {@code ReadTimeoutException} or an {@code IOException}. Any other
     * failure is passed to the superclass implementation.
     *
     * @param ctx handler context whose channel is closed on a link failure
     * @param e   event carrying the cause
     * @throws Exception if the superclass implementation throws
     */
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
        Throwable cause = e.getCause();
        cause.printStackTrace();

        boolean linkFailure = cause instanceof ReadTimeoutException || cause instanceof IOException;
        if (!linkFailure) {
            super.exceptionCaught(ctx, e);
            return;
        }
        ctx.getChannel().close();
    }

    /**
     * Handles idle notifications after delegating to the superclass: a write-idle
     * event triggers a heartbeat, a read-idle event is only reported on stdout.
     *
     * @param ctx handler context
     * @param e   idle event describing which direction went idle
     * @throws Exception if the superclass implementation throws
     */
    public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e) throws Exception {
        super.channelIdle(ctx, e);

        switch (e.getState()) {
            case WRITER_IDLE:
                // Nothing has been written for the configured interval: send a heartbeat.
                SocketHeartBeat.sendHeartBeat(e.getChannel());
                System.out.println("写超时");
                break;
            case READER_IDLE:
                // Nothing has been read for the configured interval: report only,
                // the connection is intentionally left open here.
                System.out.println("读取超时");
                break;
            default:
                // Any other idle state is ignored.
                break;
        }
    }
}
SocketHeartBeat.sendHeartBeat(channel) 的实现
/**
 * Helper that writes the heartbeat message onto a channel.
 */
public class SocketHeartBeat extends SimpleChannelHandler {

    /** Text sent as the heartbeat; the server side compares incoming text against "abc". */
    private static final String HEARTBEAT_PAYLOAD = "abc";

    /**
     * Writes the heartbeat payload to the given channel.
     *
     * @param channel channel to write the heartbeat to
     */
    public static void sendHeartBeat(Channel channel) {
        channel.write(HEARTBEAT_PAYLOAD);
    }
}

ClientHandler.java的实现
/**
 * Client-side business handler: echoes received text to stdout and answers
 * each message with a numbered reply after a deliberate delay.
 */
public class ClientHandler extends SimpleChannelHandler {

    /** Number of string messages received so far; appended to each reply. */
    int i = 0;

    /**
     * Prints a received string, waits 7 s, then writes a numbered reply.
     * Non-string messages are skipped. The event is always handed on to the
     * superclass afterwards.
     *
     * @param ctx handler context
     * @param e   event carrying the received message
     * @throws Exception if interrupted while sleeping or if the superclass throws
     */
    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        Object payload = e.getMessage();
        if (payload instanceof String) {
            String text = (String) payload;
            System.out.println(text);
            i++;
            // Deliberate demo delay: 7 s exceeds the 5 s writer-idle threshold,
            // so a heartbeat is sent to the server before this reply.
            Thread.sleep(7000);
            e.getChannel().write("xxxxxxx" + i);
        }

        super.messageReceived(ctx, e);
    }

    /**
     * Announces the established connection on stdout, delegates to the
     * superclass, then sends the initial greeting to the server.
     *
     * @param ctx handler context
     * @param e   connection state event
     * @throws Exception if the superclass implementation throws
     */
    @Override
    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        System.out.println("已经与Server建立连接。。。。");
        System.out.println("\n请输入要发送的信息:");

        super.channelConnected(ctx, e);

        // The greeting is written only after the superclass has processed the event.
        e.getChannel().write("server,我们连接了!!!");
    }
}

NettyServer.java的实现
/**
 * Demo server: listens on port 8000 and handles string messages with
 * {@code ServerHandler}.
 */
public class NettyServer {

    /**
     * Builds the server bootstrap, binds port 8000 and prints the bound address.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        NioServerSocketChannelFactory channelFactory = new NioServerSocketChannelFactory(
                Executors.newCachedThreadPool(),
                Executors.newCachedThreadPool());
        ServerBootstrap bootstrap = new ServerBootstrap(channelFactory);

        // Every accepted connection gets its own decoder/encoder/handler instances.
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            @Override
            public ChannelPipeline getPipeline() throws Exception {
                ChannelPipeline pipeline =
                        Channels.pipeline(new StringDecoder(), new StringEncoder(), new ServerHandler());
                return pipeline;
            }
        });

        InetSocketAddress listenAddress = new InetSocketAddress(8000);
        Channel serverChannel = bootstrap.bind(listenAddress);
        System.out.println("Server已经启动,监听端口: " + serverChannel.getLocalAddress() + ", 等待客户端注册。。。");
    }
}

ServerHandler.java的实现
/**
 * Server-side business handler: logs every received string, recognises the
 * heartbeat text, and acknowledges all other messages.
 */
public class ServerHandler extends SimpleChannelHandler {

    /**
     * Logs a received string. The heartbeat text ("abc", compared ignoring case)
     * is only reported on stdout; every other string is acknowledged back to the
     * client. The event is always handed on to the superclass afterwards.
     *
     * @param ctx handler context
     * @param e   event carrying the received message
     * @throws Exception if the superclass implementation throws
     */
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        Object payload = e.getMessage();
        if (payload instanceof String) {
            String message = (String) payload;
            System.out.println("Client发来:" + message);

            if (!"abc".equalsIgnoreCase(message)) {
                e.getChannel().write("Server已收到刚发送的:" + message);
            } else {
                // Heartbeats are not answered.
                System.out.println("心跳。。。。。。。");
            }
        }

        super.messageReceived(ctx, e);
    }

    /**
     * Delegates exception handling unchanged to the superclass.
     *
     * @param ctx handler context
     * @param e   event carrying the cause
     * @throws Exception if the superclass implementation throws
     */
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
        super.exceptionCaught(ctx, e);
    }

    /**
     * Prints the remote and local addresses of a newly connected client, then
     * delegates to the superclass.
     *
     * @param ctx handler context
     * @param e   connection state event
     * @throws Exception if the superclass implementation throws
     */
    @Override
    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        System.out.println("有一个客户端注册上来了。。。");
        System.out.println("Client:" + e.getChannel().getRemoteAddress());
        System.out.println("Server:" + e.getChannel().getLocalAddress());
        System.out.println("\n等待客户端输入。。。");

        super.channelConnected(ctx, e);
    }
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: