您的位置:首页 > 编程语言 > Java开发

[netty]-消息编解码之Java原生序列化

2017-02-01 13:23 369 查看
消息对象在网络上传输时,我们往往要对消息进行编解码。现在编解码技术非常多,包括Java原生的序列化、Google的protoBuf、hessian等等。这一篇博客我们主要介绍Java的原生序列化编解码以及其优缺点。

基于Java提供的对象输入/输出流 ObjectInputStream和ObjectOutputStream可以直接将对象序列化为可存储的字节数组写入文件或则是在网络上传输。Java对象的序列化目的一般只有两个:

网络传输;

对象持久化。

1. Java序列化的缺点

Java序列化从JDK1.1 就已经提供,不需要添加其他的任何依赖库就可以实现,只需要实现Serializable接口,并生成唯一序列化ID就可以序列化。但是Java原生序列化有诸多的缺点,导致在实际的生产环境中基本都是用其余开源的编解码框架。下面我们来看看Java原生的序列化有什么缺点:

(1) 无法跨语言:这也是最致命的缺陷,导致无法再异构系统中使用。

(2)序列化之后码流太大:序列化之后占用太大的字节。

(3)序列化性能太低:序列化比较占用CPU的时间。

2.业界主流的编解码框架:

由于Java序列化的表现差强人意,所以业界推出了很多高性能的编解码框架,比如Google的Protobuf, hessian等等。这篇博文主要说明Java原生序列化的应用。

3. netty中使用Java序列化编解码

在不考虑跨语言调用和性能时,Java序列化任然是首选的机制,因为原生的API使用起来十分方便。这一节主要包括:

netty序列化服务端编程

netty序列化客户端编程

应用实例

1. 服务端编程

应用场景:客户端发送用户订购请求消息,然后服务端接收到请求消息对象后处理,然后发送响应对象给客户端。请求消息和效应消息的对象定义如下:

request:

package netty.quanwei.p7;

import java.io.Serializable;

/**
* Created by louyuting on 17/2/1.
*
*/
public class SubscribeReq implements Serializable{
private String messageID;

private String userName;

private String productName;

private String phone;

private String address;

public String getAddress() {
return address;
}

public void setAddress(String address) {
this.address = address;
}

public String getMessageID() {
return messageID;
}

public void setMessageID(String messageID) {
this.messageID = messageID;
}

public String getPhone() {
return phone;
}

public void setPhone(String phone) {
this.phone = phone;
}

public String getProductName() {
return productName;
}

public void setProductName(String productName) {
this.productName = productName;
}

public String getUserName() {
return userName;
}

public void setUserName(String userName) {
this.userName = userName;
}
}
@Override
public String toString() {
return "SubscribeReq: [messageID]:"+ messageID + " [userName]:" +userName
+ " [productName]:" +productName+ " [phone]:" +phone+ " [address]:" +address;
}


response:

package netty.quanwei.p7;

import java.io.Serializable;

/**
* Created by louyuting on 17/2/1.
*/
public class SubscribeResp implements Serializable {
private String messageID;

private String respCode;

private String description;

public String getDescription() {
return description;
}

public void setDescription(String description) {
this.description = description;
}

public String getMessageID() {
return messageID;
}

public void setMessageID(String messageID) {
this.messageID = messageID;
}

public String getRespCode() {
return respCode;
}

public void setRespCode(String respCode) {
this.respCode = respCode;
}
}
@Override
public String toString() {
return "SubscribeReq: [messageID]:"+ messageID + " [respCode]:" +respCode
+ " [description]:" +description;
}


这里我们使用netty提供的ObjectDecoder和ObjectEncoder来进行编解码。

服务端的主函数编码如下:

package netty.quanwei.p7;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;

/**
* Created by louyuting on 17/1/31.
*/
public class SubreqServer {

public void bind(int port) throws Exception{
//配置服务端NIO 线程组
EventLoopGroup boss = new NioEventLoopGroup();
EventLoopGroup worker = new NioEventLoopGroup();

ServerBootstrap server = new ServerBootstrap();

try {
server.group(boss, worker)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
/**
* 解码器: 构造器传入了两个参数: #1 单个对象序列化后最大字节长度,这是设置是1M;
*                           #2 类解析器: weakCachingConcurrentResolver创建线程安全的WeakReferenceMa对类加载器进行缓存,
*                                      支持多线程并发访问,当虚拟机内存不足时,会释放缓存中的内存,防止内存泄漏.
*/
ch.pipeline().addLast(new ObjectDecoder(1024*1024,
ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())) );

ch.pipeline().addLast(new ObjectEncoder());

ch.pipeline().addLast(new SubreqServerHandler());
}
});

//绑定端口, 同步等待成功
ChannelFuture future = server.bind(port).sync();

//等待服务端监听端口关闭
future.channel().closeFuture().sync();
} finally {
//优雅关闭 线程组
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}

/**
* main 函数
* @param args
*/
public static void main(String[] args) {
SubreqServer server = new SubreqServer();
try {
server.bind(18888);
} catch (Exception e) {
e.printStackTrace();
}
}
}


在handler容器中增加了一个ObjectDecoder,负责对实现了序列化接口的POJO进行解码,它有多个构造函数,支持不同的类解析器。这里我们使用weakCachingConcurrentResolver创建线程安全的WeakReferenceMa对类加载器进行缓存,支持多线程并发访问,当虚拟机内存不足时,会释放缓存中的内存,防止内存泄漏。 此外还设置了单个对象序列化后最大字节长度,这是设置是1M。

最后订购消息在SubreqServerHandler中处理:

package netty.quanwei.p7;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import utils.LogUtil;

/**
* Created by louyuting on 17/1/31.
*/
public class SubreqServerHandler extends ChannelInboundHandlerAdapter{
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
LogUtil.log_debug("Server -> read");

SubscribeReq req = (SubscribeReq)msg;

if( "louyuting".equalsIgnoreCase(req.getUserName()) ){
System.out.println("service accept client subscript req :[\n"+ req.toString() +"]");

ctx.writeAndFlush( resp(req.getMessageID()) );
}
}

private SubscribeResp resp(String reqID){
SubscribeResp response = new SubscribeResp();
response.setMessageID(reqID);
response.setRespCode("0");
response.setDescription("subscribe is success book will arrive after 3 days");
return response;
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
LogUtil.log_debug("Server -> read complete");
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//释放资源
ctx.close();
}
}


客户端程序开发

客户端核心代码思路是:

1)在客户端将netty提供的编解码器添加到ChannelPipeline

2)在客户端链路被激活的时候发送10条订购消息,为了检验netty提供的Java序列化功能是否支持TCP的黏包/拆包功能,客户端一次性构造10条订购消息并一次性发送给服务器,看服务器能够成功反序列化。

3) 客户端接收到服务端的反馈消息,打印。

客户端核心代码:

package netty.quanwei.p7;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;

/**
* Created by louyuting on 17/1/31.
* netty 时间服务器 客户端
*/
public class SubreqClient {

public void connect(int port, String host) throws Exception{
//配置客户端NIO 线程组
EventLoopGroup group = new NioEventLoopGroup();

Bootstrap client = new Bootstrap();

try {
client.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ObjectDecoder(1024,
ClassResolvers.cacheDisabled(this.getClass().getClassLoader())) );

ch.pipeline().addLast(new ObjectEncoder());
ch.pipeline().addLast(new SubreqClientHandler());
}
});

//绑定端口, 异步连接操作
ChannelFuture future = client.connect(host, port).sync();

//等待客户端连接端口关闭
future.channel().closeFuture().sync();
} finally {
//优雅关闭 线程组
group.shutdownGracefully();
}
}

/**
* main 函数
* @param args
*/
public static void main(String[] args) {
SubreqClient client = new SubreqClient();
try {
client.connect(18888, "127.0.0.1");
} catch (Exception e) {
e.printStackTrace();
}
}
}


在ObjectDecoder构造函数中配置不允许对类加载器缓存。下面再看SubreqClientHandler的实现:

package netty.quanwei.p7;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import utils.LogUtil;

/**
* Created by louyuting on 17/1/31.
*/
public class SubreqClientHandler extends ChannelInboundHandlerAdapter{

public SubreqClientHandler() {
}

/**
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
LogUtil.log_debug("client -> active");

for(int i=0; i<10; i++){
ctx.write(subReq(String.valueOf(i)));
}

// 写入10个对象到发送缓冲之后再一次性 flush写入通道
ctx.flush();
}

private SubscribeReq subReq(String id){
SubscribeReq req = new SubscribeReq();
req.setMessageID(id);
req.setUserName("louyuting");
req.setProductName("iphone 7");
req.setPhone("13026317652");
req.setAddress("HUST");

return req;
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
LogUtil.log_debug("client -> read");

LogUtil.log_debug("receive server response: { " + msg.toString() +"]");

}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}


在链路激活时,即channelActive()函数中一次性构建10条消息,然后一次发送给服务器。

运行结果

整个运行流程就是:

客户端先构建消息对象–》

在客户端与服务端连接上时,一次性发送10个对象给服务端,这里发送时在客户端会经过ObjectEncoder编码为字节数据–》

服务端接收到字节数组,先经过ObjectDecoder解码成为实际的Java对象(这里的ObjectDecoder需要传入类解析器参数,类解析器参数也要传入加载器参数),然后服务端的handler处理后染回response对象给客户端,返回response对象在服务端也会被ObjectEncoder编码—》

客户端收到响应,也是先解码,然后回去响应消息,并处理。

以上基本上就是完整的流程,我们看看运行结果:

服务端:

2017-02-01 12:41:17:Server -> read
service accept client subscript req :[
SubscribeReq: [messageID]:0 [userName]:louyuting [productName]:iphone 7 [phone]:13026317652 [address]:HUST]
2017-02-01 12:41:17:Server -> read
service accept client subscript req :[
SubscribeReq: [messageID]:1 [userName]:louyuting [productName]:iphone 7 [phone]:13026317652 [address]:HUST]
2017-02-01 12:41:17:Server -> read
service accept client subscript req :[
SubscribeReq: [messageID]:2 [userName]:louyuting [productName]:iphone 7 [phone]:13026317652 [address]:HUST]
2017-02-01 12:41:17:Server -> read
service accept client subscript req :[
SubscribeReq: [messageID]:3 [userName]:louyuting [productName]:iphone 7 [phone]:13026317652 [address]:HUST]
2017-02-01 12:41:17:Server -> read
service accept client subscript req :[
SubscribeReq: [messageID]:4 [userName]:louyuting [productName]:iphone 7 [phone]:13026317652 [address]:HUST]
2017-02-01 12:41:17:Server -> read
service accept client subscript req :[
SubscribeReq: [messageID]:5 [userName]:louyuting [productName]:iphone 7 [phone]:13026317652 [address]:HUST]
2017-02-01 12:41:17:Server -> read
service accept client subscript req :[
SubscribeReq: [messageID]:6 [userName]:louyuting [productName]:iphone 7 [phone]:13026317652 [address]:HUST]
2017-02-01 12:41:17:Server -> read
service accept client subscript req :[
SubscribeReq: [messageID]:7 [userName]:louyuting [productName]:iphone 7 [phone]:13026317652 [address]:HUST]
2017-02-01 12:41:17:Server -> read
service accept client subscript req :[
SubscribeReq: [messageID]:8 [userName]:louyuting [productName]:iphone 7 [phone]:13026317652 [address]:HUST]
2017-02-01 12:41:17:Server -> read
service accept client subscript req :[
SubscribeReq: [messageID]:9 [userName]:louyuting [productName]:iphone 7 [phone]:13026317652 [address]:HUST]
2017-02-01 12:41:17:Server -> read complete


收到了完整的10个对象:尽管客户端一次批量发送了10条订购请求消息,TCP会对消息进行拆包和黏包,但是并不影响最后的运行结果,服务端成功接收到了10条请求订购的消息,与客户端一致。

客户端运行结果如下:

2017-02-01 12:41:17:client -> active
2017-02-01 12:41:17:client -> read
2017-02-01 12:41:17:receive server response: { SubscribeReq: [messageID]:0 [respCode]:0 [description]:subscribe is success book will arrive after 3 days]
2017-02-01 12:41:17:client -> read
2017-02-01 12:41:17:receive server response: { SubscribeReq: [messageID]:1 [respCode]:0 [description]:subscribe is success book will arrive after 3 days]
2017-02-01 12:41:17:client -> read
2017-02-01 12:41:17:receive server response: { SubscribeReq: [messageID]:2 [respCode]:0 [description]:subscribe is success book will arrive after 3 days]
2017-02-01 12:41:17:client -> read
2017-02-01 12:41:17:receive server response: { SubscribeReq: [messageID]:3 [respCode]:0 [description]:subscribe is success book will arrive after 3 days]
2017-02-01 12:41:17:client -> read
2017-02-01 12:41:17:receive server response: { SubscribeReq: [messageID]:4 [respCode]:0 [description]:subscribe is success book will arrive after 3 days]
2017-02-01 12:41:17:client -> read
2017-02-01 12:41:17:receive server response: { SubscribeReq: [messageID]:5 [respCode]:0 [description]:subscribe is success book will arrive after 3 days]
2017-02-01 12:41:17:client -> read
2017-02-01 12:41:17:receive server response: { SubscribeReq: [messageID]:6 [respCode]:0 [description]:subscribe is success book will arrive after 3 days]
2017-02-01 12:41:17:client -> read
2017-02-01 12:41:17:receive server response: { SubscribeReq: [messageID]:7 [respCode]:0 [description]:subscribe is success book will arrive after 3 days]
2017-02-01 12:41:17:client -> read
2017-02-01 12:41:17:receive server response: { SubscribeReq: [messageID]:8 [respCode]:0 [description]:subscribe is success book will arrive after 3 days]
2017-02-01 12:41:17:client -> read
2017-02-01 12:41:17:receive server response: { SubscribeReq: [messageID]:9 [respCode]:0 [description]:subscribe is success book will arrive after 3 days]


客户端也收到了10条反馈信息。

本文完整代码的github地址:

https://github.com/leetcode-hust/leetcode/tree/master/louyuting/src/netty/quanwei/p7
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: