您的位置:首页 > 运维架构

Hadoop(十一)Hadoop IO之序列化与比较功能实现详解【转载】

2017-11-24 15:38 447 查看
阅读目录(Content)

一、序列化和反序列化概述

1.1、序列化和反序列化的定义

1.2、序列化和反序列化的应用

1.3、RPC序列化格式要求

二、Hadoop中和虚序列化相关的接口和类

1.1、Hadoop对基本数据类型的包装

1.2、Writable接口

1.3、实例解释Java和Hadoop数据类型序列化的差别

1.4、在Hadoop中写一个序列化的类

四、Hadoop中和比较相关的接口和类

4.1、WritableComparable接口

4.2、RawComparator接口

4.3、WritableComparator类

五、Hadoop实现序列化和比较功能

5.1、核心代码

前言

  上一篇给大家介绍了Hadoop是怎么样保证数据的完整性的,并且使用Java程序来验证了会产生.crc的校验文件。这一篇给大家分享的是Hadoop的序列化!

一、序列化和反序列化概述

1.1、序列化和反序列化的定义

  1)序列化:将结构化对象转换为字节流的过程,以便在网络上传输或写入到磁盘进行永久存储的过程。
  2)反序列化:将字节流转回一系列的相反过程结构化对象。

  注意:其实流就是字节数组,我们把数据转变成一系列的字节数组(0101这样的数据)

1.2、序列化和反序列化的应用

  1)进程间的通信

  2)持久化存储

1.3、RPC序列化格式要求

  在Hadoop中,系统中多个节点上进程间的通信是通过“远程过程调用(RPC)”实现的。RPC协议将消息序列化成 二进制流后发送到远程节点,远程节点

  将二进制流反序列化为原始信息。通常情况下,RPC序列化格式如下:

    1)紧凑(compact)

      紧凑格式能充分利用网络带宽。

    2)快速(Fast)

      进程间通信形成了分布式系统的骨架,所以需要尽量减少序列化和反序列化的性能开销,这是基本..最基本的。

    3)可扩展(Extensible)

      为了满足新的需求,协议不断变化。所以控制客户端和服务器的过程中,需要直接引进相应的协议。

    4)支持互操作(Interoperable)

      对于某些系统来说,希望能支持以不同语言写的客户端与服务器交互,所以需要设计需要一种特定的格式来满足这一需求。

二、Hadoop中和虚序列化相关的接口和类

  在Java中将一个类写为可以序列化的类是实现Serializable接口

  在Hadoop中将一个类写为可以序列化的类是实现Writable接口,它是一个最顶级的接口。

1.1、Hadoop对基本数据类型的包装

  Hadoop参照JDK里面的数据类型实现了自己的数据类型,Hadoop自己实现的原理会使数据更紧凑一些,效率会高一些。序列化之后的字节数组大小会比

  JDK序列化出来的更小一些。

  所有Java基本类型的可写包装器,除了char(可以是存储在IntWritable中)。所有的都有一个get()和set()方法来检索和存储包装值。  

  

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.IntWritable.Comparator;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.VIntWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableUtils;

public class P00120_AccountWritable_0010{
public static void main(String[] args){
AccountWritable aw1=new AccountWritable();
aw1.set(new IntWritable(30),new Text("zyh"),new BooleanWritable(true));

AccountWritable aw2=new AccountWritable();
aw2.set(new IntWritable(30),new Text("zyh"),new BooleanWritable(true));

AccountWritable.DiyComparator comparator=new AccountWritable.DiyComparator();
System.out.println(comparator.compare(aw1,aw2));
}
}

class AccountWritable
implements WritableComparable<AccountWritable>{

private IntWritable code;
private Text name;
private BooleanWritable gender;

AccountWritable(){
code=new IntWritable();
name=new Text();
gender=new BooleanWritable();
}

// 把参数类型和类类型相同的构造器,叫复制构造器
AccountWritable(AccountWritable aw){
code=new IntWritable(aw.getCode().get());
name=new Text(aw.getName().toString());
gender=new BooleanWritable(aw.getGender().get());
}

public void set(IntWritable code,Text name,BooleanWritable gender){
this.code=new IntWritable(code.get());
this.name=new Text(name.toString());
this.gender=new BooleanWritable(gender.get());
}

@Override
public int compareTo(AccountWritable o){
/*return this.code.compareTo(o.code)!=0?code.compareTo(o.code):
(name.compareTo(o.name)!=0?name.compareTo(o.name):(this.gender.compareTo(o.gender)!=0?gender.compareTo(o.gender):0));*/
int comp=this.code.compareTo(o.code);
if(comp!=0){
return comp;
}else{
comp=this.name.compareTo(o.name);
if(comp!=0){
return comp;
}else{
comp=this.gender.compareTo(o.gender);
if(comp!=0){
return comp;
}else{
return 0;
}
}
}
}

@Override
public void write(DataOutput out) throws IOException{
code.write(out);
name.write(out);
gender.write(out);
}

@Override
public void readFields(DataInput in) throws IOException{
code.readFields(in);
name.readFields(in);
gender.readFields(in);
}
  
  
   //实现一个比较器
static class DiyComparator
implements RawComparator<AccountWritable>{

private IntWritable.Comparator ic=
new Comparator();
private Text.Comparator tc=
new Text.Comparator();
private BooleanWritable.Comparator bc=
new BooleanWritable.Comparator();

@Override
public int compare(byte[] b1,int s1,int l1,byte[] b2,int s2,int l2){
// code被序列化后在b1和b2数组中的起始位置以及字节长度
int firstLength=4;
int secondLength=4;

int firstStart=s1;
int secondStart=s2;

int firstOffset=0;
int secondOffset=0;

// 比较字节流中的code部分
int comp=ic.compare(
b1,firstStart,firstLength,
b2,secondStart,secondLength);
if(comp!=0){
return comp;
}else{
try{
// 获取记录字符串的起始位置
firstStart=firstStart+firstLength;
secondStart=secondStart+secondLength;
// 获取记录字符串长度的VIntWritable的值的长度,被称为offset
firstOffset=WritableUtils.decodeVIntSize(b1[firstStart]);
secondOffset=WritableUtils.decodeVIntSize(b2[secondStart]);
// 获取字符串的长度
firstLength=readLengthValue(b1,firstStart);
secondLength=readLengthValue(b2,secondStart);
}catch(IOException e){
e.printStackTrace();
}
// 比较字节流中的name部分
comp=tc.compare(b1,firstStart+firstOffset,firstLength,b2,secondStart+secondOffset,secondLength);
if(comp!=0){
return comp;
}else{
firstStart+=(firstOffset+firstLength);
secondStart+=(secondOffset+secondLength);
firstLength=1;
secondLength=1;
// 比较字节流中的gender部分
return bc.compare(b1,firstStart,firstLength,b2,secondStart,secondLength);
}
}
}

private int readLengthValue(
byte[] bytes,int start) throws IOException{
DataInputStream dis=
new DataInputStream(
new ByteArrayInputStream(
bytes,start,WritableUtils.decodeVIntSize(bytes[start])));
VIntWritable viw=new VIntWritable();
viw.readFields(dis);
return viw.get();
}

@Override
public int compare(AccountWritable o1,AccountWritable o2){
ByteArrayOutputStream baos1=new ByteArrayOutputStream();
DataOutputStream dos1=new DataOutputStream(baos1);

ByteArrayOutputStream baos2=new ByteArrayOutputStream();
DataOutputStream dos2=new DataOutputStream(baos2);

try{
o1.write(dos1);
o2.write(dos2);

dos1.close();
dos2.close();

byte[] b1=baos1.toByteArray();
byte[] b2=baos2.toByteArray();

return compare(b1,0,b1.length,b2,0,b2.length);
}catch(IOException e){
e.printStackTrace();
}
return 0;
}
}

public IntWritable getCode(){
return code;
}

public void setCode(IntWritable code){
this.code=code;
}

public Text getName(){
return name;
}

public void setName(Text name){
this.name=name;
}

public BooleanWritable getGender(){
return gender;
}

public void setGender(BooleanWritable gender){
this.gender=gender;
}
}


  注意如果一个类即实现了WritableComparatable接口又写了比较器,优先使用比较器。

原文地址:http://www.cnblogs.com/zhangyinhua/p/7711826.html
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: