Avro 序列化操作原理与应用
2017-04-12 00:00
351 查看
摘要: hadoop-Avro笔记 ,文档中所示例的项目的地址 https://git.oschina.net/weiwei02/WHadoop
Avro的序列化不同于json,其更重视数据在网络上传输的性能,传输过程中忽略了数据的可读性。当然Avro也支持像json一样的key value传输。Avro提供了两种方式进行数据的序列化与反序列化,根据不同的使用场景可以任由我们选择。下面通过Avro使用通用和特殊两种API进行数据的序列化与反序列化的两个例子来探究其原理与应用。
首先以一个简单的Avro模式 StringPire.avsc为例
接着再创建一个 SerializeString.java 文件,用于执行指定模式的Avro的序列化与反序列化
对于上面的示例,测试用例如下:
经过测试,我们得到了以下结果:
用例全部正常通过,我们写入的两个字符串完全正常的被序列化与读取了.
对于通用API的写入与读取原理我们举例说明:
在这里我们发现由
Avro 实际使用了 9 个字节去表示,如果我们使用传统的json去表示区分,这将花费 34 个字节.在实际存储时,每个字段的数据独占一行(用ascii 码 10 作为区分)(--当数据中存在\n换行符时使用-- 用一页 ascii码 12 作为区分),读取时按照预订的模式所规定的数据类型按照模式声明的顺序去读,对于所需要的字段,可以传空值,但不可以不传.通用的反序列化的逻辑应该正好与序列化的逻辑相反,这里不再叙述.
例:使用maven插件根据 StringPair.avsc 生成 StringPair.java
pom.xml配置如下:
上面的配置将 avro:schema 插件绑定到 maven 官方的插件 generate-sources 命令之上.配置完成之后,在shell中运行
然后创建一个序列化StringPair对象的类 SerializeStringPair.java
通过实例可以发现,特定的api不再需要指定模式等内容,让编程变得更加简单。而其内部实现原理也只是使用反射进行映射,相对都变得更为简单。并且与通用API相同,其内部的字节组织都专门为网络传输做过优化。
Avro 序列化操作原理与应用
内存中的序列化与反序列化
Avro 提供了序列化与反序列化API,通过这些API我们可以很方便的将 Avro 集成到现有的系统.Avro的序列化不同于json,其更重视数据在网络上传输的性能,传输过程中忽略了数据的可读性。当然Avro也支持像json一样的key value传输。Avro提供了两种方式进行数据的序列化与反序列化,根据不同的使用场景可以任由我们选择。下面通过Avro使用通用和特殊两种API进行数据的序列化与反序列化的两个例子来探究其原理与应用。
通用 API
使用通用API我们可以不必创建相应的java对象,就像操作Map一样,只需要知道需要操作的数据的key和value.甚至可以对已有的schema进行分析动态的得出我们所需要的数据结构.首先以一个简单的Avro模式 StringPire.avsc为例
{ "type": "record", "name": "StringPair", "doc": "A pair of strings.", "fields": [ {"name": "left", "type": "string"}, {"name": "right", "type": "string"} ] }
接着再创建一个 SerializeString.java 文件,用于执行指定模式的Avro的序列化与反序列化
package cn.weiwei.WHadoop.Avro; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.*; import org.apache.avro.util.Utf8; import java.io.ByteArrayOutputStream; import java.io.IOException; /** * <pre>字符序列化与反序列化</pre> * [@author](https://my.oschina.net/arthor) Wang Weiwei <weiwei02@vip.qq.com / weiwei.wang@100credit.com> * [@version](https://my.oschina.net/u/931210) 1.0 * [@sine](https://my.oschina.net/mysine) 17-4-6 */ public class SerializeString { /** * 使用通用API序列化字符串 * */ public ByteArrayOutputStream serialize(String path) throws IOException { Schema schema = buildSchema(path); //创建通用Avro记录 GenericRecord genericRecord = new GenericData.Record(schema); genericRecord.put("left",new Utf8("LEFT")); genericRecord.put("right",new Utf8("RIGHT")); //将记录序列化到输出流中 ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); //datumWriter对象将数据翻译成Encoder对象可以理解的类型然后写入到输出流 DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(schema); //由于encord无需重用 binaryEncoder()的第二个参数为null 3ff0 Encoder encoder = EncoderFactory.get().binaryEncoder(byteArrayOutputStream,null); datumWriter.write(genericRecord, encoder); encoder.flush(); byteArrayOutputStream.close(); System.out.println(byteArrayOutputStream.toString("utf-8")); return byteArrayOutputStream; } /** * 加载要使用的模式 * */ private Schema buildSchema(String path) throws IOException { //加载要使用的模式 Schema.Parser parser = new Schema.Parser(); return parser.parse(getClass().getClassLoader().getResourceAsStream(path)); } /**使用通用API反序列化数据*/ public void deserialize(String path) throws IOException { Schema schema = buildSchema(path); DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>(schema); Decoder decoder = DecoderFactory.get().binaryDecoder(serialize(path).toByteArray(),null); GenericRecord record = datumReader.read(null,decoder); System.out.println(record.toString()); } }
对于上面的示例,测试用例如下:
package cn.weiwei.WHadoop.Avro; import org.junit.Test; import static org.junit.Assert.*; /**<pre>测试字符串序列化与反序列化</pre> * @author Wang Weiwei <weiwei02@vip.qq.com / weiwei.wang@100credit.com> * @version 1.0 * @sine 17-4-6 */ public class SerializeStringTest { SerializeString serializeString = new SerializeString(); @Test public void serialize() throws Exception { System.out.println(serializeString.serialize("StringPair.avsc").toString("utf-8")); } @Test public void deserialize() throws Exception { serializeString.deserialize("StringPair.avsc"); } }
经过测试,我们得到了以下结果:
LEFT RIGHT {"left": "LEFT", "right": "RIGHT"} Process finished with exit code 0
用例全部正常通过,我们写入的两个字符串完全正常的被序列化与读取了.
对于通用API的写入与读取原理我们举例说明:
/**重载调用write(Schema schema, Object datum, Encoder out)方法*/ public void write(D datum, Encoder out) throws IOException { write(root, datum, out); } /** 写入数据的主方法.*/ protected void write(Schema schema, Object datum, Encoder out) throws IOException { try { switch (schema.getType()) { case RECORD: writeRecord(schema, datum, out); break; case ENUM: writeEnum(schema, datum, out); break; case ARRAY: writeArray(schema, datum, out); break; case MAP: writeMap(schema, datum, out); break; case UNION: int index = resolveUnion(schema, datum); out.writeIndex(index); write(schema.getTypes().get(index), datum, out); break; case FIXED: writeFixed(schema, datum, out); break; case STRING: writeString(schema, datum, out); break; case BYTES: writeBytes(datum, out); break; case INT: out.writeInt(((Number)datum).intValue()); break; case LONG: out.writeLong((Long)datum); break; case FLOAT: out.writeFloat((Float)datum); break; case DOUBLE: out.writeDouble((Double)datum); break; case BOOLEAN: out.writeBoolean((Boolean)datum); break; case NULL: out.writeNull(); break; default: error(schema,datum); } } catch (NullPointerException e) { throw npe(e, " of "+schema.getFullName()); } }
void write(Schema schema, Object datum, Encoder out)方法会根据不同的模式标注的类型作出不同的反映,如果是基本数据类型的话会直接写入到序列化输出流中,否则继续进行其它处理,我们以 schema的type为record为例继续分析:
/** Called to write a record. May be overridden for alternate record * representations.*/ protected void writeRecord(Schema schema, Object datum, Encoder out) throws IOException { Object state = data.getRecordState(datum, schema); for (Field f : schema.getFields()) { writeField(datum, f, out, state); } } /** Called to write a single field of a record. May be overridden for more * efficient or alternate implementations.*/ protected void writeField(Object datum, Field f, Encoder out, Object state) throws IOException { Object value = data.getField(datum, f.name(), f.pos(), state); try { write(f.schema(), value, out); } catch (NullPointerException e) { throw npe(e, " in field " + f.name()); } }
在这里我们发现由
write()方法调用
writeRecord()方法 调用
writeField()再递归调用
write()方法,这样就能无所遗漏,经由选择-循环-递归的复杂逻辑处理,保证不漏掉,不错过某个字段的数据,以完成整个数据中所有的模式的解析.并且由于 Avro 写入到流中的信息只有数据本身,而没有像 JSON 那样将冗余的数据模式也同时写入到数据中,这样更利于在网络中高效传输数据.利于对于上面所序列化的字符串
LEFT RIGHT
Avro 实际使用了 9 个字节去表示,如果我们使用传统的json去表示区分,这将花费 34 个字节.在实际存储时,每个字段的数据独占一行(用ascii 码 10 作为区分)(--当数据中存在\n换行符时使用-- 用一页 ascii码 12 作为区分),读取时按照预订的模式所规定的数据类型按照模式声明的顺序去读,对于所需要的字段,可以传空值,但不可以不传.通用的反序列化的逻辑应该正好与序列化的逻辑相反,这里不再叙述.
特殊 API
我们可以使用上面通用的API进行序列化与反序列化操作,但这种方式好像不太符合我们的一切事物皆是对象的思想.Avro 提供的API也有对特殊对象进行操作的API,并且还提供了根据特定的模式去生成类的Maven插件.例:使用maven插件根据 StringPair.avsc 生成 StringPair.java
pom.xml配置如下:
<build> <plugins> <plugin> <groupId>org.apache.avro</groupId> <artifactId>avro-maven-plugin</artifactId> <version>${avro.version}</version> <executions> <execution> <id>schemas</id> <phase>generate-sources</phase> <goals> <goal>schema</goal> </goals> <configuration> <includes> <include>StringPair.avsc</include> </includes> <stringType>String</stringType> <!-- Avro 1.6.0 onwards --> <sourceDirectory>src/main/resources</sourceDirectory> <outputDirectory>${basedir}/src/main/java/cn/weiwei/WHadoop/Avro/domain </outputDirectory> </configuration> </execution> </executions> </plugin> </plugins> </build>
上面的配置将 avro:schema 插件绑定到 maven 官方的插件 generate-sources 命令之上.配置完成之后,在shell中运行
mvn generate-sources命令便可生成文件 StringPair.java
/** * Autogenerated by Avro * * DO NOT EDIT DIRECTLY */ @SuppressWarnings("all") /** A pair of strings. */ @org.apache.avro.specific.AvroGenerated public class StringPair extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord { public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"StringPair\",\"doc\":\"A pair of strings.\",\"fields\":[{\"name\":\"left\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}},{\"name\":\"right\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}},{\"name\":\"num\",\"type\":\"int\"}]}"); public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; } @Deprecated public java.lang.String left; @Deprecated public java.lang.String right; @Deprecated public int num; /** * Default constructor. Note that this does not initialize fields * to their default values from the schema. If that is desired then * one should use <code>newBuilder()</code>. */ public StringPair() {} /** * All-args constructor. */ public StringPair(java.lang.String left, java.lang.String right, java.lang.Integer num) { this.left = left; this.right = right; this.num = num; } public org.apache.avro.Schema getSchema() { return SCHEMA$; } // Used by DatumWriter. Applications should not call. public java.lang.Object get(int field$) { switch (field$) { case 0: return left; case 1: return right; case 2: return num; default: throw new org.apache.avro.AvroRuntimeException("Bad index"); } } // Used by DatumReader. Applications should not call. @SuppressWarnings(value="unchecked") public void put(int field$, java.lang.Object value$) { switch (field$) { case 0: left = (java.lang.String)value$; break; case 1: right = (java.lang.String)value$; break; case 2: num = (java.lang.Integer)value$; break; default: throw new org.apache.avro.AvroRuntimeException("Bad index"); } } /** * Gets the value of the 'left' field. */ public java.lang.String getLeft() { return left; } /** * Sets the value of the 'left' field. * @param value the value to set. */ public void setLeft(java.lang.String value) { this.left = value; } /** * Gets the value of the 'right' field. */ public java.lang.String getRight() { return right; } /** * Sets the value of the 'right' field. * @param value the value to set. */ public void setRight(java.lang.String value) { this.right = value; } /** * Gets the value of the 'num' field. */ public java.lang.Integer getNum() { return num; } /** * Sets the value of the 'num' field. * @param value the value to set. */ public void setNum(java.lang.Integer value) { this.num = value; } /** Creates a new StringPair RecordBuilder */ public static StringPair.Builder newBuilder() { return new StringPair.Builder(); } /** Creates a new StringPair RecordBuilder by copying an existing Builder */ public static StringPair.Builder newBuilder(StringPair.Builder other) { return new StringPair.Builder(other); } /** Creates a new StringPair RecordBuilder by copying an existing StringPair instance */ public static StringPair.Builder newBuilder(StringPair other) { return new StringPair.Builder(other); } /** * RecordBuilder for StringPair instances. */ public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<StringPair> implements org.apache.avro.data.RecordBuilder<StringPair> { private java.lang.String left; private java.lang.String right; private int num; /** Creates a new Builder */ private Builder() { super(StringPair.SCHEMA$); } /** Creates a Builder by copying an existing Builder */ private Builder(StringPair.Builder other) { super(other); if (isValidValue(fields()[0], other.left)) { this.left = data().deepCopy(fields()[0].schema(), other.left); fieldSetFlags()[0] = true; } if (isValidValue(fields()[1], other.right)) { this.right = data().deepCopy(fields()[1].schema(), other.right); fieldSetFlags()[1] = true; } if (isValidValue(fields()[2], other.num)) { this.num = data().deepCopy(fields()[2].schema(), other.num); fieldSetFlags()[2] = true; } } /** Creates a Builder by copying an existing StringPair instance */ private Builder(StringPair other) { super(StringPair.SCHEMA$); if (isValidValue(fields()[0], other.left)) { this.left = data().deepCopy(fields()[0].schema(), other.left); fieldSetFlags()[0] = true; } if (isValidValue(fields()[1], other.right)) { this.right = data().deepCopy(fields()[1].schema(), other.right); fieldSetFlags()[1] = true; } if (isValidValue(fields()[2], other.num)) { this.num = data().deepCopy(fields()[2].schema(), other.num); fieldSetFlags()[2] = true; } } /** Gets the value of the 'left' field */ public java.lang.String getLeft() { return left; } /** Sets the value of the 'left' field */ public StringPair.Builder setLeft(java.lang.String value) { validate(fields()[0], value); this.left = value; fieldSetFlags()[0] = true; return this; } /** Checks whether the 'left' field has been set */ public boolean hasLeft() { return fieldSetFlags()[0]; } /** Clears the value of the 'left' field */ public StringPair.Builder clearLeft() { left = null; fieldSetFlags()[0] = false; return this; } /** Gets the value of the 'right' field */ public java.lang.String getRight() { return right; } /** Sets the value of the 'right' field */ public StringPair.Builder setRight(java.lang.String value) { validate(fields()[1], value); this.right = value; fieldSetFlags()[1] = true; return this; } /** Checks whether the 'right' field has been set */ public boolean hasRight() { return fieldSetFlags()[1]; } /** Clears the value of the 'right' field */ public StringPair.Builder clearRight() { right = null; fieldSetFlags()[1] = f 3ff0 alse; return this; } /** Gets the value of the 'num' field */ public java.lang.Integer getNum() { return num; } /** Sets the value of the 'num' field */ public StringPair.Builder setNum(int value) { validate(fields()[2], value); this.num = value; fieldSetFlags()[2] = true; return this; } /** Checks whether the 'num' field has been set */ public boolean hasNum() { return fieldSetFlags()[2]; } /** Clears the value of the 'num' field */ public StringPair.Builder clearNum() { fieldSetFlags()[2] = false; return this; } @Override public StringPair build() { try { StringPair record = new StringPair(); record.left = fieldSetFlags()[0] ? this.left : (java.lang.String) defaultValue(fields()[0]); record.right = fieldSetFlags()[1] ? this.right : (java.lang.String) defaultValue(fields()[1]); record.num = fieldSetFlags()[2] ? this.num : (java.lang.Integer) defaultValue(fields()[2]); return record; } catch (Exception e) { throw new org.apache.avro.AvroRuntimeException(e); } } } }
然后创建一个序列化StringPair对象的类 SerializeStringPair.java
package cn.weiwei.WHadoop.Avro; import cn.weiwei.WHadoop.Avro.domain.StringPair; import org.apache.avro.io.*; import org.apache.avro.specific.SpecificDatumReader; import org.apache.avro.specific.SpecificDatumWriter; import java.io.ByteArrayOutputStream; import java.io.IOException; /** * <pre>序列化与反序列化 StringPair 工具类</pre> * @author Wang Weiwei <email>weiwei02@vip.qq.com / weiwei.wang@100credit.com</email> * @version 1.0 * @sine 2017/4/12 */ public class SerializeStringPair { public StringPair stringPair; public SerializeStringPair(){ stringPair = new StringPair(); stringPair.setLeft("L"); stringPair.setRight("R"); } /**序列化*/ public byte[] serialize() throws IOException { ByteArrayOutputStream stream = new ByteArrayOutputStream(); DatumWriter<StringPair> stringPairDatumWriter = new SpecificDatumWriter<StringPair>(StringPair.class); Encoder encoder = EncoderFactory.get().binaryEncoder(stream,null); stringPairDatumWriter.write(stringPair,encoder); encoder.flush(); stream.close(); return stream.toByteArray(); } /** * 反序列化 * */ public void deSerialize(byte[] data) throws IOException { DatumReader<StringPair> stringPairDatumReader = new SpecificDatumReader<StringPair>(StringPair.class); Decoder decoder = DecoderFactory.get().binaryDecoder(data,null); StringPair stringPair = stringPairDatumReader.read(null,decoder); System.out.println(stringPair); } }
通过实例可以发现,特定的api不再需要指定模式等内容,让编程变得更加简单。而其内部实现原理也只是使用反射进行映射,相对都变得更为简单。并且与通用API相同,其内部的字节组织都专门为网络传输做过优化。
相关文章推荐
- Avro序列化操作(1):环境搭建和Schema处理
- Avro序列化操作(2):序列化和反序列化
- Redis进阶实践(四)——redis高级应用(集群搭建、集群分区原理、集群操作)
- javascript利用控件对windows的操作实现原理与应用
- 文章标题 Java中io流的一些简单操作(包含文件复制,向硬盘中写入文本文件,以及io流高级应用序列化和反序列化)
- 分布式缓存技术redis学习系列(四)——redis高级应用(集群搭建、集群分区原理、集群操作)
- 转载_如何像巫师那样隔空操作——聊聊迷你雷达的原理和应用
- GCC数值原子操作API原理及应用
- 分布式缓存技术redis学习系列(四)——redis高级应用(集群搭建、集群分区原理、集群操作)
- 分布式缓存技术redis学习系列(四)——redis高级应用(集群搭建、集群分区原理、集群操作)
- 分布式缓存技术redis学习系列(四)——redis高级应用(集群搭建、集群分区原理、集群操作)
- 分布式缓存技术redis学习(四)——redis高级应用(集群搭建、集群分区原理、集群操作)
- 分布式缓存技术redis学习(四)——redis高级应用(集群搭建、集群分区原理、集群操作)
- 7.zookeeper原理解析-序列化之底层通信数据封装与操作
- avro序列化详细操作
- 分布式缓存技术redis学习系列(四)——redis高级应用(集群搭建、集群分区原理、集群操作)
- Zookeeper系列(二十二)Zookeeper原理解析之序列化之底层通信数据封装与操作和Record接口
- 分布式缓存技术redis学习系列(四)——redis高级应用(集群搭建、集群分区原理、集群操作)
- javascript利用控件对windows的操作实现原理与应用
- 分布式缓存技术redis学习系列(四)——redis高级应用(集群搭建、集群分区原理、集群操作)