Hadoop: the definitive guide 第三版 拾遗 第四章 之SequenceFile操作
2013-08-13 15:02
435 查看
SequenceFile文件是Hadoop用来存储二进制形式的key-value对而设计的一种平面文件(Flat File)。目前,也有不少人在该文件的基础之上提出了一些HDFS中小文件存储的解决方案,他们的基本思路就是将小文件进行合并成一个大文件,同时对这些小文件的位置信息构建索引。不过,这类解决方案还涉及到Hadoop的另一种文件格式——MapFile文件。SequenceFile文件并不保证其存储的key-value数据是按照key的某个顺序存储的,同时不支持append操作。
在SequenceFile文件中,每一个key-value被看做是一条记录(Record),因此基于Record的压缩策略,SequenceFile文件可支持三种压缩类型(SequenceFile.CompressionType):
1、NONE: 对records不进行压缩;
2、RECORD: 仅压缩每一个record中的value值;对每一条记录的value值进行了压缩(文件头中包含上使用哪种压缩算法的信息)
3、BLOCK: 将一个block中的所有records压缩在一起;当数据量达到一定大小后,将停止写入进行整体压缩,整体压缩的方法是把所有的keylength,key,vlength,value 分别合在一起进行整体压缩。文件的压缩态标识在文件开头的header数据中。
基于这三种压缩类型,Hadoop提供了对应的三种类型的Writer:
1、SequenceFile.Writer 写入时不压缩任何的key-value对(Record);
2、SequenceFile.RecordCompressWriter写入时只压缩key-value对(Record)中的value;
3、SequenceFile.BlockCompressWriter 写入时将一批key-value对(Record)压缩成一个Block;
一、SequenceFile写操作
二、SequenceFile读操作
三、搜索SequenceFile中的指定位置
在SequenceFile文件中,每一个key-value被看做是一条记录(Record),因此基于Record的压缩策略,SequenceFile文件可支持三种压缩类型(SequenceFile.CompressionType):
1、NONE: 对records不进行压缩;
2、RECORD: 仅压缩每一个record中的value值;对每一条记录的value值进行了压缩(文件头中包含上使用哪种压缩算法的信息)
3、BLOCK: 将一个block中的所有records压缩在一起;当数据量达到一定大小后,将停止写入进行整体压缩,整体压缩的方法是把所有的keylength,key,vlength,value 分别合在一起进行整体压缩。文件的压缩态标识在文件开头的header数据中。
基于这三种压缩类型,Hadoop提供了对应的三种类型的Writer:
1、SequenceFile.Writer 写入时不压缩任何的key-value对(Record);
2、SequenceFile.RecordCompressWriter写入时只压缩key-value对(Record)中的value;
3、SequenceFile.BlockCompressWriter 写入时将一批key-value对(Record)压缩成一个Block;
一、SequenceFile写操作
package com.tht.hadoopIO; //cc SequenceFileWriteDemo Writing a SequenceFile import java.io.IOException; import java.net.URI; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; //vv SequenceFileWriteDemo public class SequenceFileWriteDemo { private static final String[] DATA = { "One, two, buckle my shoe", "Three, four, shut the door", "Five, six, pick up sticks", "Seven, eight, lay them straight", "Nine, ten, a big fat hen" }; public static void main(String[] args) throws IOException { // String uri = args[0]; String uri = "hdfs://master:9000/seq/numbers.seq"; Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(URI.create(uri), conf); Path path = new Path(uri); IntWritable key = new IntWritable(); Text value = new Text(); SequenceFile.Writer writer = null; try { writer = SequenceFile.createWriter(fs, conf, path, key.getClass(), value.getClass()); for (int i = 0; i < 100; i++) { key.set(100 - i); value.set(DATA[i % DATA.length]); System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key, value); writer.append(key, value); } } finally { IOUtils.closeStream(writer); } } } // ^^ SequenceFileWriteDemo上述代码将自定义的DATA写入hdfs集群的../seq/numbers.seq文件中。
二、SequenceFile读操作
package com.tht.hadoopIO; //cc SequenceFileReadDemo Reading a SequenceFile import java.io.IOException; import java.net.URI; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Writable; import org.apache.hadoop.util.ReflectionUtils; //vv SequenceFileReadDemo public class SequenceFileReadDemo { public static void main(String[] args) throws IOException { // String uri = args[0]; String uri = "hdfs://master:9000/seq/numbers.seq"; Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(URI.create(uri), conf); Path path = new Path(uri); SequenceFile.Reader reader = null; try { reader = new SequenceFile.Reader(fs, path, conf); Writable key = (Writable) ReflectionUtils.newInstance( reader.getKeyClass(), conf); Writable value = (Writable) ReflectionUtils.newInstance( reader.getValueClass(), conf); long position = reader.getPosition(); while (reader.next(key, value)) { String syncSeen = reader.syncSeen() ? "*" : ""; System.out.printf("[%s%s]\t%s\t%s\n", position, syncSeen, key, value); position = reader.getPosition(); // beginning of next record } } finally { IOUtils.closeStream(reader); } } } // ^^ SequenceFileReadDemo
三、搜索SequenceFile中的指定位置
package com.tht.hadoopIO; //== SequenceFileSeekAndSyncTest //== SequenceFileSeekAndSyncTest-SeekNonRecordBoundary //== SequenceFileSeekAndSyncTest-SyncNonRecordBoundary import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertThat; import java.io.IOException; import java.net.URI; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.*; import org.apache.hadoop.io.*; import org.apache.hadoop.util.ReflectionUtils; import org.junit.*; public class SequenceFileSeekAndSyncTest { private static final String SF_URI = "hdfs://master:9000/seq/numbers.seq"; private FileSystem fs; private SequenceFile.Reader reader; private Writable key; private Writable value; @Before public void setUp() throws IOException { SequenceFileWriteDemo.main(new String[] { SF_URI }); Configuration conf = new Configuration(); fs = FileSystem.get(URI.create(SF_URI), conf); Path path = new Path(SF_URI); reader = new SequenceFile.Reader(fs, path, conf); key = (Writable) ReflectionUtils .newInstance(reader.getKeyClass(), conf); value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf); } // @After // public void tearDown() throws IOException { // fs.delete(new Path(SF_URI), true); // } @Test public void seekToRecordBoundary() throws IOException { // vv SequenceFileSeekAndSyncTest reader.seek(359); assertThat(reader.next(key, value), is(true)); assertThat(((IntWritable) key).get(), is(95)); // ^^ SequenceFileSeekAndSyncTest } @Test(expected = IOException.class) public void seekToNonRecordBoundary() throws IOException { // vv SequenceFileSeekAndSyncTest-SeekNonRecordBoundary reader.seek(360); reader.next(key, value); // fails with IOException // ^^ SequenceFileSeekAndSyncTest-SeekNonRecordBoundary } @Test public void syncFromNonRecordBoundary() throws IOException { // vv SequenceFileSeekAndSyncTest-SyncNonRecordBoundary reader.sync(360); assertThat(reader.getPosition(), is(2021L)); assertThat(reader.next(key, value), is(true)); assertThat(((IntWritable) key).get(), is(59)); // ^^ SequenceFileSeekAndSyncTest-SyncNonRecordBoundary } @Test public void syncAfterLastSyncPoint() throws IOException { reader.sync(4557); assertThat(reader.getPosition(), is(4788L)); assertThat(reader.next(key, value), is(false)); } }
相关文章推荐
- Hadoop: the definitive guide 第三版 拾遗 第四章 之SequenceFile操作
- Hadoop: the definitive guide 第三版 拾遗 第四章 之MapFile
- Hadoop: the definitive guide 第三版 拾遗 第四章 之CompressionCodec
- Hadoop: the definitive guide 第三版 拾遗 第四章 之hadoop本地库
- Hadoop: the definitive guide 第三版 拾遗 第四章 之CompressionCodec
- Hadoop: the definitive guide 第三版 拾遗 第四章
- Hadoop: the definitive guide 第三版 拾遗 第四章 之MapFile
- Hadoop: the definitive guide 第三版 拾遗 第四章 之hadoop本地库
- Hadoop: the definitive guide 第三版 拾遗 第十二章 之Hive分区表、桶
- Hadoop: the definitive guide 第三版 拾遗 第十章 之Pig
- Hadoop: the definitive guide 第三版 拾遗 第十二章 之Hive初步
- Hadoop: the definitive guide 第三版 拾遗 第十二章 之Hive初步
- Hadoop: the definitive guide 第三版 拾遗 第十二章 之Hive分区表、桶
- Hadoop: the definitive guide 第三版 拾遗 第十三章 之HBase起步
- Hadoop: the definitive guide 第三版 拾遗 第三章 之查看文件及正则表达式
- Hadoop: the definitive guide 第三版 拾遗 第十一章 之Pig
- Hadoop: the definitive guide 第三版 拾遗 第十二章 之Hive架构
- Hadoop: the definitive guide 第三版 拾遗 第三章 之查看文件及正则表达式
- Hadoop: the definitive guide 第三版 拾遗 第十二章 之Hive分区表、桶
- Hadoop: the definitive guide 第三版 拾遗 第五章 之MRUnit