您的位置:首页 > 其它

netty学习之四 编码解码和传输序列化

2017-05-21 00:00 211 查看
摘要: 信息通过netty在网络传输时候需要将二进制信息进行编解码,不同的序列化方式传输效率不一样,下面可以研究下如何使用netty编解码和序列化

netty发送或接收消息后,Netty必须将消息数据从一种形式转化为另一种。接收消息后,需要将消息从字节码转成Java对象(由某种解码器解码);发送消息前,需要将Java对象转成字节(由某些类型的编码器进行编码)。这种转换一般发生在网络程序中,因为网络上只能传输字节数据。

严格的说其他handlers可以做编码器和适配器,使用不同的Adapter classes取决你想要做什么。如果是解码器则有一个ChannelInboundHandlerAdapter或ChannelInboundHandler,所有的解码器都继承或实现它们。“channelRead”方法/事件被覆盖,这个方法从入站(inbound)通道读取每个消息。重写的channelRead方法将调用每个解码器的“decode”方法并通过ChannelHandlerContext.fireChannelRead(Object msg)传递给ChannelPipeline中的下一个ChannelInboundHandler。
类似入站消息,当你发送一个消息出去(出站)时,除编码器将消息转成字节码外还会转发到下一个ChannelOutboundHandler。


下面看实际情况开发中发送数据的encode方法

public class NettyEncoder extends MessageToByteEncoder<RemotingCommand> {
@Override
public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out)
throws Exception {
if (remotingCommand == null) {
LOGGER.error("Message is null");
return;
}
try {
ByteBuffer byteBuffer = codec.encode(remotingCommand);
out.writeBytes(byteBuffer);
} catch (Exception e) {
Channel channel = new NettyChannel(ctx);
LOGGER.error("encode exception, addr={}, remotingCommand={}", RemotingHelper.parseChannelRemoteAddr(channel), remotingCommand.toString(), e);
RemotingHelper.closeChannel(channel);
}
}
}


使用code.encode的作用是根据自己定义的序列化方法将需要的数据进行序列化,众所周知Java的序列化性能极低,在一些对性能要求较高的场景是不合适的,所以需要提供一些其它常见的序列化方式,比如hession,kryo,protobuf,jackson等序列化方式

public ByteBuffer encode(RemotingCommand remotingCommand) throws Exception {
RemotingSerializable serializable =
getRemotingSerializable(remotingCommand.getSid());
// header length size
int length = 4;
// serializable id (int)
length += 4;
//  header data length
byte[] headerData = serializable.serialize(remotingCommand);
length += headerData.length;
byte[] bodyData = null;
byte[] bodyClass = null;
RemotingCommandBody body = remotingCommand.getBody();
if (body != null) {
// body data
bodyData = serializable.serialize(body);
length += bodyData.length;
bodyClass = body.getClass().getName().getBytes();
length += bodyClass.length;
length += 4;
}
ByteBuffer result = ByteBuffer.allocate(4 + length);
// length
result.putInt(length);
// serializable Id
result.putInt(serializable.getId());
// header length
result.putInt(headerData.length);
// header data
result.put(headerData);
if (bodyData != null) {
//  body length
result.putInt(bodyData.length);
//  body data
result.put(bodyData);
// body class
result.put(bodyClass);
}
result.flip();
return result;
}


序列化接口:

public interface RemotingSerializable {

int getId();

byte[] serialize(final Object obj) throws Exception;

<T> T deserialize(final byte[] data, Class<T> clazz) throws Exception;
}

通过实现该接口可以实现自定义的序列化方式,下面看下,当接收到网络回传的字节数据如何进行反序列化呢

public class NettyDecoder extends LengthFieldBasedFrameDecoder {
public NettyDecoder() {
super(appContext.getConfig().getParameter(ExtConfig.NETTY_FRAME_LENGTH_MAX, Constants.DEFAULT_BUFFER_SIZE), 0, 4, 0, 4);
}
@Override
public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
try {
ByteBuf frame = (ByteBuf) super.decode(ctx, in);
if (frame == null) {
return null;
}

byte[] tmpBuf = new byte[frame.capacity()];
frame.getBytes(0, tmpBuf);
frame.release();

ByteBuffer byteBuffer = ByteBuffer.wrap(tmpBuf);
return codec.decode(byteBuffer);
} catch (Exception e) {
Channel channel = new NettyChannel(ctx);
LOGGER.error("decode exception, {}", RemotingHelper.parseChannelRemoteAddr(channel), e);
RemotingHelper.closeChannel(channel);
}
return null;
}
}

调用codec的反序列化方法,将byteBuffer转换成自定义的对象。

public RemotingCommand decode(ByteBuffer byteBuffer) throws Exception {
int length = byteBuffer.limit();
int serializableId = byteBuffer.getInt();
RemotingSerializable serializable =
getRemotingSerializable(serializableId);

int headerLength = byteBuffer.getInt();
byte[] headerData = new byte[headerLength];
byteBuffer.get(headerData);

RemotingCommand cmd = serializable.deserialize(headerData, RemotingCommand.class);

int remaining = length - 4 - 4 - headerLength;

if (remaining > 0) {

int bodyLength = byteBuffer.getInt();
int bodyClassLength = remaining - 4 - bodyLength;

if (bodyLength > 0) {

byte[] bodyData = new byte[bodyLength];
byteBuffer.get(bodyData);

byte[] bodyClassData = new byte[bodyClassLength];
byteBuffer.get(bodyClassData);

cmd.setBody((RemotingCommandBody) serializable.deserialize(bodyData, Class.forName(new String(bodyClassData))));
}
}
return cmd;
}

至此,整个数据传输中的序列化和反序列化过程结束,具体代码已经上传到github。

完整代码链接:https://github.com/winstonelei/Smt
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: