您的位置:首页 > 其它

基于Netty的RPC简单框架实现(三):Kryo实现序列化

2015-09-12 14:05 1131 查看
[b][b]1.[/b]序列[/b]化和反序列化

网络中都是以字节序列的形式来传输数据的,因此在发送消息时需要先将对象序列化转换为字节序列,然后将获得的字节序列发送出去,消息接收方接收到字节序列后将之反序列化获得传输的对象,从收发双方来看就如同直接发送和接收了对象一样。

2.第三方依赖

本例使用目前最新版的kryo-serializers 0.36用于序列化

<dependency>
<groupId>de.javakaffee</groupId>
<artifactId>kryo-serializers</artifactId>
<version>0.36</version>
</dependency>
使用maven直接在pom.xml中添加上面的依赖即可

3.序列化和反序列化的实现

序列化和反序列化的对象在本例中只有两种:1.客户端向服务端发出的调用请求RpcRequest 2.服务端向客户端返回的调用结果RpcResponse

(1).RpcResponse

从RPC服务端传回给客户端的某次调用请求的结果

package com.maigo.rpc.context;

public class RpcResponse
{
private int id;
private Object result;
private Throwable throwable;
private boolean isInvokeSuccess;

public int getId()
{
return id;
}

public void setId(int id)
{
this.id = id;
}

public Object getResult()
{
return result;
}

public void setResult(Object result)
{
this.result = result;
}

public Throwable getThrowable()
{
return throwable;
}

public void setThrowable(Throwable throwable)
{
this.throwable = throwable;
}

public boolean isInvokeSuccess()
{
return isInvokeSuccess;
}

public void setInvokeSuccess(boolean isInvokeSuccess)
{
this.isInvokeSuccess = isInvokeSuccess;
}

}
RpcResponse中的id对应着该次请求的RpcRequest中的id,isInvokeSuccess表示调用中是否有异常抛出,result和throwable分别表示调用结果和调用过程抛出的异常。

(2).RpcRequest

已在上一节中给出

(3).KryoSerializer

实际负责序列化和反序列化

package com.maigo.rpc.serializer;

import java.io.IOException;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.ByteBufOutputStream;

public class KryoSerializer
{
private static final byte[] LENGTH_PLACEHOLDER = new byte[4];

public static void serialize(Object object, ByteBuf byteBuf)
{
Kryo kryo = KryoHolder.get();
int startIdx = byteBuf.writerIndex();
ByteBufOutputStream byteOutputStream = new ByteBufOutputStream(byteBuf);
try
{
byteOutputStream.write(LENGTH_PLACEHOLDER);
Output output = new Output(1024*4, -1);
output.setOutputStream(byteOutputStream);
kryo.writeClassAndObject(output, object);

output.flush();
output.close();

int endIdx = byteBuf.writerIndex();

byteBuf.setInt(startIdx, endIdx - startIdx - 4);
}
catch (IOException e)
{
e.printStackTrace();
}
}

public static Object deserialize(ByteBuf byteBuf)
{
if(byteBuf == null)
return null;

Input input = new Input(new ByteBufInputStream(byteBuf));
Kryo kryo = KryoHolder.get();
return kryo.readClassAndObject(input);
}
}
serialize()将一个对象通过kryo序列化并写入ByteBuf中,注意到在头部预留了4个字节用于写入长度信息。deserialize()将ByteBuf中的内容反序列化还原出传输的对象。其中序列化和反序列化均用到了kryo对象,该对象是从KryoHolder中通过get()拿到的。

(4).KryoHolder

由于kryo对象是线程不安全的,当有多个netty的channel同时连接时,各channel是可能工作在不同的线程上的(netty中一个IO线程可以对应多个channel,而一个channel只能对应一个线程,详细可以参考netty线程模型),若共用同一个kryo对象会出现并发问题,因此用ThreadLocal在每个线程保留一个各自的kryo对象,保证不会大量创建kryo对象的同时避免了并发问题

package com.maigo.rpc.serializer;

import com.esotericsoftware.kryo.Kryo;

public class KryoHolder
{
private static ThreadLocal<Kryo> threadLocalKryo = new ThreadLocal<Kryo>()
{
protected Kryo initialValue()
{
Kryo kryo = new KryoReflectionFactory();

return kryo;
};
};

public static Kryo get()
{
return threadLocalKryo.get();
}
}
可见,最终用于序列化和反序列化的kryo对象是通过new KryoReflectionFactory()创建的。

(5).KryoReflectionFactory

package com.maigo.rpc.serializer;

import java.lang.reflect.InvocationHandler;
import java.net.URI;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.GregorianCalendar;
import java.util.Map;
import java.util.UUID;
import java.util.regex.Pattern;

import com.esotericsoftware.kryo.Serializer;
import com.maigo.rpc.context.RpcRequest;
import com.maigo.rpc.context.RpcResponse;

import de.javakaffee.kryoserializers.ArraysAsListSerializer;
import de.javakaffee.kryoserializers.BitSetSerializer;
import de.javakaffee.kryoserializers.CollectionsEmptyListSerializer;
import de.javakaffee.kryoserializers.CollectionsEmptyMapSerializer;
import de.javakaffee.kryoserializers.CollectionsEmptySetSerializer;
import de.javakaffee.kryoserializers.CollectionsSingletonListSerializer;
import de.javakaffee.kryoserializers.CollectionsSingletonMapSerializer;
import de.javakaffee.kryoserializers.CollectionsSingletonSetSerializer;
import de.javakaffee.kryoserializers.CopyForIterateCollectionSerializer;
import de.javakaffee.kryoserializers.CopyForIterateMapSerializer;
import de.javakaffee.kryoserializers.DateSerializer;
import de.javakaffee.kryoserializers.EnumMapSerializer;
import de.javakaffee.kryoserializers.EnumSetSerializer;
import de.javakaffee.kryoserializers.GregorianCalendarSerializer;
import de.javakaffee.kryoserializers.JdkProxySerializer;
import de.javakaffee.kryoserializers.KryoReflectionFactorySupport;
import de.javakaffee.kryoserializers.RegexSerializer;
import de.javakaffee.kryoserializers.SubListSerializers;
import de.javakaffee.kryoserializers.SynchronizedCollectionsSerializer;
import de.javakaffee.kryoserializers.URISerializer;
import de.javakaffee.kryoserializers.UUIDSerializer;
import de.javakaffee.kryoserializers.UnmodifiableCollectionsSerializer;

public class KryoReflectionFactory extends KryoReflectionFactorySupport
{
public KryoReflectionFactory()
{
setRegistrationRequired(false);
setReferences(true);
register(RpcRequest.class, new RpcRequestSerializer());
register(RpcResponse.class, new RpcResponseSerializer());
register(Arrays.asList("").getClass(), new ArraysAsListSerializer());
register(Collections.EMPTY_LIST.getClass(), new CollectionsEmptyListSerializer());
register(Collections.EMPTY_MAP.getClass(), new CollectionsEmptyMapSerializer());
register(Collections.EMPTY_SET.getClass(), new CollectionsEmptySetSerializer());
register(Collections.singletonList("").getClass(), new CollectionsSingletonListSerializer());
register(Collections.singleton("").getClass(), new CollectionsSingletonSetSerializer());
register(Collections.singletonMap("", "").getClass(), new CollectionsSingletonMapSerializer());
register(Pattern.class, new RegexSerializer());
register(BitSet.class, new BitSetSerializer());
register(URI.class, new URISerializer());
register(UUID.class, new UUIDSerializer());
register(GregorianCalendar.class, new GregorianCalendarSerializer());
register(InvocationHandler.class, new JdkProxySerializer());
UnmodifiableCollectionsSerializer.registerSerializers(this);
SynchronizedCollectionsSerializer.registerSerializers(this);
}

@Override
@SuppressWarnings({"rawtypes", "unchecked"})
public Serializer<?> getDefaultSerializer(Class clazz)
{
if(EnumSet.class.isAssignableFrom(clazz))
return new EnumSetSerializer();

if(EnumMap.class.isAssignableFrom(clazz))
return new EnumMapSerializer();

if(Collection.class.isAssignableFrom(clazz))
return new CopyForIterateCollectionSerializer();

if(Map.class.isAssignableFrom(clazz))
return new CopyForIterateMapSerializer();

if(Date.class.isAssignableFrom(clazz))
return new DateSerializer( clazz );

if (SubListSerializers.ArrayListSubListSerializer.canSerialize(clazz)
|| SubListSerializers.JavaUtilSubListSerializer.canSerialize(clazz))
return SubListSerializers.createFor(clazz);

return super.getDefaultSerializer(clazz);
}
}
导入的包非常多,主要完成的功能是给大量类类型注册其对应的Serializer。setRegistrationRequired()设置是否只能序列化已注册的类,此处必须设置为false,因为RPC请求和回应中都可能包含用户自定义的类,这些类显然是不可能在kryo中注册过的。setReferences()若设置成false在序列化Exception时似乎有问题,此处维持打开(默认也是打开)。注意到给RpcRequest.class和RpcResponse.class分别注册了对应的Serializer为RpcRequestSerializer和RpcResponseSerializer。这是由于kryo对未注册的类序列化后的格式是

x01 x00 <(string)className> <(byte)id> <(Object)objectFieldValue ordered by fieldName>
里面包含类的全类名,导致序列化后的字节序列很长,故应该实现一个自定义的Serializer用于已知类型的序列化和反序列化缩短序列化后的字节序列。

(6).RpcRequestSerializer

package com.maigo.rpc.serializer;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.maigo.rpc.context.RpcRequest;

public class RpcRequestSerializer extends Serializer<RpcRequest>
{
@Override
public void write(Kryo kryo, Output output, RpcRequest object)
{
output.writeInt(object.getId());
output.writeByte(object.getMethodName().length());
output.write(object.getMethodName().getBytes());
kryo.writeClassAndObject(output, object.getArgs());
}

@Override
public RpcRequest read(Kryo kryo, Input input, Class<RpcRequest> type)
{
RpcRequest rpcRequest = null;
int id = input.readInt();
byte methodLength = input.readByte();
byte[] methodBytes = input.readBytes(methodLength);
String methodName = new String(methodBytes);
Object[] args = (Object[])kryo.readClassAndObject(input);

rpcRequest = new RpcRequest(id, methodName, args);

return rpcRequest;
}
}
write()中按顺序往output中写入id,调用方法名的长度和调用方法名的字节数组,最后是调用方法的参数列表,由于不知道参数的确切类型,此处调用传进的kryo对象的writeClassAndObject()方法对参数进行序列化。

read()中按照相同的顺序读出值并根据这些值构建出一个RpcRequest对象并返回。

(7).RpcResponseSerializer

package com.maigo.rpc.serializer;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.maigo.rpc.context.RpcResponse;

public class RpcResponseSerializer extends Serializer<RpcResponse>
{
@Override
public void write(Kryo kryo, Output output, RpcResponse object)
{
output.writeInt(object.getId());
output.writeBoolean(object.isInvokeSuccess());
if(object.isInvokeSuccess())
kryo.writeClassAndObject(output, object.getResult());
else
kryo.writeClassAndObject(output, object.getThrowable());
}

@Override
public RpcResponse read(Kryo kryo, Input input, Class<RpcResponse> type)
{
RpcResponse rpcResponse = new RpcResponse();
rpcResponse.setId(input.readInt());
rpcResponse.setInvokeSuccess(input.readBoolean());
if(rpcResponse.isInvokeSuccess())
rpcResponse.setResult(kryo.readClassAndObject(input));
else
rpcResponse.setThrowable((Throwable)kryo.readClassAndObject(input));

return rpcResponse;
}
}
类似RpcRequestSerializer,不再赘述

4.测试

测试内容与下一节的Netty网络传输一同测试
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: