
Hadoop: The Definitive Guide (3rd Edition) Supplementary Notes, Chapter 4: SequenceFile Operations

2013-08-13 15:02
      A SequenceFile is a flat file format designed by Hadoop for storing key-value pairs in binary form. A number of solutions for storing small files in HDFS have been proposed on top of this format; the basic idea is to merge the small files into one large file while building an index over the positions of the small files. Such solutions, however, also involve another Hadoop file format, the MapFile. Note that a SequenceFile does not guarantee that its key-value records are stored in any particular key order, nor does it support appending to an existing file.

      In a SequenceFile, each key-value pair is treated as a record. Based on how records are compressed, a SequenceFile supports three compression types (SequenceFile.CompressionType):

1. NONE: records are not compressed;

2. RECORD: only the value of each record is compressed (the file header records which compression codec is used);

3. BLOCK: all the records in a block are compressed together. Once the buffered data reaches a certain size, writing pauses and the block is compressed as a whole; the key lengths, keys, value lengths, and values are each gathered together and compressed in bulk. Whether the file is compressed, and how, is recorded in the header at the start of the file.

       Based on these three compression types, Hadoop provides three corresponding Writer implementations (a short sketch follows the list):

1. SequenceFile.Writer: writes key-value pairs (records) without any compression;

2. SequenceFile.RecordCompressWriter: compresses only the value of each key-value pair (record) as it is written;

3. SequenceFile.BlockCompressWriter: compresses a batch of key-value pairs (records) into a single block as they are written.
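
To make the mapping concrete, below is a minimal sketch (not from the book) of writing a BLOCK-compressed SequenceFile; the output path and the choice of DefaultCodec are assumptions for illustration. Passing a SequenceFile.CompressionType to createWriter causes Hadoop to pick the corresponding Writer internally:

package com.tht.hadoopIO;

// Minimal sketch: writing a BLOCK-compressed SequenceFile.
// The URI and the DefaultCodec are illustrative assumptions only.
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.DefaultCodec;

public class SequenceFileCompressedWriteSketch {

    public static void main(String[] args) throws IOException {
        String uri = "hdfs://master:9000/seq/numbers-block.seq"; // hypothetical path
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        Path path = new Path(uri);

        IntWritable key = new IntWritable();
        Text value = new Text();
        SequenceFile.Writer writer = null;
        try {
            // CompressionType.NONE   -> SequenceFile.Writer
            // CompressionType.RECORD -> SequenceFile.RecordCompressWriter
            // CompressionType.BLOCK  -> SequenceFile.BlockCompressWriter
            writer = SequenceFile.createWriter(fs, conf, path,
                    IntWritable.class, Text.class,
                    SequenceFile.CompressionType.BLOCK, new DefaultCodec());
            for (int i = 0; i < 100; i++) {
                key.set(i);
                value.set("record " + i);
                writer.append(key, value);
            }
        } finally {
            IOUtils.closeStream(writer);
        }
    }
}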

I. Writing a SequenceFile

package com.tht.hadoopIO;

//cc SequenceFileWriteDemo Writing a SequenceFile
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

//vv SequenceFileWriteDemo
public class SequenceFileWriteDemo {

    private static final String[] DATA = { "One, two, buckle my shoe",
            "Three, four, shut the door", "Five, six, pick up sticks",
            "Seven, eight, lay them straight", "Nine, ten, a big fat hen" };

    public static void main(String[] args) throws IOException {
        // String uri = args[0];
        String uri = "hdfs://master:9000/seq/numbers.seq";

        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        Path path = new Path(uri);

        IntWritable key = new IntWritable();
        Text value = new Text();
        SequenceFile.Writer writer = null;
        try {
            writer = SequenceFile.createWriter(fs, conf, path, key.getClass(),
                    value.getClass());

            for (int i = 0; i < 100; i++) {
                key.set(100 - i);
                value.set(DATA[i % DATA.length]);
                System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key, value);
                writer.append(key, value);
            }
        } finally {
            IOUtils.closeStream(writer);
        }
    }
}
// ^^ SequenceFileWriteDemo
The code above writes the custom DATA strings to the file /seq/numbers.seq on the HDFS cluster.
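A quick way to check the result (assuming the hadoop command is on your path) is hadoop fs -text /seq/numbers.seq; the -text option recognizes SequenceFiles and prints each key-value pair as plain text.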

II. Reading a SequenceFile

package com.tht.hadoopIO;

//cc SequenceFileReadDemo Reading a SequenceFile
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;

//vv SequenceFileReadDemo
public class SequenceFileReadDemo {

    public static void main(String[] args) throws IOException {
        // String uri = args[0];
        String uri = "hdfs://master:9000/seq/numbers.seq";

        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        Path path = new Path(uri);

        SequenceFile.Reader reader = null;
        try {
            reader = new SequenceFile.Reader(fs, path, conf);
            Writable key = (Writable) ReflectionUtils.newInstance(
                    reader.getKeyClass(), conf);
            Writable value = (Writable) ReflectionUtils.newInstance(
                    reader.getValueClass(), conf);
            long position = reader.getPosition();
            while (reader.next(key, value)) {
                String syncSeen = reader.syncSeen() ? "*" : "";
                System.out.printf("[%s%s]\t%s\t%s\n", position, syncSeen, key, value);
                position = reader.getPosition(); // beginning of next record
            }
        } finally {
            IOUtils.closeStream(reader);
        }
    }
}
// ^^ SequenceFileReadDemo
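Note that the reader determines the key and value types from the file header, which is why ReflectionUtils.newInstance can create the right Writable instances without hard-coding them. The position printed for each record is the byte offset at which the record starts, and a record that immediately follows a sync marker is flagged with an asterisk.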


III. Seeking to a specified position in a SequenceFile

package com.tht.hadoopIO;

//== SequenceFileSeekAndSyncTest
//== SequenceFileSeekAndSyncTest-SeekNonRecordBoundary
//== SequenceFileSeekAndSyncTest-SyncNonRecordBoundary
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;

import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.util.ReflectionUtils;
import org.junit.*;

public class SequenceFileSeekAndSyncTest {

    private static final String SF_URI = "hdfs://master:9000/seq/numbers.seq";
    private FileSystem fs;
    private SequenceFile.Reader reader;
    private Writable key;
    private Writable value;

    @Before
    public void setUp() throws IOException {
        SequenceFileWriteDemo.main(new String[] { SF_URI });

        Configuration conf = new Configuration();
        fs = FileSystem.get(URI.create(SF_URI), conf);
        Path path = new Path(SF_URI);

        reader = new SequenceFile.Reader(fs, path, conf);
        key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
        value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
    }

    // @After
    // public void tearDown() throws IOException {
    //     fs.delete(new Path(SF_URI), true);
    // }

    @Test
    public void seekToRecordBoundary() throws IOException {
        // vv SequenceFileSeekAndSyncTest
        reader.seek(359);
        assertThat(reader.next(key, value), is(true));
        assertThat(((IntWritable) key).get(), is(95));
        // ^^ SequenceFileSeekAndSyncTest
    }

    @Test(expected = IOException.class)
    public void seekToNonRecordBoundary() throws IOException {
        // vv SequenceFileSeekAndSyncTest-SeekNonRecordBoundary
        reader.seek(360);
        reader.next(key, value); // fails with IOException
        // ^^ SequenceFileSeekAndSyncTest-SeekNonRecordBoundary
    }

    @Test
    public void syncFromNonRecordBoundary() throws IOException {
        // vv SequenceFileSeekAndSyncTest-SyncNonRecordBoundary
        reader.sync(360);
        assertThat(reader.getPosition(), is(2021L));
        assertThat(reader.next(key, value), is(true));
        assertThat(((IntWritable) key).get(), is(59));
        // ^^ SequenceFileSeekAndSyncTest-SyncNonRecordBoundary
    }

    @Test
    public void syncAfterLastSyncPoint() throws IOException {
        reader.sync(4557);
        assertThat(reader.getPosition(), is(4788L));
        assertThat(reader.next(key, value), is(false));
    }

}
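
The tests above contrast the two ways of positioning a reader: seek(position) jumps to an absolute byte offset, which must be a record boundary, otherwise the next call to next() fails with an IOException; sync(position), by contrast, moves the reader forward to the next sync point after that position (or to the end of the file if there is none), after which reading can continue normally. These sync points are what make a SequenceFile splittable for MapReduce processing.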