您的位置:首页 > 其它

Netty 快速入门系列 - Chapter 3 Netty5.x【第八讲】 - Client 重连

2018-03-25 18:58 579 查看
如何实现Netty Client重连?
Netty5Client:如果建立连接失败,调用 notifyReconnection 触发重连:它通过 condition.signal() 唤醒在 run 方法中等待的重连线程,由该线程调用 connect 重新建立连接。notifyReconnection 使用 CAS 避免重复 Notify。重连成功后,重连线程再次通过 condition.await() 进入等待。
package com.john.netty.learn.ch05;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.UnsupportedEncodingException;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.local.LocalEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

/**
 * Netty client that connects to {@code ip:port} and re-establishes the connection from a
 * dedicated background thread whenever it is reported lost.
 *
 * <p>How the pieces cooperate:
 * <ul>
 *   <li>{@link #start()} launches the reconnect thread (this object's {@link #run()}) and
 *       performs the first connection attempt.</li>
 *   <li>{@link #notifyReconnection()} is called when the connection is found to be down (by
 *       {@code start()}, by {@code send()}, or by the channel handler). It flips
 *       {@link #reconnectionScanLaunch} from {@code false} to {@code true} with a CAS so that
 *       only the first caller signals the reconnect thread.</li>
 *   <li>The reconnect thread waits on {@code condition} until the flag is {@code true},
 *       retries the connection until it succeeds, then resets the flag and waits again.</li>
 * </ul>
 */
public class Netty5Client implements Runnable {

    /** Pause between two consecutive reconnect attempts, in milliseconds. */
    private static final long RECONNECT_DELAY_MILLIS = 1000L;

    private final String ip;

    private final int port;

    private final Bootstrap bootstrap;

    /** {@code true} while a reconnection has been requested and has not finished yet. */
    public final AtomicBoolean reconnectionScanLaunch;

    private final ReentrantLock lock = new ReentrantLock();

    /** Signalled by {@link #notifyReconnection()} to wake the reconnect thread. */
    private final Condition condition = lock.newCondition();

    /**
     * Future of the most recent connect attempt; {@code null} until a connect has been issued
     * successfully. Volatile because it is written by the reconnect thread and read by the
     * thread calling {@code send()}.
     */
    private volatile ChannelFuture channelFuture;

    private final EventLoopGroup workers;

    /** Thread running {@link #run()}; kept so {@link #shutdown()} can interrupt it. */
    private volatile Thread reconnectThread;

    /**
     * Creates a client for the given endpoint. No connection is attempted until
     * {@link #start()} is called.
     *
     * @param ip   host name or address of the server
     * @param port TCP port of the server
     */
    public Netty5Client(String ip, int port) {
        this.ip = ip;
        this.port = port;
        this.bootstrap = new Bootstrap();
        this.workers = new NioEventLoopGroup();
        this.reconnectionScanLaunch = new AtomicBoolean(false);
    }

    /**
     * Starts the reconnect thread, configures the bootstrap and performs the first connection
     * attempt. A failed attempt is not propagated: the stack trace is printed and a
     * reconnection is requested instead.
     *
     * @throws Exception declared for backward compatibility; failures of the initial connect
     *                   are handled internally
     */
    public void start() throws Exception {
        Thread thread = new Thread(this);
        reconnectThread = thread;
        thread.start();

        try {
            bootstrap.group(workers);
            bootstrap.channel(NioSocketChannel.class);

            // Pipeline factory: String codec plus the handler that reports lost connections.
            bootstrap.handler(new ChannelInitializer<Channel>() {

                @Override
                protected void initChannel(Channel ch) throws Exception {
                    ch.pipeline().addLast(new StringDecoder());
                    ch.pipeline().addLast(new StringEncoder());
                    ch.pipeline().addLast(new ClientHandler(Netty5Client.this));
                }
            });

            channelFuture = bootstrap.connect(this.ip, this.port).sync();
        } catch (Throwable e) {
            if (e instanceof InterruptedException) {
                // Do not lose the interrupt request of the calling thread.
                Thread.currentThread().interrupt();
            }
            e.printStackTrace();
            notifyReconnection();
        }
    }

    /**
     * Shuts down the event loop group and interrupts the reconnect thread so that it leaves
     * its wait/retry loop and terminates instead of keeping the JVM alive.
     */
    public void shutdown() {
        workers.shutdownGracefully();

        Thread thread = reconnectThread;
        if (thread != null) {
            thread.interrupt();
        }
    }

    /**
     * Blocks the reconnect thread until a reconnection has been requested.
     *
     * @throws InterruptedException if the thread is interrupted while acquiring the lock or
     *                              while waiting
     */
    private void waitForReconnectionStart() throws InterruptedException {
        // Acquire outside the try block: if the acquire is interrupted the lock is not held
        // and unlock() in finally would throw IllegalMonitorStateException.
        lock.lockInterruptibly();
        try {
            while (!reconnectionScanLaunch.get()) {
                System.out.println("wait for netty bootstrap reconnenct");
                this.condition.await();
            }
            System.out.println("await for netty bootstrap reconnenct");
        } finally {
            lock.unlock();
        }
    }

    /**
     * Body of the reconnect thread: wait for a request, reconnect, clear the request flag,
     * repeat. Terminates (after calling {@link #shutdown()}) when interrupted.
     */
    @Override
    public void run() {
        while (true) {
            try {
                waitForReconnectionStart();
                System.out.println("start to try to reconnenct ");
                reconnect();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                shutdown();
                return;
            } finally {
                reconnectionScanLaunch.compareAndSet(true, false);
            }
        }
    }

    /**
     * Retries the connection until it succeeds, pausing {@link #RECONNECT_DELAY_MILLIS}
     * between attempts. Implemented as a loop so a long outage cannot exhaust the stack.
     *
     * @throws InterruptedException if the thread is interrupted while connecting or sleeping
     */
    private void reconnect() throws InterruptedException {
        while (true) {
            try {
                ChannelFuture attempt = bootstrap.connect(this.ip, this.port);
                channelFuture = attempt;
                attempt.sync();
                System.out.println("Success, reconnect... ");
                return;
            } catch (InterruptedException e) {
                // Interruption means "stop", not "connection failed": let run() handle it.
                throw e;
            } catch (Exception e) {
                System.out.println("Failed, Reconnection again after 1 sec");
            }
            Thread.sleep(RECONNECT_DELAY_MILLIS);
        }
    }

    /**
     * Requests a reconnection. Only the caller that switches
     * {@link #reconnectionScanLaunch} from {@code false} to {@code true} signals the
     * reconnect thread; concurrent or repeated calls while a reconnection is pending return
     * without signalling.
     */
    public void notifyReconnection() {
        System.out.println("reconnectionScanLaunch == " + reconnectionScanLaunch);

        if (!reconnectionScanLaunch.compareAndSet(false, true)) {
            return;
        }

        System.out.println("notify Reconnection");

        // Uninterruptible acquire: once the flag is set the signal must be delivered,
        // otherwise the reconnect thread would keep waiting with the flag stuck at true.
        lock.lock();
        try {
            System.out.println("Signal for Netty Bootstrap Reconnenct");
            this.condition.signal();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Reads lines from standard input (decoded as GBK) and sends each one three times.
     * Returns after the line {@code quit} (case-insensitive) has been sent, or when standard
     * input reaches end of stream.
     *
     * @throws Exception if reading from standard input fails
     */
    public void console() throws Exception {
        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in, "GBK"));

        while (true) {
            System.out.println("请输入:");

            String line = bufferedReader.readLine();
            if (line == null) {
                // End of input: without this check the loop would spin forever on null.
                break;
            }

            // Sent three times back-to-back on purpose to demonstrate message coalescing.
            this.send(line);
            this.send(line);
            this.send(line);

            if ("quit".equalsIgnoreCase(line)) {
                break;
            }
        }
    }

    /**
     * Writes {@code message} to the current channel if it is active. While a reconnection is
     * pending the message is dropped; if the channel is missing or inactive a reconnection is
     * requested.
     *
     * @param message text to send
     */
    private void send(String message) {
        try {
            System.out.println("send...");

            if (reconnectionScanLaunch.get()) {
                System.out.println("send failed...");
                return;
            }

            // Read the volatile field once; it is null if no connect has succeeded yet.
            ChannelFuture future = channelFuture;
            Channel channel = (future == null) ? null : future.channel();

            if (channel != null && channel.isActive()) {
                channel.writeAndFlush(message);
                return;
            }

            this.notifyReconnection();
        } catch (Exception e) {
            e.printStackTrace();
            this.notifyReconnection();
        }
    }

    /**
     * Demo entry point: connects to 127.0.0.1:23, forwards console input until {@code quit}
     * or end of input, then shuts the client down.
     */
    public static void main(String[] args) throws Exception {
        Netty5Client netty5Client = new Netty5Client("127.0.0.1", 23);

        netty5Client.start();

        netty5Client.console();

        netty5Client.shutdown();
    }

}



ClientHandler:如果与 Server 的连接被关闭,channelInactive 方法会被触发,并调用 netty5Client.notifyReconnection() 启动重连。
package com.john.netty.learn.ch05;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

/**
 * Inbound handler of the client pipeline. Logs channel events and asks the owning
 * {@link Netty5Client} to reconnect when the channel becomes inactive.
 */
public class ClientHandler extends SimpleChannelInboundHandler<String> {

    /** Client that is notified when the channel goes inactive. */
    private final Netty5Client owner;

    /**
     * @param netty5Client client whose {@code notifyReconnection()} is invoked on channel loss
     */
    public ClientHandler(Netty5Client netty5Client) {
        owner = netty5Client;
    }

    /** Prints every decoded message received on the channel. */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        String received = "Client Read message " + msg;
        System.out.println(received);
    }

    /** Logs that the channel became active. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        String event = "channelActive(ChannelHandlerContext " + ctx + ")";
        System.out.println(event);
    }

    /** Logs that the channel became inactive, then requests a reconnection. */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        String event = "channelInactive(ChannelHandlerContext " + ctx + ")";
        System.out.println(event);
        owner.notifyReconnection();
    }

    /** Prints the stack trace of any exception raised in the pipeline. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
    }

}

Netty5Server 和 ServerHandler Code 如下,但不是重点
package com.john.netty.learn.ch05;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

public class Netty5Server {

private int port;

public Netty5Server(int port) {

this.port = port;

}

public void start() throws InterruptedException {

ServerBootstrap serverBootstrap = new ServerBootstrap();

EventLoopGroup boss = new NioEventLoopGroup();

EventLoopGroup workers = new NioEventLoopGroup();

try {

// 设置线程池
serverBootstrap.group(boss, workers);

// 设置socket工厂
serverBootstrap.channel(NioServerSocketChannel.class);

// 设置管道工厂
serverBootstrap.childHandler(new ChannelInitializer<Channel>() {

@Override
protected void initChannel(Channel ch) throws Exception {

ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new StringEncoder());
ch.pipeline().addLast(new ServerHandler());
}
});

serverBootstrap.option(ChannelOption.SO_BACKLOG, 2048);// serverSocketchannel的设置,链接缓冲池的大小 (未完成Accept操作,等待Socket Accept)
serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);// socketchannel的设置,维持链接的活跃,清除死链接
serverBootstrap.childOption(ChannelOption.TCP_NODELAY, true);// socketchannel的设置,关闭延迟发送

// 绑定端口
ChannelFuture channelFuture = serverBootstrap.bind(port);

System.out.println("start server " + port);

// 等待服务端关闭
channelFuture.channel().closeFuture().sync();

} finally {

boss.shutdownGracefully
4000
();

workers.shutdownGracefully();
}
}

public static void main(String[] args) throws InterruptedException {

new Netty5Server(23).start();
}

}

package com.john.netty.learn.ch05;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

/**
 * Inbound handler of the server pipeline: logs channel events and answers every received
 * message with {@code "Hi "} followed by that message.
 */
public class ServerHandler extends SimpleChannelInboundHandler<String> {

    /** Logs the received message and writes the greeting reply back to the sender. */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        String logLine = "Read message " + msg;
        System.out.println(logLine);

        String reply = "Hi " + msg;
        ctx.writeAndFlush(reply);
    }

    /** Logs that a client channel became active. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        String event = "channelActive(ChannelHandlerContext " + ctx + ")";
        System.out.println(event);
    }

    /** Logs that a client channel became inactive. */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        String event = "channelInactive(ChannelHandlerContext " + ctx + ")";
        System.out.println(event);
    }

    /** Prints the stack trace of any exception raised in the pipeline. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
    }

}


如果先启动 Client,日志如下:Client 会开启 Reconnection 检查,但在重连成功之前无法执行 Write 操作。



当 Server 启动后,Client 成功 Reconnect,同时重连检查线程重新进入 wait 状态。



当输入 I want to learn more skill of IT. 后,Client 连续发送 3 次,Server 端出现 I want to learn more skill of IT.I want to learn more skill of IT.I want to learn more skill of IT. 的粘包现象。这是由于没有定义稳定的数据协议结构(消息边界)导致的,以后的章节将介绍如何避免。



所有源码下载 :https://download.csdn.net/download/netcobol/10308871
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  Netty
相关文章推荐