Netty之实现自定义简单的编解码器二(MessageToMessageEncoder<CharSequence>和MessageToMessageDecoder<ByteBuf>)
2016-12-14 21:46
986 查看
1、对于MessageToMessageEncoder的理解
MessageToMessageEncoder编码器,这里的第二个Message可以理解为任意一个对象。如果是使用ByteBuf对象的话,就和之前的MessageToByte的原理是一样的了。所以要在MessageToMessageDecoder<ByteBuf>的解码器里面,手动的指定,是对ByteBuf类型的对象进行解码的操作。
2、编写MyStringEncoder编码器和MyStringDecoder解码器,以便于,Netty中可以直接发送和接收String类型的数据
2.1 MyStringEncoder编码器的代码
2.2 MyStringDecoder解码器的代码
7.1 直接发送String类型的数据
7.2 直接接收String类型的数据
MessageToMessageEncoder编码器,这里的第二个Message可以理解为任意一个对象。如果是使用ByteBuf对象的话,就和之前的MessageToByte的原理是一样的了。所以要在MessageToMessageDecoder<ByteBuf>的解码器里面,手动的指定,是对ByteBuf类型的对象进行解码的操作。
2、编写MyStringEncoder编码器和MyStringDecoder解码器,以便于,Netty中可以直接发送和接收String类型的数据
2.1 MyStringEncoder编码器的代码
import io.netty.buffer.ByteBufUtil; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToMessageEncoder; import java.nio.CharBuffer; import java.nio.charset.Charset; import java.util.List; public class MyStringEncoder extends MessageToMessageEncoder<CharSequence> { private final Charset charset; public MyStringEncoder() { this(Charset.defaultCharset()); } public MyStringEncoder(Charset charset) { if (charset == null) { throw new NullPointerException("charset"); } this.charset = charset; } protected void encode(ChannelHandlerContext ctx, CharSequence msg, List<Object> out) throws Exception { if (msg.length() == 0) { return; } out.add(ByteBufUtil.encodeString(ctx.alloc(), CharBuffer.wrap(msg), this.charset)); } }
2.2 MyStringDecoder解码器的代码
import java.nio.charset.Charset; import java.util.List; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToMessageDecoder; public class MyStringDecoder extends MessageToMessageDecoder<ByteBuf> { private final Charset charset; public MyStringDecoder() { this(Charset.defaultCharset()); } public MyStringDecoder(Charset charset) { if (charset == null) { throw new NullPointerException("charset"); } this.charset = charset; } protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception { out.add(msg.toString(this.charset)); } }3、服务端的实现
import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; public class Server { public void bind(int port) throws Exception { // 服务器线程组 用于网络事件的处理 一个用于服务器接收客户端的连接 // 另一个线程组用于处理SocketChannel的网络读写 EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { // NIO服务器端的辅助启动类 降低服务器开发难度 ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class)// 类似NIO中serverSocketChannel .option(ChannelOption.SO_BACKLOG, 1024)// 配置TCP参数 .option(ChannelOption.SO_BACKLOG, 1024) // 设置tcp缓冲区 .option(ChannelOption.SO_SNDBUF, 32 * 1024) // 设置发送缓冲大小 .option(ChannelOption.SO_RCVBUF, 32 * 1024) // 这是接收缓冲大小 .option(ChannelOption.SO_KEEPALIVE, true) // 保持连接 .childHandler(new ChildChannelHandler());// 最后绑定I/O事件的处理类 // 处理网络IO事件 // 服务器启动后 绑定监听端口 同步等待成功 主要用于异步操作的通知回调 回调处理用的ChildChannelHandler ChannelFuture f = serverBootstrap.bind(port).sync(); System.out.println("Server启动"); // 等待服务端监听端口关闭 f.channel().closeFuture().sync(); } finally { // 优雅退出 释放线程池资源 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); System.out.println("服务器优雅的释放了线程资源..."); } } /** * 网络事件处理器 */ private class ChildChannelHandler extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { // 增加自定义的编码器和解码器 ch.pipeline().addLast(new MyStringEncoder()); ch.pipeline().addLast(new MyStringDecoder()); // 服务端的处理器 ch.pipeline().addLast(new ServerHandler()); } } public static void main(String[] args) throws Exception { int port = 9998; new Server().bind(port); } }4、服务端Handler的实现
import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; public class ServerHandler extends ChannelHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // 接受客户端的数据 String body = (String) msg; System.out.println("Client :" + body); // 服务端,回写数据给客户端 // 直接回写整形的数据 String data = "Hello ,I am Server ..."; ctx.writeAndFlush(data); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } }5、客户端的实现
import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; public class Client { /** * 连接服务器 * * @param port * @param host * @throws Exception */ public void connect(int port, String host) throws Exception { // 配置客户端NIO线程组 EventLoopGroup group = new NioEventLoopGroup(); try { // 客户端辅助启动类 对客户端配置 Bootstrap b = new Bootstrap(); b.group(group)// .channel(NioSocketChannel.class)// .option(ChannelOption.TCP_NODELAY, true)// .handler(new MyChannelHandler());// // 异步链接服务器 同步等待链接成功 ChannelFuture f = b.connect(host, port).sync(); System.out.println(f); // 发送消息 Thread.sleep(1000); f.channel().writeAndFlush("777"); f.channel().writeAndFlush("666"); Thread.sleep(2000); f.channel().writeAndFlush("888"); // 等待链接关闭 f.channel().closeFuture().sync(); } finally { group.shutdownGracefully(); System.out.println("客户端优雅的释放了线程资源..."); } } /** * 网络事件处理器 */ private class MyChannelHandler extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { // 增加自定义的编码器和解码器 ch.pipeline().addLast(new MyStringEncoder()); ch.pipeline().addLast(new MyStringDecoder()); // 客户端的处理器 ch.pipeline().addLast(new ClientHandler()); } } public static void main(String[] args) throws Exception { new Client().connect(9998, "127.0.0.1"); } }6、客户端Handler的实现
import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.util.ReferenceCountUtil; public class ClientHandler extends ChannelHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try { String body = (String) msg; System.out.println("Client :" + body); // 只是读数据,没有写数据的话 // 需要自己手动的释放的消息 } finally { ReferenceCountUtil.release(msg); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }7、关于直接发送和接收String类型的编码
7.1 直接发送String类型的数据
7.2 直接接收String类型的数据
相关文章推荐
- Netty使用Http上传文件
- tomcat、netty以及nodejs的helloworld性能对比 3ff8
- 分布式服务通讯框架XXL-RPC
- Netty入门-client/server
- flatbuffers 和netty的结合使用
- netty 处理远程主机强制关闭一个连接
- 实现一个 Java 版的 Redis (1)----百行代码解析Redis 协议.
- Netty 源码分析(三):服务器端的初始化和注册过程
- 理解Netty中的Zero-copy
- 轻量级分布式 RPC 框架
- spark总体概况
- netty文章收藏
- Netty系列之Netty百万级推送服务设计要点
- Netty初步
- Netty ChannelBuffer 简介
- 一篇文章,读懂Netty的高性能架构之道
- 解决用netty去做web服务时,post长度过大的问题
- netty4研究系列-序
- netty的reconnect方式之一