您的位置:首页 > 理论基础 > 计算机网络

一起写RPC框架(四)RPC网络模块的搭建二 Netty的编码器和译码器

2016-10-19 14:29 429 查看
上一节,我们说到写一个RPC,很核心的一个模块也是基础就是网络通讯部分,定义一个通用的网络传输对象,会方便我们的对业务层面的编码,我们会花费很少的心思在底层的字节流的网络传输上。
因为我们选用的是Netty,写过Netty的或者有一点Netty基础的,我勉强算是Netty刚刚入门的菜鸟吧,都知道我们需要C/S端写一个编码器和译码器。

关于编码器和译码器,有很多种实现方式,没有优劣之分,只有在某种场景下,哪种方式更加适合用的说法,小到StringDecoder/StringEncoder,大到自定义协议的Decoder/Encoder,都会有其最最适合它的场景。我们这节就是要找一个最最适合RPC的编码器和译码器

因为我们知道不管你传输对象定义的有多好,有多么的通用,在网络上传输的全部都是正常人看不懂的字节,而且因为网络的存在,你发送的内容可能会因为过长,会被分成几段来发送,这也就是拆包的概念,所以我们需要对拆包进行处理,相反,如果你每次发送的内容过短,睿智灵活的网络也会很机智的,稍微等下,等你累积了几个你需要发送的短信息一起发送出来,这样可以提高网络传输的效率,降低效率的原因有很多因素,比如TCP的协议头比内容还长,这样的内容在网络上传输的销量是极其低下的
废话不多说,有关Netty粘包和拆包的问题,详看:
http://blog.csdn.net/linuu/article/details/51337748 http://blog.csdn.net/linuu/article/details/51338538

回到正题上,一般而言,我们在写一个Demo版的Netty的编码器和译码器的时候,很实用很好写的方案一般是这样的:
1)固定长度的传输对象,假如我们传输的对象的长度是固定的,假如是1024个字节,那我们处理也会很方便,在netty的handler处理链上加一个LineBasedFrameDecoder问题就解决了
2) 固定字符切割传输的流对象,我们规定好传输的终端以某种特殊服务去切割某个文本信息,比如以"::"或者"$$"等等不常用的符号去划分文本,这样我们也能在约定的协议下,去完成这一切

上述的两种方案是很便捷,也很好实现的方案,Netty也对这两种方法给出了直接的解决方案,但是在我们这个RPC框架下,还是借鉴了TCP的一些协议的思想:

1)比如我们可以规定一个协议头,协议头的第一部分是一个Mark,也就是我们俗称的头标志位,表示该信息是本端口需要接受的信息,如果不是则丢弃

2)然后再来有一个sign,来表示这个消息的内容是什么,是Student,Teacher,还是其他的云云

3)接着再加一个ReuqestId,为什么需要一个请求Id呢?其实很好理解,因为Netty可以认为是双向通信的,假如某个场景下,当Client端发送100次请求,Server端需要对这100次请求一一作出响应的时候,就可以用这个ReuqestId将Request/Response一一对应上,因为网络发送的是异步的,也不保证顺序,也存在丢包的可能,所以先发的响应未必先到,所以有了ReuqestId这样就会防止这种问题

4)协议头的最后一部分信息,就是主题的字节长度,Body Length,这样我们就知道我们接下来再读多少个字节就能获取完整的一段信息了

5)主体部分的信息,Body

上述的场景设计其实还是很常见的,这样做的好处就是比较灵活,易于扩展,坏处就是多了一个协议头,协议头的长度如果大大于主体信息的长度,就会有种大头娃娃的感觉,得不偿失,传输的效率就会变低,不过总体来说,优点还是更加明显一点的,我们看看Jupiter的协议头:

/**
* **************************************************************************************************
*                                          Protocol
*  ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
*       2   │   1   │    1   │     8     │      4      │
*  ├ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┤
*           │       │        │           │             │
*  │  MAGIC   Sign    Status   Invoke Id   Body Length                   Body Content              │
*           │       │        │           │             │
*  └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
*
* 消息头16个字节定长
* = 2 // MAGIC = (short) 0xbabe
* + 1 // 消息标志位, 用来表示消息类型
* + 1 // 空
* + 8 // 消息 id long 类型
* + 4 // 消息体body长度, int类型
*/


跟我刚才说的还是比较像的(废话,我抄人家的~)

好了,基于上述思想,我们来写我们基于RemotingTransporter的编码和译码器,有一点点小小的变动,不过思想是一样的

先看编码器(Encoder)

package org.laopopo.remoting.netty.encode;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

import org.laopopo.remoting.model.RemotingTransporter;
import static org.laopopo.common.protocal.LaopopoProtocol.MAGIC;
import static org.laopopo.common.serialization.SerializerHolder.serializerImpl;

/**
*
* @author BazingaLyn
* @description Netty 对{@link RemotingTransporter}的编码器
* @time 2016年8月10日
* @modifytime
*/
@ChannelHandler.Sharable
public class RemotingTransporterEncoder extends MessageToByteEncoder<RemotingTransporter> {

@Override
protected void encode(ChannelHandlerContext ctx, RemotingTransporter msg, ByteBuf out) throws Exception {
doEncodeRemotingTransporter(msg, out);
}

private void doEncodeRemotingTransporter(RemotingTransporter msg, ByteBuf out) {
byte[] body = serializerImpl().writeObject(msg.getCustomHeader());

out.writeShort(MAGIC). 	           //协议头
writeByte(msg.getTransporterType())// 传输类型 sign 是请求还是响应
.writeByte(msg.getCode())          // 请求类型requestcode 表明主题信息的类型,也代表请求的类型
.writeLong(msg.getOpaque())        //requestId
.writeInt(body.length)             //length
.writeBytes(body);
}

}


编写一个RemotingTransporterEncoder,继承于Netty官方的MessageToByteEncoder的编码器,然后按照我们上述的思想去将对象信息,转换成字节数组

相对于编码器而言,解码器就相对复杂一点,因为我们知道解码器其实就是对网络流读取的过程,说到"读"我们都知道我们需要控制ReadIndex,我们需要正确的移动读的下标,这样才可以正确的读取数据

万能的Netty对此场景提供了一个叫做ReplayingDecoder的解码器,我们需要继承该类,完整的实现如下:

package org.laopopo.remoting.netty.decode;

import static org.laopopo.common.protocal.LaopopoProtocol.MAGIC;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;

import java.util.List;

import org.laopopo.common.exception.remoting.RemotingContextException;
import org.laopopo.common.protocal.LaopopoProtocol;
import org.laopopo.remoting.model.RemotingTransporter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
*
* @author BazingaLyn
* @description Netty 对{@link RemotingTransporter}的解码器
* @time 2016年8月10日
* @modifytime
*/
public class RemotingTransporterDecoder extends ReplayingDecoder<RemotingTransporterDecoder.State> {

private static final Logger logger = LoggerFactory.getLogger(RemotingTransporterDecoder.class);

private static final int MAX_BODY_SIZE = 1024 * 1024 * 5;

private final LaopopoProtocol header = new LaopopoProtocol();

public RemotingTransporterDecoder() {
//设置(下文#state()的默认返回对象)
super(State.HEADER_MAGIC);
}

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
switch (state()) {
case HEADER_MAGIC:
checkMagic(in.readShort()); // MAGIC
checkpoint(State.HEADER_TYPE);
case HEADER_TYPE :
header.type(in.readByte());
checkpoint(State.HEADER_SIGN);
case HEADER_SIGN:
header.sign(in.readByte()); // 消息标志位
checkpoint(State.HEADER_ID);
case HEADER_ID:
header.id(in.readLong()); // 消息id
checkpoint(State.HEADER_BODY_LENGTH);
case HEADER_BODY_LENGTH:
header.bodyLength(in.readInt()); // 消息体长度
checkpoint(State.BODY);
case BODY:
int bodyLength = checkBodyLength(header.bodyLength());
byte[] bytes = new byte[bodyLength];
in.readBytes(bytes);
out.add(RemotingTransporter.newInstance(header.id(), header.sign(),header.type(), bytes));
break;
default:
break;
}
checkpoint(State.HEADER_MAGIC);
}

private int checkBodyLength(int bodyLength) throws RemotingContextException {
if (bodyLength > MAX_BODY_SIZE) {
throw new RemotingContextException("body of request is bigger than limit value "+ MAX_BODY_SIZE);
}
return bodyLength;
}

private void checkMagic(short magic) throws RemotingContextException {
if (MAGIC != magic) {
logger.error("Magic is not match");
throw new RemotingContextException("magic value is not equal "+MAGIC);
}
}

enum State {
HEADER_MAGIC, HEADER_TYPE, HEADER_SIGN, HEADER_ID, HEADER_BODY_LENGTH, BODY
}

}
LaopopoProtocol.java

package org.laopopo.common.protocal;

/**
*
* @author BazingaLyn
* @description 网络传输的协议头信息
* @time 2016年8月9日
* @modifytime
*/
public class LaopopoProtocol {

/** 协议头长度 */
public static final int HEAD_LENGTH = 16;

/** Magic */
public static final short MAGIC = (short) 0xbabe;

/** 发送的是请求信息*/
public static final byte REQUEST_REMOTING = 1;

/** 发送的是响应信息*/
public static final byte RESPONSE_REMOTING = 2;

public static final byte RPC_REMOTING = 3;

public static final byte HANDLER_ERROR = -1;

public static final byte HANDLER_BUSY = -2;

//provider端向registry发送注册信息的code
public static final byte PUBLISH_SERVICE = 65;

//consumer端向registry订阅服务后返回的订阅结果
public static final byte SUBCRIBE_RESULT = 66;

//订阅服务取消
public static final byte SUBCRIBE_SERVICE_CANCEL = 67;

//取消发布服务
public static final byte PUBLISH_CANCEL_SERVICE = 68;

//consumer发送给registry注册服务
public static final byte SUBSCRIBE_SERVICE = 69;

//管理服务的请求
public static final byte MANAGER_SERVICE = 70;

//远程调用的请求
public static final byte RPC_REQUEST = 72;

//降级
public static final byte DEGRADE_SERVICE = 73;

//调用调用的响应
public static final byte RPC_RESPONSE = 74;

//修改负载策略
public static final byte CHANGE_LOADBALANCE = 75;

//统计信息
public static final byte MERTRICS_SERVICE = 76;

//心跳
public static final byte HEARTBEAT = 127;

//ACK
public static final byte ACK = 126;

private byte type;
private byte sign;
private long id;
private int bodyLength;

public byte type() {
return type;
}

public void type(byte type) {
this.type = type;
}

public byte sign() {
return sign;
}

public void sign(byte sign) {
this.sign = sign;
}

public long id() {
return id;
}

public void id(long id) {
this.id = id;
}

public int bodyLength() {
return bodyLength;
}

public void bodyLength(int bodyLength) {
this.bodyLength = bodyLength;
}

}


这个ReplayingDecoder有几个比较关键的方法说明一下

1)构造函数,它的构造函数,传递了一个枚举类型进去,这个是设置state()这个方法的初始值,这样可以进case HEADER_MAGIC的代码分支,首先先校验协议头,是否使我们定义的MAGIC,如果不是的情况下,我们就不进行解码,因为可能此信息与我们规定的不符

2)checkpoint的方法作用有两个,一是改变state的值的状态,二是获取到最新的读指针的下标

3)最后一步重新调用checkpoint(State.HEADER_MAGIC),是把state的值重新设置为初始值HEADER_MAGIC,方便下次信息的解析读取

4)在switch case中主要做的事情,就是将byte[]转换成我们需要的RemotingTransporter的对象,且把主体信息的byte[]放到RemotingTransporter的父类ByteHolder中,关于这块的byte[]的序列化则放在具体的业务场景之下

好了,我们这个小型的RPC,通用的编码器和译码器的大体思想和实现就是这样了,下面我们将介绍网络模块序列化的实现,也就是我们上文serializerImpl这个方法的实现了

本节END~如果有错误的地方,欢迎指正~
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: