您的位置:首页 > 编程语言

Netty的入门-基础编程

2016-07-08 14:05 405 查看
Server端实现:

public class TimeServer {

public void bind(int port) throws Exception {
// 配置服务端的NIO线程组
EventLoopGroup bossGroup = new NioEventLoopGroup(); // 用于服务端接受客户端的连接
EventLoopGroup workerGroup = new NioEventLoopGroup(); // 用于进行SocketChannel的网络读写
try {
// 服务端启动辅助类
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup) // 两个NIO线程组
.channel(NioServerSocketChannel.class) // 设置要创建的Channel
.option(ChannelOption.SO_BACKLOG, 1024) // 设置TCP参数
.childOption(ChannelOption.SO_KEEPALIVE, true) // 保持心跳
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new TimeServerHandler());
}
}); // 绑定I/O事件的处理类,用于处理请求
// 绑定端口,同步等待成功
ChannelFuture f = b.bind(port).sync(); // 只要用于异步操作的通知回调
// 等待服务端监听端口关闭
f.channel().closeFuture().sync(); // 等待服务端链路关闭之后退出
} catch (Exception e) {
e.printStackTrace();
} finally {
// 优雅退出
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}

private class TimeServerHandler extends ChannelInboundHandlerAdapter {

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
try {
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, "UTF-8");
System.out.println("The time server receive order : " + body);
String currentTime = "QUERY TIME ORDER".equals(body) ? new Date(System.currentTimeMillis()).toString() : "Bad Order";
ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
ctx.write(resp); // 写到消息队列而不是写到SocketChannel中
} finally {
ReferenceCountUtil.release(msg);
}
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx)
throws Exception {
// 将消息发送队列中的消息写入到SocketChannel中发送给对方
// 从性能角度考虑,为了防止频繁的唤醒selector进行消息发送
ctx.flush();
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
ctx.close();
}

}

public static void main(String[] args) {
try {
new TimeServer().bind(1234);
} catch (Exception e) {
e.printStackTrace();
}
}
}


客户端实现:
public class TimeClient {

public void connect(int port, String host) {
// 配置客户端NIO线程组
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {

@Override
protected void initChannel(SocketChannel ch)
throws Exception {
ch.pipeline().addLast(new TimeClientHandler());
}
});
// 发起异步连接操作
ChannelFuture f = b.connect(host, port).sync();
f.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
// 优雅退出,释放NIO线程组
group.shutdownGracefully();
}
}

public class TimeClientHandler extends ChannelInboundHandlerAdapter {
private final ByteBuf firstMessage;

public TimeClientHandler() {
super();
byte[] req = "QUERY TIME ORDER".getBytes();
firstMessage = Unpooled.buffer(req.length);
firstMessage.writeBytes(req);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
ctx.close();
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(firstMessage);
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, "UTF-8");
System.out.println("Now is " + body);
}

}

public static void main(String[] args) {
new TimeClient().connect(1234, "127.0.0.1");
}

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: