您的位置:首页 > 其它

netty服务器向客户端同步请求

2016-10-20 11:06 274 查看
一些特殊情况下可能会由服务器向客户端发起请,本文使用netty4,展示如何由服务器向客户端发起同步请求,下面直接铺出服务器端的代码,如果需要完整实例可参考netty客户端同步请求

SimpleChatServerHandler

package com.netty.chart;

import java.util.concurrent.CountDownLatch;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;

public class SimpleChatServerHandler extends SimpleChannelInboundHandler<String>{

public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

public CountDownLatch latch;

public int rec;

public SimpleChatServerHandler(CountDownLatch latch, int rec) {
super();
this.latch = latch;
this.rec = rec;
}

private String result;

@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
Channel incoming = ctx.channel();
for(Channel channel : channels){
channel.writeAndFlush("[SERVER] - " + incoming.remoteAddress() + " 加入\n");
}
channels.add(incoming);
}

@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
Channel outcoming = ctx.channel();
for(Channel channel : channels){
channel.writeAndFlush("[SERVER] - " + outcoming.remoteAddress() + " 离开\n");
}
channels.remove(ctx.channel());
}

@Override
protected void channelRead0(ChannelHandlerContext ctx, String string) throws Exception {
Channel incoming = ctx.channel();
if(rec == 1){
result = string;
}
for (Channel channel : channels) {
if (channel != incoming){
channel.writeAndFlush("[" + incoming.remoteAddress() + "] : " + string + "\n");
} else {
channel.writeAndFlush("[you] : " + string + "\n");
}
}
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
if(rec == 1){
latch.countDown();//消息返回完毕,释放同步锁,具体业务需要判断指令是否匹配
rec = 0;
}
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
Channel incoming = ctx.channel();
System.out.println("SimpleChatClient:"+incoming.remoteAddress()+"在线");
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
Channel incoming = ctx.channel();
System.out.println("SimpleChatClient:"+incoming.remoteAddress()+"掉线");
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
Channel incoming = ctx.channel();
System.out.println("SimpleChatClient:"+incoming.remoteAddress()+"异常");
// 当出现异常就关闭连接
cause.printStackTrace();
ctx.close();
}

public void resetSync(CountDownLatch latch2, int rec2) {
this.latch = latch2;
this.rec = rec2;
}

public String getResult() {
return result;
}
}


SimpleChatServerInitializer

package com.netty.chart;

import java.util.concurrent.CountDownLatch;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

public class SimpleChatServerInitializer extends ChannelInitializer<SocketChannel> {

public CountDownLatch latch;

public int rec;

public SimpleChatServerInitializer(CountDownLatch latch, int rec) {
super();
this.latch = latch;
this.rec = rec;
}

private SimpleChatServerHandler handler;

@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
ByteBuf bf = Unpooled.copiedBuffer("\r\n".getBytes());
handler = new SimpleChatServerHandler(latch, rec);
pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, bf));
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
pipeline.addLast("handler", handler);
System.out.println("SimpleChatClient:"+ch.remoteAddress() +"连接上");
}

public void resetSync(CountDownLatch latch, int rec){
handler.resetSync(latch, rec);
}

public String getResult(){
return handler.getResult();
}
}


SimpleChatServer

package com.netty.chart;

import java.util.concurrent.CountDownLatch;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class SimpleChatServer {

private int port;

public SimpleChatServer(int port) {
this.port = port;
}

private SimpleChatServerInitializer initializer;

public void run() throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup();
Event
a0c9
LoopGroup workerGroup = new NioEventLoopGroup();
initializer = new SimpleChatServerInitializer(new CountDownLatch(0), 0);
try {
ServerBootstrap server = new ServerBootstrap();
server.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) // (3)
.childHandler(initializer) //(4)
.option(ChannelOption.SO_BACKLOG, 128) // (5)
.childOption(ChannelOption.SO_KEEPALIVE, true); // (6)

System.out.println("服务器 启动了...");

// 绑定端口,开始接收进来的连接
ChannelFuture future = server.bind(port).sync(); // (7)
//10秒之后发送同步请求
Thread.sleep(10000);
this.send();
// 等待服务器 socket 关闭 。
// 在这个例子中,这不会发生,但你可以优雅地关闭你的服务器。
future.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
System.out.println("服务器 关闭了...");
}

}

public void send() throws Exception{
ChannelGroup channels = SimpleChatServerHandler.channels;
System.out.println("============  消息发送开始 ============");
for (Channel channel : channels) {
CountDownLatch latch = new CountDownLatch(1);
initializer.resetSync(latch, 1);
channel.writeAndFlush("------test" +"\r\n");
latch.await();
System.out.println("=============服务器同步结果:"+initializer.getResult());
}
}

public static void main(String[] args) throws Exception {
SimpleChatServer chatServer = new SimpleChatServer(8888);
chatServer.run();
}
}


如有更好的方式望指出,谢谢!

转载请注明出处。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
相关文章推荐