Netty的入门-基础编程
2016-07-08 14:05
405 查看
Server端实现:
public class TimeServer {
public void bind(int port) throws Exception {
// 配置服务端的NIO线程组
EventLoopGroup bossGroup = new NioEventLoopGroup(); // 用于服务端接受客户端的连接
EventLoopGroup workerGroup = new NioEventLoopGroup(); // 用于进行SocketChannel的网络读写
try {
// 服务端启动辅助类
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup) // 两个NIO线程组
.channel(NioServerSocketChannel.class) // 设置要创建的Channel
.option(ChannelOption.SO_BACKLOG, 1024) // 设置TCP参数
.childOption(ChannelOption.SO_KEEPALIVE, true) // 保持心跳
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new TimeServerHandler());
}
}); // 绑定I/O事件的处理类,用于处理请求
// 绑定端口,同步等待成功
ChannelFuture f = b.bind(port).sync(); // 只要用于异步操作的通知回调
// 等待服务端监听端口关闭
f.channel().closeFuture().sync(); // 等待服务端链路关闭之后退出
} catch (Exception e) {
e.printStackTrace();
} finally {
// 优雅退出
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
private class TimeServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
try {
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, "UTF-8");
System.out.println("The time server receive order : " + body);
String currentTime = "QUERY TIME ORDER".equals(body) ? new Date(System.currentTimeMillis()).toString() : "Bad Order";
ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
ctx.write(resp); // 写到消息队列而不是写到SocketChannel中
} finally {
ReferenceCountUtil.release(msg);
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx)
throws Exception {
// 将消息发送队列中的消息写入到SocketChannel中发送给对方
// 从性能角度考虑,为了防止频繁的唤醒selector进行消息发送
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
ctx.close();
}
}
public static void main(String[] args) {
try {
new TimeServer().bind(1234);
} catch (Exception e) {
e.printStackTrace();
}
}
客户端实现:
public class TimeClient {
public void connect(int port, String host) {
// 配置客户端NIO线程组
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch)
throws Exception {
ch.pipeline().addLast(new TimeClientHandler());
}
});
// 发起异步连接操作
ChannelFuture f = b.connect(host, port).sync();
f.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
// 优雅退出,释放NIO线程组
group.shutdownGracefully();
}
}
public class TimeClientHandler extends ChannelInboundHandlerAdapter {
private final ByteBuf firstMessage;
public TimeClientHandler() {
super();
byte[] req = "QUERY TIME ORDER".getBytes();
firstMessage = Unpooled.buffer(req.length);
firstMessage.writeBytes(req);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
ctx.close();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(firstMessage);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, "UTF-8");
System.out.println("Now is " + body);
}
}
public static void main(String[] args) {
new TimeClient().connect(1234, "127.0.0.1");
}
}
public class TimeServer {
public void bind(int port) throws Exception {
// 配置服务端的NIO线程组
EventLoopGroup bossGroup = new NioEventLoopGroup(); // 用于服务端接受客户端的连接
EventLoopGroup workerGroup = new NioEventLoopGroup(); // 用于进行SocketChannel的网络读写
try {
// 服务端启动辅助类
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup) // 两个NIO线程组
.channel(NioServerSocketChannel.class) // 设置要创建的Channel
.option(ChannelOption.SO_BACKLOG, 1024) // 设置TCP参数
.childOption(ChannelOption.SO_KEEPALIVE, true) // 保持心跳
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new TimeServerHandler());
}
}); // 绑定I/O事件的处理类,用于处理请求
// 绑定端口,同步等待成功
ChannelFuture f = b.bind(port).sync(); // 只要用于异步操作的通知回调
// 等待服务端监听端口关闭
f.channel().closeFuture().sync(); // 等待服务端链路关闭之后退出
} catch (Exception e) {
e.printStackTrace();
} finally {
// 优雅退出
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
private class TimeServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
try {
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, "UTF-8");
System.out.println("The time server receive order : " + body);
String currentTime = "QUERY TIME ORDER".equals(body) ? new Date(System.currentTimeMillis()).toString() : "Bad Order";
ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
ctx.write(resp); // 写到消息队列而不是写到SocketChannel中
} finally {
ReferenceCountUtil.release(msg);
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx)
throws Exception {
// 将消息发送队列中的消息写入到SocketChannel中发送给对方
// 从性能角度考虑,为了防止频繁的唤醒selector进行消息发送
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
ctx.close();
}
}
public static void main(String[] args) {
try {
new TimeServer().bind(1234);
} catch (Exception e) {
e.printStackTrace();
}
}
}
客户端实现:
public class TimeClient {
public void connect(int port, String host) {
// 配置客户端NIO线程组
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch)
throws Exception {
ch.pipeline().addLast(new TimeClientHandler());
}
});
// 发起异步连接操作
ChannelFuture f = b.connect(host, port).sync();
f.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
// 优雅退出,释放NIO线程组
group.shutdownGracefully();
}
}
public class TimeClientHandler extends ChannelInboundHandlerAdapter {
private final ByteBuf firstMessage;
public TimeClientHandler() {
super();
byte[] req = "QUERY TIME ORDER".getBytes();
firstMessage = Unpooled.buffer(req.length);
firstMessage.writeBytes(req);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
ctx.close();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(firstMessage);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, "UTF-8");
System.out.println("Now is " + body);
}
}
public static void main(String[] args) {
new TimeClient().connect(1234, "127.0.0.1");
}
}
相关文章推荐
- PHP 时间戳 日期 转换等问题
- spring 11 bean配置-基于注解配置bean
- php laravel curD
- 关于github
- Python随堂笔记1-2
- thinkPHP form表单提交参数无法获取
- ThinkPHP的四种路由形式
- c#, 输出二进制
- 安装使用python Django
- ContentPlaceHolderID属性
- JAVA实践one:仿XP自带画板实现
- java date 转string 时间由不精确到精确时,丢失数据,会自动补全
- SpringMVC中的@RequestBody与@ResponseBody
- java基础
- 支付宝开放平台C++方式接入
- POJ 1094 变量排序 解题报告
- java.sql.preparedstatement和java.sql.statement的区别
- Java编程:关于Java异常处理机制
- Java JDK1.8新特性
- Springboot+Spring MVC+Idea工程布置