基于Netty的RPC简单框架实现(三):Kryo实现序列化
2015-09-12 14:05
1131 查看
[b][b]1.[/b]序列[/b]化和反序列化
网络中都是以字节序列的形式来传输数据的,因此在发送消息时需要先将对象序列化转换为字节序列,然后将获得的字节序列发送出去,消息接收方接收到字节序列后将之反序列化获得传输的对象,从收发双方来看就如同直接发送和接收了对象一样。
2.第三方依赖
本例使用目前最新版的kryo-serializers 0.36用于序列化
3.序列化和反序列化的实现
序列化和反序列化的对象在本例中只有两种:1.客户端向服务端发出的调用请求RpcRequest 2.服务端向客户端返回的调用结果RpcResponse
(1).RpcResponse
从RPC服务端传回给客户端的某次调用请求的结果
(2).RpcRequest
已在上一节中给出
(3).KryoSerializer
实际负责序列化和反序列化
(4).KryoHolder
由于kryo对象是线程不安全的,当有多个netty的channel同时连接时,各channel是可能工作在不同的线程上的(netty中一个IO线程可以对应多个channel,而一个channel只能对应一个线程,详细可以参考netty线程模型),若共用同一个kryo对象会出现并发问题,因此用ThreadLocal在每个线程保留一个各自的kryo对象,保证不会大量创建kryo对象的同时避免了并发问题
(5).KryoReflectionFactory
(6).RpcRequestSerializer
read()中按照相同的顺序读出值并根据这些值构建出一个RpcRequest对象并返回。
(7).RpcResponseSerializer
4.测试
测试内容与下一节的Netty网络传输一同测试
网络中都是以字节序列的形式来传输数据的,因此在发送消息时需要先将对象序列化转换为字节序列,然后将获得的字节序列发送出去,消息接收方接收到字节序列后将之反序列化获得传输的对象,从收发双方来看就如同直接发送和接收了对象一样。
2.第三方依赖
本例使用目前最新版的kryo-serializers 0.36用于序列化
<dependency> <groupId>de.javakaffee</groupId> <artifactId>kryo-serializers</artifactId> <version>0.36</version> </dependency>使用maven直接在pom.xml中添加上面的依赖即可
3.序列化和反序列化的实现
序列化和反序列化的对象在本例中只有两种:1.客户端向服务端发出的调用请求RpcRequest 2.服务端向客户端返回的调用结果RpcResponse
(1).RpcResponse
从RPC服务端传回给客户端的某次调用请求的结果
package com.maigo.rpc.context; public class RpcResponse { private int id; private Object result; private Throwable throwable; private boolean isInvokeSuccess; public int getId() { return id; } public void setId(int id) { this.id = id; } public Object getResult() { return result; } public void setResult(Object result) { this.result = result; } public Throwable getThrowable() { return throwable; } public void setThrowable(Throwable throwable) { this.throwable = throwable; } public boolean isInvokeSuccess() { return isInvokeSuccess; } public void setInvokeSuccess(boolean isInvokeSuccess) { this.isInvokeSuccess = isInvokeSuccess; } }RpcResponse中的id对应着该次请求的RpcRequest中的id,isInvokeSuccess表示调用中是否有异常抛出,result和throwable分别表示调用结果和调用过程抛出的异常。
(2).RpcRequest
已在上一节中给出
(3).KryoSerializer
实际负责序列化和反序列化
package com.maigo.rpc.serializer; import java.io.IOException; import com.esotericsoftware.kryo.Kryo; import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufInputStream; import io.netty.buffer.ByteBufOutputStream; public class KryoSerializer { private static final byte[] LENGTH_PLACEHOLDER = new byte[4]; public static void serialize(Object object, ByteBuf byteBuf) { Kryo kryo = KryoHolder.get(); int startIdx = byteBuf.writerIndex(); ByteBufOutputStream byteOutputStream = new ByteBufOutputStream(byteBuf); try { byteOutputStream.write(LENGTH_PLACEHOLDER); Output output = new Output(1024*4, -1); output.setOutputStream(byteOutputStream); kryo.writeClassAndObject(output, object); output.flush(); output.close(); int endIdx = byteBuf.writerIndex(); byteBuf.setInt(startIdx, endIdx - startIdx - 4); } catch (IOException e) { e.printStackTrace(); } } public static Object deserialize(ByteBuf byteBuf) { if(byteBuf == null) return null; Input input = new Input(new ByteBufInputStream(byteBuf)); Kryo kryo = KryoHolder.get(); return kryo.readClassAndObject(input); } }serialize()将一个对象通过kryo序列化并写入ByteBuf中,注意到在头部预留了4个字节用于写入长度信息。deserialize()将ByteBuf中的内容反序列化还原出传输的对象。其中序列化和反序列化均用到了kryo对象,该对象是从KryoHolder中通过get()拿到的。
(4).KryoHolder
由于kryo对象是线程不安全的,当有多个netty的channel同时连接时,各channel是可能工作在不同的线程上的(netty中一个IO线程可以对应多个channel,而一个channel只能对应一个线程,详细可以参考netty线程模型),若共用同一个kryo对象会出现并发问题,因此用ThreadLocal在每个线程保留一个各自的kryo对象,保证不会大量创建kryo对象的同时避免了并发问题
package com.maigo.rpc.serializer; import com.esotericsoftware.kryo.Kryo; public class KryoHolder { private static ThreadLocal<Kryo> threadLocalKryo = new ThreadLocal<Kryo>() { protected Kryo initialValue() { Kryo kryo = new KryoReflectionFactory(); return kryo; }; }; public static Kryo get() { return threadLocalKryo.get(); } }可见,最终用于序列化和反序列化的kryo对象是通过new KryoReflectionFactory()创建的。
(5).KryoReflectionFactory
package com.maigo.rpc.serializer; import java.lang.reflect.InvocationHandler; import java.net.URI; import java.util.Arrays; import java.util.BitSet; import java.util.Collection; import java.util.Collections; import java.util.Date; import java.util.EnumMap; import java.util.EnumSet; import java.util.GregorianCalendar; import java.util.Map; import java.util.UUID; import java.util.regex.Pattern; import com.esotericsoftware.kryo.Serializer; import com.maigo.rpc.context.RpcRequest; import com.maigo.rpc.context.RpcResponse; import de.javakaffee.kryoserializers.ArraysAsListSerializer; import de.javakaffee.kryoserializers.BitSetSerializer; import de.javakaffee.kryoserializers.CollectionsEmptyListSerializer; import de.javakaffee.kryoserializers.CollectionsEmptyMapSerializer; import de.javakaffee.kryoserializers.CollectionsEmptySetSerializer; import de.javakaffee.kryoserializers.CollectionsSingletonListSerializer; import de.javakaffee.kryoserializers.CollectionsSingletonMapSerializer; import de.javakaffee.kryoserializers.CollectionsSingletonSetSerializer; import de.javakaffee.kryoserializers.CopyForIterateCollectionSerializer; import de.javakaffee.kryoserializers.CopyForIterateMapSerializer; import de.javakaffee.kryoserializers.DateSerializer; import de.javakaffee.kryoserializers.EnumMapSerializer; import de.javakaffee.kryoserializers.EnumSetSerializer; import de.javakaffee.kryoserializers.GregorianCalendarSerializer; import de.javakaffee.kryoserializers.JdkProxySerializer; import de.javakaffee.kryoserializers.KryoReflectionFactorySupport; import de.javakaffee.kryoserializers.RegexSerializer; import de.javakaffee.kryoserializers.SubListSerializers; import de.javakaffee.kryoserializers.SynchronizedCollectionsSerializer; import de.javakaffee.kryoserializers.URISerializer; import de.javakaffee.kryoserializers.UUIDSerializer; import de.javakaffee.kryoserializers.UnmodifiableCollectionsSerializer; public class KryoReflectionFactory extends KryoReflectionFactorySupport { public KryoReflectionFactory() { setRegistrationRequired(false); setReferences(true); register(RpcRequest.class, new RpcRequestSerializer()); register(RpcResponse.class, new RpcResponseSerializer()); register(Arrays.asList("").getClass(), new ArraysAsListSerializer()); register(Collections.EMPTY_LIST.getClass(), new CollectionsEmptyListSerializer()); register(Collections.EMPTY_MAP.getClass(), new CollectionsEmptyMapSerializer()); register(Collections.EMPTY_SET.getClass(), new CollectionsEmptySetSerializer()); register(Collections.singletonList("").getClass(), new CollectionsSingletonListSerializer()); register(Collections.singleton("").getClass(), new CollectionsSingletonSetSerializer()); register(Collections.singletonMap("", "").getClass(), new CollectionsSingletonMapSerializer()); register(Pattern.class, new RegexSerializer()); register(BitSet.class, new BitSetSerializer()); register(URI.class, new URISerializer()); register(UUID.class, new UUIDSerializer()); register(GregorianCalendar.class, new GregorianCalendarSerializer()); register(InvocationHandler.class, new JdkProxySerializer()); UnmodifiableCollectionsSerializer.registerSerializers(this); SynchronizedCollectionsSerializer.registerSerializers(this); } @Override @SuppressWarnings({"rawtypes", "unchecked"}) public Serializer<?> getDefaultSerializer(Class clazz) { if(EnumSet.class.isAssignableFrom(clazz)) return new EnumSetSerializer(); if(EnumMap.class.isAssignableFrom(clazz)) return new EnumMapSerializer(); if(Collection.class.isAssignableFrom(clazz)) return new CopyForIterateCollectionSerializer(); if(Map.class.isAssignableFrom(clazz)) return new CopyForIterateMapSerializer(); if(Date.class.isAssignableFrom(clazz)) return new DateSerializer( clazz ); if (SubListSerializers.ArrayListSubListSerializer.canSerialize(clazz) || SubListSerializers.JavaUtilSubListSerializer.canSerialize(clazz)) return SubListSerializers.createFor(clazz); return super.getDefaultSerializer(clazz); } }导入的包非常多,主要完成的功能是给大量类类型注册其对应的Serializer。setRegistrationRequired()设置是否只能序列化已注册的类,此处必须设置为false,因为RPC请求和回应中都可能包含用户自定义的类,这些类显然是不可能在kryo中注册过的。setReferences()若设置成false在序列化Exception时似乎有问题,此处维持打开(默认也是打开)。注意到给RpcRequest.class和RpcResponse.class分别注册了对应的Serializer为RpcRequestSerializer和RpcResponseSerializer。这是由于kryo对未注册的类序列化后的格式是
x01 x00 <(string)className> <(byte)id> <(Object)objectFieldValue ordered by fieldName>里面包含类的全类名,导致序列化后的字节序列很长,故应该实现一个自定义的Serializer用于已知类型的序列化和反序列化缩短序列化后的字节序列。
(6).RpcRequestSerializer
package com.maigo.rpc.serializer; import com.esotericsoftware.kryo.Kryo; import com.esotericsoftware.kryo.Serializer; import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; import com.maigo.rpc.context.RpcRequest; public class RpcRequestSerializer extends Serializer<RpcRequest> { @Override public void write(Kryo kryo, Output output, RpcRequest object) { output.writeInt(object.getId()); output.writeByte(object.getMethodName().length()); output.write(object.getMethodName().getBytes()); kryo.writeClassAndObject(output, object.getArgs()); } @Override public RpcRequest read(Kryo kryo, Input input, Class<RpcRequest> type) { RpcRequest rpcRequest = null; int id = input.readInt(); byte methodLength = input.readByte(); byte[] methodBytes = input.readBytes(methodLength); String methodName = new String(methodBytes); Object[] args = (Object[])kryo.readClassAndObject(input); rpcRequest = new RpcRequest(id, methodName, args); return rpcRequest; } }write()中按顺序往output中写入id,调用方法名的长度和调用方法名的字节数组,最后是调用方法的参数列表,由于不知道参数的确切类型,此处调用传进的kryo对象的writeClassAndObject()方法对参数进行序列化。
read()中按照相同的顺序读出值并根据这些值构建出一个RpcRequest对象并返回。
(7).RpcResponseSerializer
package com.maigo.rpc.serializer; import com.esotericsoftware.kryo.Kryo; import com.esotericsoftware.kryo.Serializer; import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; import com.maigo.rpc.context.RpcResponse; public class RpcResponseSerializer extends Serializer<RpcResponse> { @Override public void write(Kryo kryo, Output output, RpcResponse object) { output.writeInt(object.getId()); output.writeBoolean(object.isInvokeSuccess()); if(object.isInvokeSuccess()) kryo.writeClassAndObject(output, object.getResult()); else kryo.writeClassAndObject(output, object.getThrowable()); } @Override public RpcResponse read(Kryo kryo, Input input, Class<RpcResponse> type) { RpcResponse rpcResponse = new RpcResponse(); rpcResponse.setId(input.readInt()); rpcResponse.setInvokeSuccess(input.readBoolean()); if(rpcResponse.isInvokeSuccess()) rpcResponse.setResult(kryo.readClassAndObject(input)); else rpcResponse.setThrowable((Throwable)kryo.readClassAndObject(input)); return rpcResponse; } }类似RpcRequestSerializer,不再赘述
4.测试
测试内容与下一节的Netty网络传输一同测试
相关文章推荐
- 损失函数(loss function) 转
- Gibbs Sampling(一):随机数产生方法介绍 & Monte Carlo方法
- windows下安装python和Django
- Delphi XE7,C++ Builder XE7,RAD Studio XE7 v21.0.17017.3725(With Update 1) 官方下载激活
- vsPhere,vCenter,ExiHost关系
- 如何解决Win7电脑不能正常安装软件的情况?Win7电脑不能正常安装软件的解决方法
- Leetcode: Best Time to Buy and Sell Stock IV
- Windows批处理学习之文件操作类命令
- HttpClient使用HttpGet进行json数据传输
- document.body 和 document.documentElement 的属性区别
- c++实现两个大整数相加(一)
- NoSuchMethodError: javax.servlet.ServletContext.getContextPath()Ljava/lang/String
- 场景美术经验
- lintcode-子数组之和-138
- 大数相乘
- linux进程管理工具二
- 让IE6 ~IE11支持Bootstrap的解决方法
- 蓝懿学习之 练习日
- POJ 2533 Longest Ordered Subsequence(DP最长上升子序列O(n^2)&&O(nlogn))
- Delphi XE6,C++ Builder XE6,RAD Studio XE6 v20.0.16277.1276(With Update 1) 官方下载激活