关于Netty的ReplayingDecoder
2017-05-06 00:00
801 查看
摘要: ReplayingDecoder in Netty 4.1.9.Final
Netty用户手册介绍Decoder的章节重点介绍了ByteToMessageDecoder,顺道提到了一个陌生的ReplayingDecoder,学习并记录之。下文的例子均来自于ReplayingDecoder源码注释。
先看ReplayingDecoder的类定义:
可见其是ByteToMessageDecoder的子类。类定义中的泛型 S 是一个用于记录解码状态的状态机枚举类,在state(S s)、checkpoint(S s)等方法中会用到。在简单解码时也可以用java.lang.Void来占位。
与ByteToMessageDecoder不同,该类可以在接收到所需要长度的字节之后再调用decode方法,而不用一遍又一遍的手动检查流中的字节长度。下面给出官方的例子:
首先是作对比的ByteToMessageDecoder风格实现,也是我之前的习惯做法 :-(
可见上面解码时略微繁琐,必须两次判断buf可读字节长度,并手动恢复readerIndex。下面来看简洁的ReplayingDecoder实现:
ReplayingDecoderByteBuf重写了ByteBuf的readXxx()等方法,在调用真正的buf做相关操作时,会先检查可读字节长度,一旦检测到不满足要求就直接抛出REPLAY(REPLAY继承ERROR)。
而ReplayingDecoder重写了ByteToMessageDecoder的callDecode()方法:
此方法会捕获Signal并在catch块中重置ByteBuf的readerIndex。
值得注意的是ReplayingDecoderByteBuf和ReplayingDecoder中的Signal都使用了static final修饰符,且始终引用同一个对象以节省内存开销。如ReplayingDecoder中的Signal定义为:
TCP是基于流的,只保证接收到数据包分片顺序,而不保证接收到的数据包每个分片大小。因此在使用ReplayingDecoder时,即使不存在多线程,同一个线程也可能多次调用decode()方法。在decode中修改ReplayingDecoder的类变量时必须小心谨慎。这里顺便提醒一下除非是特殊设计,如组合使用LineBasedFrameDecoder和StringDecoder,否则独立使用的Decoder都是非共享的。
错误的例子:
当buf中有两个int但分为两个包传过来时,上面代码中decode方法会被调用两次,此时队列size为3,这段代码达不到期望结果。正确的做法是每次在decode中先清空队列:
Netty用户手册介绍Decoder的章节重点介绍了ByteToMessageDecoder,顺道提到了一个陌生的ReplayingDecoder,学习并记录之。下文的例子均来自于ReplayingDecoder源码注释。
先看ReplayingDecoder的类定义:
public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder {}
可见其是ByteToMessageDecoder的子类。类定义中的泛型 S 是一个用于记录解码状态的状态机枚举类,在state(S s)、checkpoint(S s)等方法中会用到。在简单解码时也可以用java.lang.Void来占位。
与ByteToMessageDecoder不同,该类可以在接收到所需要长度的字节之后再调用decode方法,而不用一遍又一遍的手动检查流中的字节长度。下面给出官方的例子:
首先是作对比的ByteToMessageDecoder风格实现,也是我之前的习惯做法 :-(
public class IntegerHeaderFrameDecoder extends ByteToMessageDecoder{ protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception { if (buf.readableBytes() < 4) { return; } buf.markReaderIndex(); int length = buf.readInt(); if (buf.readableBytes() < length) { buf.resetReaderIndex(); return; } out.add(buf.readBytes(length)); } }
可见上面解码时略微繁琐,必须两次判断buf可读字节长度,并手动恢复readerIndex。下面来看简洁的ReplayingDecoder实现:
public class IntegerHeaderFrameDecoder extends ReplayingDecoder<Void> { protected void decode(ChannelHandlerContext ctx, ByteBuf buf) throws Exception { out.add(buf.readBytes(buf.readInt())); } }
原理
究其原理是该decoder中使用了ByteBuf的一个特殊实现ReplayingDecoderByteBuf (代理模式),ReplayingDecoderByteBuf的关键部分如下:package io.netty.handler.codec; // ... final class ReplayingDecoderByteBuf extends ByteBuf { private static final Signal REPLAY = ReplayingDecoder.REPLAY; private ByteBuf buffer; // proxy pattern here : ) private void checkIndex(int index, int length) { if (index + length > buffer.writerIndex()) { throw REPLAY; } } private void checkReadableBytes(int readableBytes) { if (buffer.readableBytes() < readableBytes) { throw REPLAY; } } @Override public int readInt() { checkReadableBytes(4); return buffer.readInt(); } // omitted... }
ReplayingDecoderByteBuf重写了ByteBuf的readXxx()等方法,在调用真正的buf做相关操作时,会先检查可读字节长度,一旦检测到不满足要求就直接抛出REPLAY(REPLAY继承ERROR)。
而ReplayingDecoder重写了ByteToMessageDecoder的callDecode()方法:
@Override protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { // catches REPLAY here and reset readerIndex }
此方法会捕获Signal并在catch块中重置ByteBuf的readerIndex。
值得注意的是ReplayingDecoderByteBuf和ReplayingDecoder中的Signal都使用了static final修饰符,且始终引用同一个对象以节省内存开销。如ReplayingDecoder中的Signal定义为:
static final Signal REPLAY = Signal.valueOf(ReplayingDecoder.class, "REPLAY");
限制
ReplayingDecoder虽然提供了不少便利,但是也存在使用限制。包括:buffer的部分操作(readBytes(ByteBuffer dst)、retain()、release()等方法会直接抛出异常);在某些情况下会影响性能(如多次对同一段消息解码)。TCP是基于流的,只保证接收到数据包分片顺序,而不保证接收到的数据包每个分片大小。因此在使用ReplayingDecoder时,即使不存在多线程,同一个线程也可能多次调用decode()方法。在decode中修改ReplayingDecoder的类变量时必须小心谨慎。这里顺便提醒一下除非是特殊设计,如组合使用LineBasedFrameDecoder和StringDecoder,否则独立使用的Decoder都是非共享的。
错误的例子:
public class MyDecoder extends ReplayingDecoder<Void> { private final Queue<Integer> values = new LinkedList<Integer>(); @Override public void decode(ByteBuf buf, List<Object> out) throws Exception { // A message contains 2 integers. values.offer(buf.readInt()); values.offer(buf.readInt()); assert values.size() == 2; out.add(values.poll() + values.poll()); } }
当buf中有两个int但分为两个包传过来时,上面代码中decode方法会被调用两次,此时队列size为3,这段代码达不到期望结果。正确的做法是每次在decode中先清空队列:
public class MyDecoder extends ReplayingDecoder<Void> { private final Queue<Integer> values = new LinkedList<Integer>(); @Override public void decode(ByteBuf buf, List<Object> out) throws Exception { // Revert the state of the variable that might have been changed // since the last partial decode. values.clear(); // A message contains 2 integers. values.offer(buf.readInt()); values.offer(buf.readInt()); // Now we know this assertion will never fail. assert values.size() == 2; out.add(values.poll() + values.poll()); } }
提高运行效率
为了提高处理复杂消息的性能,ReplayingDecoder提供了checkpoint机制。此方法会将下次decode对buffer开始解码的位置置为当前读指针位置。当需要解码的消息很复杂时,推荐使用枚举泛型来创建多个解码保存点:public enum MyDecoderState { READ_LENGTH, READ_CONTENT; }
public class IntegerHeaderFrameDecoder extends ReplayingDecoder<MyDecoderState> { private int length; public IntegerHeaderFrameDecoder() { // Set the initial state. super(MyDecoderState.READ_LENGTH); } @Override protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception { switch (state()) { case READ_LENGTH: length = buf.readInt(); checkpoint(MyDecoderState.READ_CONTENT); case READ_CONTENT: ByteBuf frame = buf.readBytes(length); checkpoint(MyDecoderState.READ_LENGTH); out.add(frame); break; default: throw new Error("Shouldn't reach here."); } } }
运行时替换decoder
实际工作中往往需要使用一个decoder来处理多种协议。此时可以用一个decoder来检查协议类别,然后用具体的协议处理器来处理。下面例子中的FirstDecoder就可以看做一个协议探测器:public class FirstDecoder extends ReplayingDecoder<Void> { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) { // Decode the first message Object firstMessage = ...; // Add the second decoder ctx.pipeline().addLast("second", new SecondDecoder()); if (buf.isReadable()) { // Hand off the remaining data to the second decoder out.add(firstMessage); out.add(buf.readBytes(super.actualReadableBytes())); } else { // Nothing to hand off out.add(firstMessage); } // Remove the first decoder (me) ctx.pipeline().remove(this); } }
相关文章推荐
- 关于学习netty的两个完整服务器客户端范例
- 关于netty的MemoryAwareThreadPoolExecutor
- Netty,Kafka,Muduo关于时间轮的一些思考 之netty时间轮
- Netty,Kafka,Muduo关于时间轮的一些思考 之Muduo时间轮
- 关于龙哥在Netty课程中的FileChannel调用read方法返回0的原因解析
- 关于Netty Pipeline中Handler的执行顺序问题
- 关于 选用netty,而不选nio的原因
- 关于TCP 和 Netty 拆包 粘包
- 关于netty源码的分析
- 关于netty的一些文章收集
- 关于netty
- Netty3.10.1:关于MessageReceived
- 一次关于Netty+Gson造成内存泄露 Memory Analysis分析
- 关于netty的一些基础
- 关于Netty的疑问
- 关于Netty in active中第二章创建的服务端Handler不继承SimpleChannelInboundHandler的原因
- 关于netty的一些介绍文章
- 关于netty UDP不能发送大于2048字节包的问题
- 关于netty的HttpUtil.isTransferEncodingChunked(HttpMessage)方法已过时解决方案
- Netty3.10.1:关于TCP粘包问题 及 Encoder&Decoder