您的位置:首页 > 运维架构

从零开始最短路径学习Hadoop之04----Hadoop的I/O

2013-07-30 12:22 267 查看
《Hadoop权威指南》第2版本对Hadoop的I/O讲述内容甚多。对初学者来说,暂时不用使用太多技术。熟悉了主线,更多的细节查询手册即可得知。

1. 序列化

    1.1 为什么序列化?

          将一个内存中的对象,转化成字节流,这样可以很方便地通过网络进行传输,或者在磁盘上永久存储。

          从字节流转化成内存中的对象,称反序列化。

    1.2 序列化在分布式系统的两个地方经常出现:进程间通信,永久存储。

    1.3 Hadoop多节点的进程间通信是通过远程过程调用rpc实现的。rpc协议将消息序列化成为二进制流,然后发送到远程节点。远程节点将二进制流发序列化为原始消息。

    1.4 Hadoop的序列化格式Writable接口,它是Hadoop的核心。

    1.5 Writable接口定义两个方法,一个将状态写到DataOutput二进制流,一个从DataInput二进制流读取状态。

    1.6 IntWritable是封装java int的类,使用方式如下:

          IntWritable intw = new IntWritable();

          intw.set(163);

         也是这样:IntWritable  intw = new IntWritable(163);

    1.7 一个演示序列化的和反序列化的例子DispIntWri.java

package com.cere;

import java.io.DataOutputStream;
import java.io.DataInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.util.StringUtils;

public class DispIntWrit{
public static byte[] serialize(Writable w)throws IOException{
ByteArrayOutputStream out = new ByteArrayOutputStream();
DataOutputStream dataout = new DataOutputStream(out);
w.write(dataout);
dataout.close();
return out.toByteArray();
}

public static byte[] deserialize(Writable w, byte[] bytes) throws IOException{
ByteArrayInputStream in = new ByteArrayInputStream(bytes);
DataInputStream datain = new DataInputStream(in);
w.readFields(datain);
datain.close();
return bytes;
}

public static void main(String[] args) throws Exception{
IntWritable intw = new IntWritable(163);
byte[] bytes = serialize(intw);
String bytes_str = StringUtils.byteToHexString(bytes);
System.out.println(bytes_str);
System.out.println(bytes.length);
System.out.println("---------------");
IntWritable intw2 = new IntWritable(-1);
deserialize(intw2, bytes);
System.out.println(intw2);
}
}


        1.7.1 编译:brian@brian-i3:~/all/work/learn/hadoop/hdp-train/class-4/p1$ javac -classpath /home/brian/usr/hadoop/hadoop-1.1.2/hadoop-core-1.1.2.jar  -d ./classes/ src/*.java
        1.7.2 打包:brian@brian-i3:~/all/work/learn/hadoop/hdp-train/class-4/p1$ jar -cvf dit.jar -C ./classes/ .
        1.7.3 执行:brian@brian-i3:~/all/work/learn/hadoop/hdp-train/class-4/p1$ ~/usr/hadoop/hadoop-1.1.2/bin/hadoop jar dit.jar com.cere.DispIntWrit 

    1.8 WritableComparable接口继承了Writable和java.lang.Comparable接口。
           IntWritable实现了WritableComparable接口。
           对MapReduce来说,类型的比较是非常重要的,因为中间有个基于key的排序阶段。
           RawComparator接口继承了java jdk中的 Comparator接口,可以直接比较数据流中的记录,不需要把数据流反序列化为对象,这样可以避免了新建对象的额外开销。
           WritableComparator是一个类,实现了RawComparator接口。
    1.9 Hadoop里实现很多Writable类:BooleanWritable, ByteWritable, IntWritable, VintWritable, FloatWritable, LongWritable, VlongWritable, DoubleWritable等等。Text是utf-8序列的Writable类。
    1.10 实现定制的Writable类型。
    1.11 为速度实现一个RawComparator类型。
    1.12 定制Comparator类型。
    1.13 序列化框架:只要有一种机制对每个类进行类型与二进制表示的来回转换,就可以使用任何类型。
    1.14 序列化IDL:接口定义语言Interface Description Language,不依赖具体语言的方式进行声明。
    1.15  Apache Avro是一个独立于编程语言的数据序列化系统,解决Hadoop中Writable类型缺乏语言可移植性的问题。

2. 基于文件的数据结构SequenceFile

    日志文件,每一条日志记录是一行文本。如果想记录二进制类型,需要将文本转化成二进制。可以把日志文件转化成二进制的SeqenceFile存储,键key是LongWritable类型表示时间戳,值value是Writable类型,表示日志记录的数量。

    2.1 读写SeqenceFile的例子,WRSeqFile.java

package com.cere;

import java.lang.Exception;
import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.ReflectionUtils;

public class WRSeqFile{
private static final String[] DATA = {
"one",
"Three",
"Five",
"Seven",
"Nine"
};

public static void writeSeqFile(String[] args) throws IOException{
String uri = args[0];
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(uri), conf);
Path path = new Path(uri);
IntWritable key = new IntWritable();
Text value = new Text();

SequenceFile.Writer writer = null;
try{
writer = SequenceFile.createWriter(fs, conf, path, key.getClass(), value.getClass());
for(int i = 0; i < 100; i++){
key.set(100 - i);
value.set(DATA[i % DATA.length]);
System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key, value);
writer.append(key, value);
}
}finally{
System.out.println("---------------------------------");
System.out.println("write ok.");
IOUtils.closeStream(writer);
}
}

public static void readSeqFile(String[] args)throws IOException{
String uri = args[0];
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(uri), conf);
Path path = new Path(uri);

SequenceFile.Reader reader = null;
try{
reader = new SequenceFile.Reader(fs, path, conf);
Writable key = (Writable)ReflectionUtils.newInstance(reader.getKeyClass(), conf);
Writable value = (Writable)ReflectionUtils.newInstance(reader.getValueClass(), conf);
long position = reader.getPosition();
while(reader.next(key, value)){
String syncSeen = reader.syncSeen()?"*":"";
System.out.printf("[%s%s]\t%s\t%s\n", position, syncSeen, key, value);
position = reader.getPosition();
}
}finally{
System.out.println("---------------------------------");
System.out.println("read ok.");
IOUtils.closeStream(reader);
}
}

public static void main(String[] args) throws Exception{
writeSeqFile(args);
readSeqFile(args);
}
}


    2.2 编译:brian@brian-i3:~/all/work/learn/hadoop/hdp-train/class-4/p2$ javac -classpath /home/brian/usr/hadoop/hadoop-1.1.2/hadoop-core-1.1.2.jar -d ./classes/ src/*.java 
    2.3 打包:brian@brian-i3:~/all/work/learn/hadoop/hdp-train/class-4/p2$ jar -cvf wrsf.jar -C ./classes/ .

    2.4 执行:
            brian@brian-i3:~/all/work/learn/hadoop/hdp-train/class-4/p2$ ~/usr/hadoop/hadoop-1.1.2/bin/hadoop jar
wrsf.jar com.cere.WRSeqFile a.txt

            brian@brian-i3:~/all/work/learn/hadoop/hdp-train/class-4/p2$ ~/usr/hadoop/hadoop-1.1.2/bin/hadoop fs -text a.txt
    2.5 同步点:当数据读取的实例出错后能够再一次与记录边界同步的数据流中的一个位置。它是由SequenceFile.Writer记录的,也就是在顺序文件写入过程中插入一个特殊项以便每隔开几个记录便有一个同步表示,同步点位于记录的边界之处。
    2.6 在顺序文件中搜索给定位置有两种方法,一种是调用seek方法,读取给定位置,如果给定位置不是记录边界,调用next方法时会发生错误;另一种是通过同步点找到记录边界。
    2.7 通过命令行接口显示SequenceFile对象: "hadoop fs -text a.txt"
    2.8 顺序文件格式:前三个字节是SEQ,第4个字节是版本号,还有其他一些信息。

3. 基于文件的数据结构MapFile

    MapFile是已经排序的SequenceFile,它已加入用于搜索键的索引。可以将MapFile视为java.util.Map的持久化形式。它的大小可能超过保存在内存中的一个map的大小。

    注意对比SequenceFile和MapFile的Writer和Reader。

    3.1 读写MapFile的的例子WRMapFile.java

package com.cere;

import java.lang.Exception;
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.ReflectionUtils;

public class WRMapFile{
private static final String[] DATA = {
"one",
"Three",
"Five",
"Seven",
"Nine"
};

public static void writeMapFile(String[] args) throws IOException{
String uri = args[0];
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(uri), conf);
Path path = new Path(uri);
IntWritable key = new IntWritable();
Text value = new Text();

MapFile.Writer writer = null;
try{
writer = new MapFile.Writer(conf, fs, uri, key.getClass(), value.getClass());
for(int i = 0; i < 1024; i++){
key.set(i+1);
value.set(DATA[i % DATA.length]);
writer.append(key, value);
}
}finally{
System.out.println("---------------------------------");
System.out.println("write ok.");
IOUtils.closeStream(writer);
}
}

public static void readMapFile(String[] args)throws IOException{
String uri = args[0];
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(uri), conf);
Path path = new Path(uri);
MapFile.Reader reader = null;
try{
reader = new MapFile.Reader(fs, path.toString(), conf);
IntWritable key = new IntWritable();
Text value = new Text();
while(reader.next(key, value)){
System.out.printf("%s\t%s\n", key, value);
}
}finally{
System.out.println("---------------------------------");
System.out.println("read ok.");
IOUtils.closeStream(reader);
}
}

public static void main(String[] args) throws Exception{
writeMapFile(args);
readMapFile(args);
}
}


    3.2 编译:brian@brian-i3:~/all/work/learn/hadoop/hdp-train/class-4/p3$ javac -classpath /home/brian/usr/hadoop/hadoop-1.1.2/hadoop-core-1.1.2.jar -d ./classes/ src/*.java 
    3.3 打包:brian@brian-i3:~/all/work/learn/hadoop/hdp-train/class-4/p3$ jar -cvf wrmf.jar -C ./classes/ .
    3.4 执行:brian@brian-i3:~/all/work/learn/hadoop/hdp-train/class-4/p3$ ~/usr/hadoop/hadoop-1.1.2/bin/hadoop jar wrmf.jar com.cere.WRMapFile b
    3.5 写入时,文件名b是一个目录,里面包含data和index两个文件,都是SequenceFile,data文件包含所有记录,index文件包含一部分键和data中键到该键的偏移量的映射。一般是每隔128个键才有一个包含在index文件中。

    3.6 读取时,可以调用get()方法可以随机访问文件中的数据。一次查找需要一次磁盘寻址和一次最多128个条目的扫描。getClost()跟get()方法类似。

    3.7 可以对MapFile进行重建索引。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: