Netty 快速入门系列 - Chapter 3 Netty5.x【第八讲】 - Client 重连
2018-03-25 18:58
579 查看
如何实现Netty Client重连?
Netty5Client:如果建立连接失败,调用 notifyReconnection 触发重连:它通过 signal 唤醒执行 run 方法的线程,由该线程反复 connect 直到成功。使用 CAS 避免重复 Notify。连接成功后,该线程回到 condition.await() 进入等待。
package com.john.netty.learn.ch05;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.UnsupportedEncodingException;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.local.LocalEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
/**
 * Netty client that keeps one connection to {@code ip:port} and re-establishes it when lost.
 *
 * <p>A dedicated thread executes {@link #run()}: it waits on {@code condition} until
 * {@link #notifyReconnection()} raises {@code reconnectionScanLaunch}, then retries
 * {@code connect} until it succeeds and goes back to waiting. The flag is flipped with
 * compare-and-set so that concurrent notifications start only one reconnect cycle; while
 * the flag is raised, {@code send} refuses to write.
 */
public class Netty5Client implements Runnable {
    /** Pause between two failed connection attempts (the log message promises one second). */
    private static final long RECONNECT_DELAY_MILLIS = 1000L;

    private String ip;
    private int port;
    private Bootstrap bootstrap;
    /** {@code true} while a reconnect cycle is pending or in progress. */
    public AtomicBoolean reconnectionScanLaunch;
    private ReentrantLock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();
    // Written by the reconnect thread, read by the thread calling send(): must be volatile.
    private volatile ChannelFuture channelFuture;
    private EventLoopGroup workers;
    // Thread running run(); kept so that shutdown() can stop it.
    private volatile Thread reconnectThread;

    /**
     * Creates a client for the given endpoint. Nothing is connected until {@link #start()}.
     *
     * @param ip   host name or address of the server
     * @param port TCP port of the server
     */
    public Netty5Client(String ip, int port) {
        this.ip = ip;
        this.port = port;
        bootstrap = new Bootstrap();
        workers = new NioEventLoopGroup();
        reconnectionScanLaunch = new AtomicBoolean(false);
    }

    /**
     * Configures the bootstrap, starts the reconnect thread and performs the first connection
     * attempt. A failed attempt is not fatal: it is printed and handed to the reconnect thread.
     *
     * @throws InterruptedException if the calling thread is interrupted while connecting
     * @throws Exception            if the bootstrap cannot be configured
     */
    public void start() throws Exception {
        // Configure the bootstrap before the reconnect thread can possibly use it.
        bootstrap.group(workers);
        bootstrap.channel(NioSocketChannel.class);
        // Pipeline factory: string codec plus the handler that reports lost connections.
        bootstrap.handler(new ChannelInitializer<Channel>() {
            @Override
            protected void initChannel(Channel ch) throws Exception {
                ch.pipeline().addLast(new StringDecoder());
                ch.pipeline().addLast(new StringEncoder());
                ch.pipeline().addLast(new ClientHandler(Netty5Client.this));
            }
        });

        Thread scanner = new Thread(this, "netty5-client-reconnect");
        reconnectThread = scanner;
        scanner.start();

        try {
            channelFuture = bootstrap.connect(this.ip, this.port).sync();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw e;
        } catch (Exception e) {
            e.printStackTrace();
            notifyReconnection();
        }
    }

    /** Stops the reconnect thread (if any) and shuts the event loop group down. */
    public void shutdown() {
        Thread scanner = reconnectThread;
        if (scanner != null) {
            // Without this the reconnect thread would wait forever and keep the JVM alive.
            scanner.interrupt();
        }
        workers.shutdownGracefully();
    }

    /** Blocks until a reconnect has been requested through {@link #notifyReconnection()}. */
    private void waitForReconnectionStart() throws InterruptedException {
        // Acquire outside the try block: if acquisition is interrupted there is nothing to unlock.
        lock.lockInterruptibly();
        try {
            while (!reconnectionScanLaunch.get()) {
                System.out.println("wait for netty bootstrap reconnenct");
                this.condition.await();
            }
            System.out.println("await for netty bootstrap reconnenct");
        } finally {
            lock.unlock();
        }
    }

    /** Reconnect loop: wait for a request, reconnect, clear the flag; ends when interrupted. */
    @Override
    public void run() {
        while (true) {
            try {
                waitForReconnectionStart();
                System.out.println("start to try to reconnenct ");
                reconnect();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                shutdown();
                return;
            } finally {
                reconnectionScanLaunch.compareAndSet(true, false);
            }
        }
    }

    /**
     * Retries {@code connect} until it succeeds, pausing between attempts.
     *
     * @throws InterruptedException if the thread is interrupted while connecting or pausing
     */
    private void reconnect() throws InterruptedException {
        // Iterative on purpose: retrying recursively overflows the stack during a long outage.
        while (true) {
            try {
                ChannelFuture attempt = bootstrap.connect(this.ip, this.port);
                attempt.sync();
                // Publish only a future whose connection attempt succeeded.
                channelFuture = attempt;
                System.out.println("Success, reconnect... ");
                return;
            } catch (InterruptedException e) {
                // An interrupt is a request to stop, not a failed attempt.
                throw e;
            } catch (Exception e) {
                System.out.println("Failed, Reconnection again after 1 sec");
                Thread.sleep(RECONNECT_DELAY_MILLIS);
            }
        }
    }

    /**
     * Requests a reconnect. Only the caller that wins the compare-and-set signals the
     * reconnect thread; calls made while a cycle is already pending do nothing.
     */
    public void notifyReconnection() {
        System.out.println("reconnectionScanLaunch == " + reconnectionScanLaunch);
        if (!reconnectionScanLaunch.compareAndSet(false, true)) {
            return;
        }
        System.out.println("notify Reconnection");
        // Uninterruptible acquisition: the flag is already raised, so the signal must not be
        // skipped. The lock is only ever held for a flag check or a signal.
        lock.lock();
        try {
            System.out.println("Signal for Netty Bootstrap Reconnenct");
            this.condition.signal();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Reads lines from standard input (decoded as GBK) and sends each one three times, until
     * "quit" is entered or the input ends.
     *
     * @throws Exception if reading from standard input fails
     */
    public void console() throws Exception {
        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in, "GBK"));
        while (true) {
            System.out.println("请输入:");
            String line = bufferedReader.readLine();
            if (line == null) {
                // End of input: stop instead of spinning and writing null to the channel.
                break;
            }
            // Three consecutive writes of the same line (the article uses this to show
            // how messages run together without a framing protocol).
            this.send(line);
            this.send(line);
            this.send(line);
            if ("quit".equalsIgnoreCase(line)) {
                break;
            }
        }
    }

    /** Writes {@code message} if the channel is active; otherwise requests a reconnect. */
    private void send(String message) {
        try {
            System.out.println("send...");
            if (reconnectionScanLaunch.get()) {
                System.out.println("send failed...");
                return;
            }
            ChannelFuture current = channelFuture;
            Channel channel = (current == null) ? null : current.channel();
            if (channel != null && channel.isActive()) {
                channel.writeAndFlush(message);
                return;
            }
            this.notifyReconnection();
        } catch (Exception e) {
            e.printStackTrace();
            this.notifyReconnection();
        }
    }

    public static void main(String[] args) throws Exception {
        Netty5Client netty5Client = new Netty5Client("127.0.0.1", 23);
        try {
            netty5Client.start();
            netty5Client.console();
        } finally {
            // Always release the event loops and the reconnect thread, even on failure.
            netty5Client.shutdown();
        }
    }
}
ClientHandler:如果Server 连接关闭, channelInactive 方法触发,开始启动Connect重连
netty5Client.notifyReconnection();
package com.john.netty.learn.ch05;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
/**
 * Client-side inbound handler: prints every received string and lifecycle event, and asks
 * the owning {@link Netty5Client} to reconnect once the channel becomes inactive.
 */
public class ClientHandler extends SimpleChannelInboundHandler<String> {
    /** Client to notify when the connection is lost. */
    private Netty5Client owner;

    public ClientHandler(Netty5Client netty5Client) {
        owner = netty5Client;
    }

    /** Prints the received message. */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        print("Client Read message " + msg);
    }

    /** Prints the context of the channel that became active. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        print("channelActive(ChannelHandlerContext " + ctx + ")");
    }

    /** Prints the context of the channel that became inactive, then requests a reconnect. */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        print("channelInactive(ChannelHandlerContext " + ctx + ")");
        owner.notifyReconnection();
    }

    /** Prints the stack trace of the reported error. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
    }

    /** Writes one line to standard output. */
    private static void print(String text) {
        System.out.println(text);
    }
}
Netty5Server 和 ServerHandler Code 如下,但不是重点
package com.john.netty.learn.ch05;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
public class Netty5Server {
private int port;
public Netty5Server(int port) {
this.port = port;
}
public void start() throws InterruptedException {
ServerBootstrap serverBootstrap = new ServerBootstrap();
EventLoopGroup boss = new NioEventLoopGroup();
EventLoopGroup workers = new NioEventLoopGroup();
try {
// 设置线程池
serverBootstrap.group(boss, workers);
// 设置socket工厂
serverBootstrap.channel(NioServerSocketChannel.class);
// 设置管道工厂
serverBootstrap.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new StringEncoder());
ch.pipeline().addLast(new ServerHandler());
}
});
serverBootstrap.option(ChannelOption.SO_BACKLOG, 2048);// serverSocketchannel的设置,链接缓冲池的大小 (未完成Accept操作,等待Socket Accept)
serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);// socketchannel的设置,维持链接的活跃,清除死链接
serverBootstrap.childOption(ChannelOption.TCP_NODELAY, true);// socketchannel的设置,关闭延迟发送
// 绑定端口
ChannelFuture channelFuture = serverBootstrap.bind(port);
System.out.println("start server " + port);
// 等待服务端关闭
channelFuture.channel().closeFuture().sync();
} finally {
boss.shutdownGracefully
4000
();
workers.shutdownGracefully();
}
}
public static void main(String[] args) throws InterruptedException {
new Netty5Server(23).start();
}
}
如果先启动 Client,日志如下:开启 Reconnection 检查,但是无法执行 Write 操作。
当 Server启动后, 成功Reconnect ,同时检查程序wait
当输入 I want to learn more skill of IT. 后,客户端连续发送 3 次,服务端一次读到 I want to learn more skill of IT.I want to learn more skill of IT.I want to learn more skill of IT.,即出现粘包现象。这是由于没有约定稳定的数据协议结构(消息边界)导致的,以后的章节将介绍如何避免。
所有源码下载 :https://download.csdn.net/download/netcobol/10308871
Netty5Client:如果建立连接失败,调用 notifyReconnection 触发重连:它通过 signal 唤醒执行 run 方法的线程,由该线程反复 connect 直到成功。使用 CAS 避免重复 Notify。连接成功后,该线程回到 condition.await() 进入等待。
package com.john.netty.learn.ch05;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.UnsupportedEncodingException;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.local.LocalEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
/**
 * Netty client that keeps one connection to {@code ip:port} and re-establishes it when lost.
 *
 * <p>A dedicated thread executes {@link #run()}: it waits on {@code condition} until
 * {@link #notifyReconnection()} raises {@code reconnectionScanLaunch}, then retries
 * {@code connect} until it succeeds and goes back to waiting. The flag is flipped with
 * compare-and-set so that concurrent notifications start only one reconnect cycle; while
 * the flag is raised, {@code send} refuses to write.
 */
public class Netty5Client implements Runnable {
    /** Pause between two failed connection attempts (the log message promises one second). */
    private static final long RECONNECT_DELAY_MILLIS = 1000L;

    private String ip;
    private int port;
    private Bootstrap bootstrap;
    /** {@code true} while a reconnect cycle is pending or in progress. */
    public AtomicBoolean reconnectionScanLaunch;
    private ReentrantLock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();
    // Written by the reconnect thread, read by the thread calling send(): must be volatile.
    private volatile ChannelFuture channelFuture;
    private EventLoopGroup workers;
    // Thread running run(); kept so that shutdown() can stop it.
    private volatile Thread reconnectThread;

    /**
     * Creates a client for the given endpoint. Nothing is connected until {@link #start()}.
     *
     * @param ip   host name or address of the server
     * @param port TCP port of the server
     */
    public Netty5Client(String ip, int port) {
        this.ip = ip;
        this.port = port;
        bootstrap = new Bootstrap();
        workers = new NioEventLoopGroup();
        reconnectionScanLaunch = new AtomicBoolean(false);
    }

    /**
     * Configures the bootstrap, starts the reconnect thread and performs the first connection
     * attempt. A failed attempt is not fatal: it is printed and handed to the reconnect thread.
     *
     * @throws InterruptedException if the calling thread is interrupted while connecting
     * @throws Exception            if the bootstrap cannot be configured
     */
    public void start() throws Exception {
        // Configure the bootstrap before the reconnect thread can possibly use it.
        bootstrap.group(workers);
        bootstrap.channel(NioSocketChannel.class);
        // Pipeline factory: string codec plus the handler that reports lost connections.
        bootstrap.handler(new ChannelInitializer<Channel>() {
            @Override
            protected void initChannel(Channel ch) throws Exception {
                ch.pipeline().addLast(new StringDecoder());
                ch.pipeline().addLast(new StringEncoder());
                ch.pipeline().addLast(new ClientHandler(Netty5Client.this));
            }
        });

        Thread scanner = new Thread(this, "netty5-client-reconnect");
        reconnectThread = scanner;
        scanner.start();

        try {
            channelFuture = bootstrap.connect(this.ip, this.port).sync();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw e;
        } catch (Exception e) {
            e.printStackTrace();
            notifyReconnection();
        }
    }

    /** Stops the reconnect thread (if any) and shuts the event loop group down. */
    public void shutdown() {
        Thread scanner = reconnectThread;
        if (scanner != null) {
            // Without this the reconnect thread would wait forever and keep the JVM alive.
            scanner.interrupt();
        }
        workers.shutdownGracefully();
    }

    /** Blocks until a reconnect has been requested through {@link #notifyReconnection()}. */
    private void waitForReconnectionStart() throws InterruptedException {
        // Acquire outside the try block: if acquisition is interrupted there is nothing to unlock.
        lock.lockInterruptibly();
        try {
            while (!reconnectionScanLaunch.get()) {
                System.out.println("wait for netty bootstrap reconnenct");
                this.condition.await();
            }
            System.out.println("await for netty bootstrap reconnenct");
        } finally {
            lock.unlock();
        }
    }

    /** Reconnect loop: wait for a request, reconnect, clear the flag; ends when interrupted. */
    @Override
    public void run() {
        while (true) {
            try {
                waitForReconnectionStart();
                System.out.println("start to try to reconnenct ");
                reconnect();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                shutdown();
                return;
            } finally {
                reconnectionScanLaunch.compareAndSet(true, false);
            }
        }
    }

    /**
     * Retries {@code connect} until it succeeds, pausing between attempts.
     *
     * @throws InterruptedException if the thread is interrupted while connecting or pausing
     */
    private void reconnect() throws InterruptedException {
        // Iterative on purpose: retrying recursively overflows the stack during a long outage.
        while (true) {
            try {
                ChannelFuture attempt = bootstrap.connect(this.ip, this.port);
                attempt.sync();
                // Publish only a future whose connection attempt succeeded.
                channelFuture = attempt;
                System.out.println("Success, reconnect... ");
                return;
            } catch (InterruptedException e) {
                // An interrupt is a request to stop, not a failed attempt.
                throw e;
            } catch (Exception e) {
                System.out.println("Failed, Reconnection again after 1 sec");
                Thread.sleep(RECONNECT_DELAY_MILLIS);
            }
        }
    }

    /**
     * Requests a reconnect. Only the caller that wins the compare-and-set signals the
     * reconnect thread; calls made while a cycle is already pending do nothing.
     */
    public void notifyReconnection() {
        System.out.println("reconnectionScanLaunch == " + reconnectionScanLaunch);
        if (!reconnectionScanLaunch.compareAndSet(false, true)) {
            return;
        }
        System.out.println("notify Reconnection");
        // Uninterruptible acquisition: the flag is already raised, so the signal must not be
        // skipped. The lock is only ever held for a flag check or a signal.
        lock.lock();
        try {
            System.out.println("Signal for Netty Bootstrap Reconnenct");
            this.condition.signal();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Reads lines from standard input (decoded as GBK) and sends each one three times, until
     * "quit" is entered or the input ends.
     *
     * @throws Exception if reading from standard input fails
     */
    public void console() throws Exception {
        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in, "GBK"));
        while (true) {
            System.out.println("请输入:");
            String line = bufferedReader.readLine();
            if (line == null) {
                // End of input: stop instead of spinning and writing null to the channel.
                break;
            }
            // Three consecutive writes of the same line (the article uses this to show
            // how messages run together without a framing protocol).
            this.send(line);
            this.send(line);
            this.send(line);
            if ("quit".equalsIgnoreCase(line)) {
                break;
            }
        }
    }

    /** Writes {@code message} if the channel is active; otherwise requests a reconnect. */
    private void send(String message) {
        try {
            System.out.println("send...");
            if (reconnectionScanLaunch.get()) {
                System.out.println("send failed...");
                return;
            }
            ChannelFuture current = channelFuture;
            Channel channel = (current == null) ? null : current.channel();
            if (channel != null && channel.isActive()) {
                channel.writeAndFlush(message);
                return;
            }
            this.notifyReconnection();
        } catch (Exception e) {
            e.printStackTrace();
            this.notifyReconnection();
        }
    }

    public static void main(String[] args) throws Exception {
        Netty5Client netty5Client = new Netty5Client("127.0.0.1", 23);
        try {
            netty5Client.start();
            netty5Client.console();
        } finally {
            // Always release the event loops and the reconnect thread, even on failure.
            netty5Client.shutdown();
        }
    }
}
ClientHandler:如果Server 连接关闭, channelInactive 方法触发,开始启动Connect重连
netty5Client.notifyReconnection();
package com.john.netty.learn.ch05;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
/**
 * Client-side inbound handler: prints every received string and lifecycle event, and asks
 * the owning {@link Netty5Client} to reconnect once the channel becomes inactive.
 */
public class ClientHandler extends SimpleChannelInboundHandler<String> {
    /** Client to notify when the connection is lost. */
    private Netty5Client owner;

    public ClientHandler(Netty5Client netty5Client) {
        owner = netty5Client;
    }

    /** Prints the received message. */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        print("Client Read message " + msg);
    }

    /** Prints the context of the channel that became active. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        print("channelActive(ChannelHandlerContext " + ctx + ")");
    }

    /** Prints the context of the channel that became inactive, then requests a reconnect. */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        print("channelInactive(ChannelHandlerContext " + ctx + ")");
        owner.notifyReconnection();
    }

    /** Prints the stack trace of the reported error. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
    }

    /** Writes one line to standard output. */
    private static void print(String text) {
        System.out.println(text);
    }
}
Netty5Server 和 ServerHandler Code 如下,但不是重点
package com.john.netty.learn.ch05;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
public class Netty5Server {
private int port;
public Netty5Server(int port) {
this.port = port;
}
public void start() throws InterruptedException {
ServerBootstrap serverBootstrap = new ServerBootstrap();
EventLoopGroup boss = new NioEventLoopGroup();
EventLoopGroup workers = new NioEventLoopGroup();
try {
// 设置线程池
serverBootstrap.group(boss, workers);
// 设置socket工厂
serverBootstrap.channel(NioServerSocketChannel.class);
// 设置管道工厂
serverBootstrap.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new StringEncoder());
ch.pipeline().addLast(new ServerHandler());
}
});
serverBootstrap.option(ChannelOption.SO_BACKLOG, 2048);// serverSocketchannel的设置,链接缓冲池的大小 (未完成Accept操作,等待Socket Accept)
serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);// socketchannel的设置,维持链接的活跃,清除死链接
serverBootstrap.childOption(ChannelOption.TCP_NODELAY, true);// socketchannel的设置,关闭延迟发送
// 绑定端口
ChannelFuture channelFuture = serverBootstrap.bind(port);
System.out.println("start server " + port);
// 等待服务端关闭
channelFuture.channel().closeFuture().sync();
} finally {
boss.shutdownGracefully
4000
();
workers.shutdownGracefully();
}
}
public static void main(String[] args) throws InterruptedException {
new Netty5Server(23).start();
}
}
package com.john.netty.learn.ch05; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; public class ServerHandler extends SimpleChannelInboundHandler<String> { public ServerHandler() { } @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { System.out.println("Read message " + msg); ctx.writeAndFlush("Hi "+ msg); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("channelActive(ChannelHandlerContext "+ctx+")"); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println("channelInactive(ChannelHandlerContext "+ctx+")"); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); } }
如果先启动 Client,日志如下:开启 Reconnection 检查,但是无法执行 Write 操作。
当 Server启动后, 成功Reconnect ,同时检查程序wait
当输入 I want to learn more skill of IT. 后,客户端连续发送 3 次,服务端一次读到 I want to learn more skill of IT.I want to learn more skill of IT.I want to learn more skill of IT.,即出现粘包现象。这是由于没有约定稳定的数据协议结构(消息边界)导致的,以后的章节将介绍如何避免。
所有源码下载 :https://download.csdn.net/download/netcobol/10308871
相关文章推荐
- Netty 快速入门系列 - Chapter 3 Netty5.x【第九讲】 - 单客户多Client 重连
- Netty 快速入门系列 - Chapter 3 Netty5.x【第六讲】 -Netty5 案例
- Netty 快速入门系列 - Chapter 3 Netty5.x【第七讲】 - Channel线程安全?
- Netty 快速入门系列 - Chapter 5 Netty之序列化【第十二讲】 Java Serializable
- Netty 快速入门系列 - Chapter 2 Netty3.x 【第五讲】 - 源码讲解
- Netty 快速入门系列 - Chapter 8 数据包协议【第二十二讲】Socket攻击及防护
- Netty 快速入门系列 - Chapter 1 传统OIO与NIO - NIO 【第二讲】
- Netty 快速入门系列 - Chapter 6 自定义数据协议【第十三讲】 通过大端序列方法将4个字节int转成 byte数组
- Netty 快速入门系列 - Chapter 8 数据包协议【第二十讲】解决方案-Pipeline用例代码
- Netty 快速入门系列 - Chapter 6 自定义数据协议【第十四讲】ByteBuffer 和 ChannelBuffers
- Netty 快速入门系列 - Chapter 7 数据包协议【第十八讲】解决方案-经典协议包结构
- Netty 快速入门系列 - Chapter 6 自定义数据协议【第十五讲】自定义方法Serializer
- Netty 快速入门系列 - Chapter 4 Netty心跳【第十讲】 - IdleStateHandler学习
- Netty 快速入门系列 - Chapter 7 数据包协议【第十九讲】解决方案-粘包分包原理
- Netty 快速入门系列 - Chapter 8 数据包协议【第二十一讲】FrameDecoder 讲解
- Netty 快速入门系列 - Chapter 7 数据包协议【第十六讲】数据传输问题
- Netty 快速入门系列 - Chapter 5 Netty之序列化【第十一讲】 Protocol buff
- Netty 快速入门系列 - Chapter 7 数据包协议【第十七讲】解决方案- 分割符 及 长度+数据
- Netty 快速入门系列 - Chapter 1 传统OIO与NIO - 传统OIO 【第一讲】
- Netty 快速入门系列 - Chapter 2 Netty3.x 【第四讲】 - 基本原理