
Avro Serialization: Principles and Applications

2017-04-12
Abstract: Notes on Hadoop/Avro. The project containing the examples in this document lives at https://git.oschina.net/weiwei02/WHadoop


In-Memory Serialization and Deserialization

Avro provides serialization and deserialization APIs that make it easy to integrate Avro into an existing system.
Unlike JSON, Avro's serialization prioritizes wire-transfer performance over human readability of the transmitted bytes, although Avro can also emit JSON-style key/value output. Avro offers two ways to serialize and deserialize data, and we can choose between them according to the use case. The two examples below, one using the generic API and one using the specific API, explore how each works.

The Generic API

With the generic API we do not need to create a corresponding Java class for each schema; we work with records much as we would with a Map, addressing each field by its key. We can even analyze an existing schema at runtime and derive the data structure we need dynamically.

Let's start with a simple Avro schema, StringPair.avsc:

{
    "type": "record",
    "name": "StringPair",
    "doc": "A pair of strings.",
    "fields": [
        {"name": "left", "type": "string"},
        {"name": "right", "type": "string"}
    ]
}

Next, create a SerializeString.java that serializes and deserializes data against this schema:

package cn.weiwei.WHadoop.Avro;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.*;
import org.apache.avro.util.Utf8;

import java.io.ByteArrayOutputStream;
import java.io.IOException;

/**
 * <pre>String serialization and deserialization</pre>
 * @author Wang Weiwei <weiwei02@vip.qq.com / weiwei.wang@100credit.com>
 * @version 1.0
 * @since 17-4-6
 */
public class SerializeString {
    /**
     * Serialize strings using the generic API.
     */
    public ByteArrayOutputStream serialize(String path) throws IOException {
        Schema schema = buildSchema(path);

        // Create a generic Avro record
        GenericRecord genericRecord = new GenericData.Record(schema);
        genericRecord.put("left", new Utf8("LEFT"));
        genericRecord.put("right", new Utf8("RIGHT"));

        // Serialize the record into an output stream.
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        // The DatumWriter translates the data into a form the Encoder understands,
        // then writes it to the output stream.
        DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(schema);
        // We have no existing encoder to reuse, so the second argument
        // of binaryEncoder() is null.
        Encoder encoder = EncoderFactory.get().binaryEncoder(byteArrayOutputStream, null);
        datumWriter.write(genericRecord, encoder);
        encoder.flush();
        byteArrayOutputStream.close();
        System.out.println(byteArrayOutputStream.toString("utf-8"));
        return byteArrayOutputStream;
    }

    /**
     * Load the schema to use.
     */
    private Schema buildSchema(String path) throws IOException {
        Schema.Parser parser = new Schema.Parser();
        return parser.parse(getClass().getClassLoader().getResourceAsStream(path));
    }

    /** Deserialize data using the generic API. */
    public void deserialize(String path) throws IOException {
        Schema schema = buildSchema(path);
        DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>(schema);
        Decoder decoder = DecoderFactory.get().binaryDecoder(serialize(path).toByteArray(), null);
        GenericRecord record = datumReader.read(null, decoder);
        System.out.println(record.toString());
    }

}

A test case for the example above looks like this:

package cn.weiwei.WHadoop.Avro;

import org.junit.Test;

import static org.junit.Assert.*;

/**
 * <pre>Tests for string serialization and deserialization</pre>
 * @author Wang Weiwei <weiwei02@vip.qq.com / weiwei.wang@100credit.com>
 * @version 1.0
 * @since 17-4-6
 */
public class SerializeStringTest {
    SerializeString serializeString = new SerializeString();

    @Test
    public void serialize() throws Exception {
        System.out.println(serializeString.serialize("StringPair.avsc").toString("utf-8"));
    }

    @Test
    public void deserialize() throws Exception {
        serializeString.deserialize("StringPair.avsc");
    }

}

Running the tests gives the following output:

LEFT
RIGHT

{"left": "LEFT", "right": "RIGHT"}

Process finished with exit code 0

Both test cases pass: the two strings we wrote were serialized and read back intact.
To see how the generic API writes and reads data, let's walk through the relevant Avro source:

/** Overload that delegates to write(Schema schema, Object datum, Encoder out). */
public void write(D datum, Encoder out) throws IOException {
    write(root, datum, out);
}

/** The main method for writing data. */
protected void write(Schema schema, Object datum, Encoder out)
    throws IOException {
    try {
        switch (schema.getType()) {
        case RECORD: writeRecord(schema, datum, out); break;
        case ENUM:   writeEnum(schema, datum, out);   break;
        case ARRAY:  writeArray(schema, datum, out);  break;
        case MAP:    writeMap(schema, datum, out);    break;
        case UNION:
            int index = resolveUnion(schema, datum);
            out.writeIndex(index);
            write(schema.getTypes().get(index), datum, out);
            break;
        case FIXED:   writeFixed(schema, datum, out);   break;
        case STRING:  writeString(schema, datum, out);  break;
        case BYTES:   writeBytes(datum, out);           break;
        case INT:     out.writeInt(((Number)datum).intValue()); break;
        case LONG:    out.writeLong((Long)datum);       break;
        case FLOAT:   out.writeFloat((Float)datum);     break;
        case DOUBLE:  out.writeDouble((Double)datum);   break;
        case BOOLEAN: out.writeBoolean((Boolean)datum); break;
        case NULL:    out.writeNull();                  break;
        default: error(schema,datum);
        }
    } catch (NullPointerException e) {
        throw npe(e, " of "+schema.getFullName());
    }
}

The write(Schema schema, Object datum, Encoder out) method reacts according to the type declared in the schema: primitive types are written directly to the serialization output stream, while composite types receive further processing. Let's continue the analysis with a schema whose type is record:

/** Called to write a record.  May be overridden for alternate record
 * representations.*/
protected void writeRecord(Schema schema, Object datum, Encoder out)
    throws IOException {
    Object state = data.getRecordState(datum, schema);
    for (Field f : schema.getFields()) {
        writeField(datum, f, out, state);
    }
}

/** Called to write a single field of a record. May be overridden for more
 * efficient or alternate implementations.*/
protected void writeField(Object datum, Field f, Encoder out, Object state)
    throws IOException {
    Object value = data.getField(datum, f.name(), f.pos(), state);
    try {
        write(f.schema(), value, out);
    } catch (NullPointerException e) {
        throw npe(e, " in field " + f.name());
    }
}
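Stripped of Avro's internals, the cooperation between these methods reduces to a small recursive-descent pattern: switch on the schema type, recurse over the fields of a record, and emit bytes only at primitive leaves. The sketch below is a hypothetical, minimal model of that pattern; the MiniSchema/MiniDatumWriter names and the one-byte length prefix are inventions for illustration, not Avro's real classes or encoding.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;

// Hypothetical, stripped-down model of GenericDatumWriter's dispatch:
// a record schema is a list of field schemas, and write() recurses
// until it reaches a primitive leaf it can emit directly.
public class MiniDatumWriter {
    enum Type { RECORD, STRING }

    static class MiniSchema {
        final Type type;
        final List<MiniSchema> fields; // only used when type == RECORD
        MiniSchema(Type type, List<MiniSchema> fields) {
            this.type = type;
            this.fields = fields;
        }
    }

    // Mirrors write(Schema, Object, Encoder): switch on the schema type,
    // recurse for records, emit bytes for primitives.
    static void write(MiniSchema schema, Object datum, ByteArrayOutputStream out) {
        switch (schema.type) {
            case RECORD:
                Object[] values = (Object[]) datum;
                for (int i = 0; i < schema.fields.size(); i++) {
                    write(schema.fields.get(i), values[i], out); // recurse per field
                }
                break;
            case STRING:
                byte[] utf8 = ((String) datum).getBytes(StandardCharsets.UTF_8);
                out.write(utf8.length); // toy one-byte length prefix (Avro uses zigzag varints)
                out.write(utf8, 0, utf8.length);
                break;
        }
    }

    public static void main(String[] args) {
        MiniSchema pair = new MiniSchema(Type.RECORD, List.of(
                new MiniSchema(Type.STRING, null),
                new MiniSchema(Type.STRING, null)));
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        write(pair, new Object[]{"LEFT", "RIGHT"}, out);
        System.out.println(out.size()); // two length bytes plus nine characters
    }
}
```

The key point the sketch preserves is that the recursion is driven entirely by the schema, so the output contains data only, in schema order.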

Here we see that write() calls writeRecord(), which calls writeField(), which in turn recursively calls write(). This select-loop-recurse structure walks the entire schema and guarantees that no field's data is skipped or misplaced. Moreover, Avro writes only the data itself to the stream; it does not write a redundant copy of the data model alongside the values the way JSON repeats its keys, which makes the format efficient to transmit over a network. For the two strings serialized above,

LEFT
RIGHT

Avro needs only 11 bytes: each string is stored as a zigzag-encoded varint length followed by its UTF-8 bytes, so "LEFT" costs 1 + 4 bytes and "RIGHT" costs 1 + 5. The equivalent JSON, {"left": "LEFT", "right": "RIGHT"}, takes 34 bytes. There are no actual separators in the stream; the length prefix for "RIGHT" happens to be the byte 10 (ASCII newline), which is why the printed output above appears to put each value on its own line. On the read side, fields are decoded in the order and with the types the schema declares; a field may carry a null value (where its schema allows it) but can never be omitted. Generic deserialization mirrors this logic in reverse, so we won't walk through it here.
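The byte layout can be checked with a few lines of plain Java. The sketch below (stdlib only, no Avro dependency) reimplements the string encoding described in the Avro specification, a zigzag-encoded varint length followed by raw UTF-8 bytes, and reproduces the 11-byte layout of our record; it is an illustrative re-derivation, not Avro's own encoder.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Avro's binary encoding for a string: a zigzag-encoded varint length,
// then the raw UTF-8 bytes. No field names or separators are written.
public class AvroStringEncoding {
    // Zigzag maps signed values to unsigned so small magnitudes stay small:
    // 0 -> 0, -1 -> 1, 1 -> 2, 2 -> 4, ... 4 -> 8, 5 -> 10.
    static long zigzag(long n) {
        return (n << 1) ^ (n >> 63);
    }

    // Varint: 7 bits per byte, high bit set on every byte except the last.
    static void writeVarint(long v, ByteArrayOutputStream out) {
        while ((v & ~0x7FL) != 0) {
            out.write((int) ((v & 0x7F) | 0x80));
            v >>>= 7;
        }
        out.write((int) v);
    }

    static void writeString(String s, ByteArrayOutputStream out) {
        byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
        writeVarint(zigzag(utf8.length), out);
        out.write(utf8, 0, utf8.length);
    }

    public static void main(String[] args) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeString("LEFT", out);  // 0x08 'L' 'E' 'F' 'T'
        writeString("RIGHT", out); // 0x0A 'R' 'I' 'G' 'H' 'T'
        byte[] bytes = out.toByteArray();
        System.out.println(bytes.length); // 11 bytes, versus 34 for the JSON form
        System.out.println(bytes[5]);     // 10 -- the '\n' that makes the output look line-separated
    }
}
```

Note that the byte before "RIGHT" is the zigzag of its length 5, i.e. 10, which is exactly why the console output earlier looked like two newline-separated words.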

The Specific API

The generic API above works, but it does not fit well with an everything-is-an-object mindset. Avro also provides a specific API that operates on generated classes, along with a Maven plugin that generates those classes from a given schema.

Example: use the Maven plugin to generate StringPair.java from StringPair.avsc.

Configure pom.xml as follows:

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro-maven-plugin</artifactId>
            <version>${avro.version}</version>
            <executions>
                <execution>
                    <id>schemas</id>
                    <phase>generate-sources</phase>
                    <goals>
                        <goal>schema</goal>
                    </goals>
                    <configuration>
                        <includes>
                            <include>StringPair.avsc</include>
                        </includes>
                        <stringType>String</stringType> <!-- Avro 1.6.0 onwards -->
                        <sourceDirectory>src/main/resources</sourceDirectory>
                        <outputDirectory>${basedir}/src/main/java/cn/weiwei/WHadoop/Avro/domain</outputDirectory>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

The configuration above binds the avro-maven-plugin's schema goal to Maven's standard generate-sources lifecycle phase. With it in place, running mvn generate-sources in a shell generates StringPair.java. (Note that the class below was generated from a version of the schema that adds a third field, an int named num, to the two string fields shown earlier.)

/**
* Autogenerated by Avro
*
* DO NOT EDIT DIRECTLY
*/
@SuppressWarnings("all")
/** A pair of strings. */
@org.apache.avro.specific.AvroGenerated
public class StringPair extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"StringPair\",\"doc\":\"A pair of strings.\",\"fields\":[{\"name\":\"left\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}},{\"name\":\"right\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}},{\"name\":\"num\",\"type\":\"int\"}]}");
public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
@Deprecated public java.lang.String left;
@Deprecated public java.lang.String right;
@Deprecated public int num;

/**
* Default constructor.  Note that this does not initialize fields
* to their default values from the schema.  If that is desired then
* one should use <code>newBuilder()</code>.
*/
public StringPair() {}

/**
* All-args constructor.
*/
public StringPair(java.lang.String left, java.lang.String right, java.lang.Integer num) {
this.left = left;
this.right = right;
this.num = num;
}

public org.apache.avro.Schema getSchema() { return SCHEMA$; }
// Used by DatumWriter.  Applications should not call.
public java.lang.Object get(int field$) {
switch (field$) {
case 0: return left;
case 1: return right;
case 2: return num;
default: throw new org.apache.avro.AvroRuntimeException("Bad index");
}
}
// Used by DatumReader.  Applications should not call.
@SuppressWarnings(value="unchecked")
public void put(int field$, java.lang.Object value$) {
switch (field$) {
case 0: left = (java.lang.String)value$; break;
case 1: right = (java.lang.String)value$; break;
case 2: num = (java.lang.Integer)value$; break;
default: throw new org.apache.avro.AvroRuntimeException("Bad index");
}
}

/**
* Gets the value of the 'left' field.
*/
public java.lang.String getLeft() {
return left;
}

/**
* Sets the value of the 'left' field.
* @param value the value to set.
*/
public void setLeft(java.lang.String value) {
this.left = value;
}

/**
* Gets the value of the 'right' field.
*/
public java.lang.String getRight() {
return right;
}

/**
* Sets the value of the 'right' field.
* @param value the value to set.
*/
public void setRight(java.lang.String value) {
this.right = value;
}

/**
* Gets the value of the 'num' field.
*/
public java.lang.Integer getNum() {
return num;
}

/**
* Sets the value of the 'num' field.
* @param value the value to set.
*/
public void setNum(java.lang.Integer value) {
this.num = value;
}

/** Creates a new StringPair RecordBuilder */
public static StringPair.Builder newBuilder() {
return new StringPair.Builder();
}

/** Creates a new StringPair RecordBuilder by copying an existing Builder */
public static StringPair.Builder newBuilder(StringPair.Builder other) {
return new StringPair.Builder(other);
}

/** Creates a new StringPair RecordBuilder by copying an existing StringPair instance */
public static StringPair.Builder newBuilder(StringPair other) {
return new StringPair.Builder(other);
}

/**
* RecordBuilder for StringPair instances.
*/
public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<StringPair>
implements org.apache.avro.data.RecordBuilder<StringPair> {

private java.lang.String left;
private java.lang.String right;
private int num;

/** Creates a new Builder */
private Builder() {
super(StringPair.SCHEMA$);
}

/** Creates a Builder by copying an existing Builder */
private Builder(StringPair.Builder other) {
super(other);
if (isValidValue(fields()[0], other.left)) {
this.left = data().deepCopy(fields()[0].schema(), other.left);
fieldSetFlags()[0] = true;
}
if (isValidValue(fields()[1], other.right)) {
this.right = data().deepCopy(fields()[1].schema(), other.right);
fieldSetFlags()[1] = true;
}
if (isValidValue(fields()[2], other.num)) {
this.num = data().deepCopy(fields()[2].schema(), other.num);
fieldSetFlags()[2] = true;
}
}

/** Creates a Builder by copying an existing StringPair instance */
private Builder(StringPair other) {
super(StringPair.SCHEMA$);
if (isValidValue(fields()[0], other.left)) {
this.left = data().deepCopy(fields()[0].schema(), other.left);
fieldSetFlags()[0] = true;
}
if (isValidValue(fields()[1], other.right)) {
this.right = data().deepCopy(fields()[1].schema(), other.right);
fieldSetFlags()[1] = true;
}
if (isValidValue(fields()[2], other.num)) {
this.num = data().deepCopy(fields()[2].schema(), other.num);
fieldSetFlags()[2] = true;
}
}

/** Gets the value of the 'left' field */
public java.lang.String getLeft() {
return left;
}

/** Sets the value of the 'left' field */
public StringPair.Builder setLeft(java.lang.String value) {
validate(fields()[0], value);
this.left = value;
fieldSetFlags()[0] = true;
return this;
}

/** Checks whether the 'left' field has been set */
public boolean hasLeft() {
return fieldSetFlags()[0];
}

/** Clears the value of the 'left' field */
public StringPair.Builder clearLeft() {
left = null;
fieldSetFlags()[0] = false;
return this;
}

/** Gets the value of the 'right' field */
public java.lang.String getRight() {
return right;
}

/** Sets the value of the 'right' field */
public StringPair.Builder setRight(java.lang.String value) {
validate(fields()[1], value);
this.right = value;
fieldSetFlags()[1] = true;
return this;
}

/** Checks whether the 'right' field has been set */
public boolean hasRight() {
return fieldSetFlags()[1];
}

/** Clears the value of the 'right' field */
public StringPair.Builder clearRight() {
right = null;
fieldSetFlags()[1] = false;
return this;
}

/** Gets the value of the 'num' field */
public java.lang.Integer getNum() {
return num;
}

/** Sets the value of the 'num' field */
public StringPair.Builder setNum(int value) {
validate(fields()[2], value);
this.num = value;
fieldSetFlags()[2] = true;
return this;
}

/** Checks whether the 'num' field has been set */
public boolean hasNum() {
return fieldSetFlags()[2];
}

/** Clears the value of the 'num' field */
public StringPair.Builder clearNum() {
fieldSetFlags()[2] = false;
return this;
}

@Override
public StringPair build() {
try {
StringPair record = new StringPair();
record.left = fieldSetFlags()[0] ? this.left : (java.lang.String) defaultValue(fields()[0]);
record.right = fieldSetFlags()[1] ? this.right : (java.lang.String) defaultValue(fields()[1]);
record.num = fieldSetFlags()[2] ? this.num : (java.lang.Integer) defaultValue(fields()[2]);
return record;
} catch (Exception e) {
throw new org.apache.avro.AvroRuntimeException(e);
}
}
}
}

Then create SerializeStringPair.java, a class that serializes and deserializes StringPair objects:

package cn.weiwei.WHadoop.Avro;

import cn.weiwei.WHadoop.Avro.domain.StringPair;
import org.apache.avro.io.*;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;

import java.io.ByteArrayOutputStream;
import java.io.IOException;

/**
 * <pre>Utility class for serializing and deserializing StringPair</pre>
 * @author Wang Weiwei <email>weiwei02@vip.qq.com / weiwei.wang@100credit.com</email>
 * @version 1.0
 * @since 2017/4/12
 */
public class SerializeStringPair {
    public StringPair stringPair;

    public SerializeStringPair() {
        stringPair = new StringPair();
        stringPair.setLeft("L");
        stringPair.setRight("R");
    }

    /** Serialize. */
    public byte[] serialize() throws IOException {
        ByteArrayOutputStream stream = new ByteArrayOutputStream();
        DatumWriter<StringPair> stringPairDatumWriter = new SpecificDatumWriter<StringPair>(StringPair.class);
        Encoder encoder = EncoderFactory.get().binaryEncoder(stream, null);
        stringPairDatumWriter.write(stringPair, encoder);
        encoder.flush();
        stream.close();
        return stream.toByteArray();
    }

    /**
     * Deserialize.
     */
    public void deSerialize(byte[] data) throws IOException {
        DatumReader<StringPair> stringPairDatumReader = new SpecificDatumReader<StringPair>(StringPair.class);
        Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);
        StringPair stringPair = stringPairDatumReader.read(null, decoder);
        System.out.println(stringPair);
    }

}

As the example shows, the specific API no longer requires us to supply a schema or address fields by name, which makes the calling code simpler. Internally, the DatumWriter and DatumReader move values through the class's generated get(int)/put(int) methods, keyed by each field's position in the schema, so the mapping stays cheap. And just like the generic API, the bytes it produces use the same compact binary layout optimized for network transfer.
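The positional get(int)/put(int) contract that the generated StringPair implements can be illustrated without Avro at all. The sketch below is a hypothetical mirror of that idea; MiniSpecificRecord, Pair, and copy() are made-up names for illustration, not Avro API.

```java
// Hypothetical mirror of the contract a generated Avro class implements:
// a writer pulls values out by field position and a reader pushes them
// back in, with no name lookups needed at runtime.
public class PositionalRecordDemo {
    interface MiniSpecificRecord {
        Object get(int field);
        void put(int field, Object value);
    }

    static class Pair implements MiniSpecificRecord {
        String left, right;

        public Object get(int field) {
            switch (field) {
                case 0: return left;
                case 1: return right;
                default: throw new IllegalArgumentException("Bad index");
            }
        }

        public void put(int field, Object value) {
            switch (field) {
                case 0: left = (String) value; break;
                case 1: right = (String) value; break;
                default: throw new IllegalArgumentException("Bad index");
            }
        }
    }

    // A "writer/reader" pair only needs the positional contract to move
    // every field of a record, in schema order.
    static void copy(MiniSpecificRecord from, MiniSpecificRecord to, int fieldCount) {
        for (int i = 0; i < fieldCount; i++) {
            to.put(i, from.get(i));
        }
    }

    public static void main(String[] args) {
        Pair src = new Pair();
        src.put(0, "L");
        src.put(1, "R");
        Pair dst = new Pair();
        copy(src, dst, 2);
        System.out.println(dst.left + dst.right); // LR
    }
}
```

Because field positions are fixed by the schema at generation time, this dispatch is a plain switch on an int, which is the design choice that lets the specific API stay both type-safe and fast.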