SpatialHadoop 2.3 source code reading (9): ShapeLineInputFormat & ShapeLineRecordReader & SpatialRecordReader [FileMBR]

2015-12-22 17:23

1. ShapeLineInputFormat

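The sole job of ShapeLineInputFormat is to create a ShapeLineRecordReader. Its implementation is essentially the same as that of ShapeInputFormat, which was covered in "SpatialHadoop 2.1 source code reading (3): custom InputFormat (SpatialInputFormat & ShapeInputFormat)"; see that article for the details. The source of ShapeLineInputFormat is: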

public class ShapeLineInputFormat extends SpatialInputFormat<Rectangle, Text> {

  @Override
  public RecordReader<Rectangle, Text> getRecordReader(InputSplit split,
      JobConf job, Reporter reporter) throws IOException {
    if (reporter != null)
      reporter.setStatus(split.toString());
    this.rrClass = ShapeLineRecordReader.class;
    return super.getRecordReader(split, job, reporter);
  }
}
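The pattern used here (the subclass only records which reader class it wants in rrClass, and the superclass builds the reader) can be illustrated with a small stand-alone sketch. It assumes that the superclass instantiates rrClass reflectively; that is an assumption about SpatialInputFormat, not something shown in the listing above. Every class in the sketch (ReaderFactorySketch, BaseFormat, LineFormat, LineReaderStub) is hypothetical and uses a plain String in place of a Hadoop InputSplit.

```java
import java.lang.reflect.Constructor;

// Hypothetical miniature of the "subclass picks the reader class" pattern.
// None of these types are Hadoop or SpatialHadoop classes.
class ReaderFactorySketch {

    interface Reader {
        String describe();
    }

    // Stand-in for a concrete record reader; the split is just a String here.
    static class LineReaderStub implements Reader {
        private final String split;

        public LineReaderStub(String split) {
            this.split = split;
        }

        @Override
        public String describe() {
            return "LineReaderStub(" + split + ")";
        }
    }

    // Stand-in for the base input format: it only knows rrClass, not the concrete type.
    static class BaseFormat {
        protected Class<? extends Reader> rrClass;

        Reader getRecordReader(String split) {
            try {
                // Assumed mechanism: look up a (String) constructor and invoke it.
                Constructor<? extends Reader> ctor = rrClass.getConstructor(String.class);
                return ctor.newInstance(split);
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Cannot create " + rrClass, e);
            }
        }
    }

    // Stand-in for the subclass: selects the reader class, then delegates to the parent.
    static class LineFormat extends BaseFormat {
        @Override
        Reader getRecordReader(String split) {
            this.rrClass = LineReaderStub.class;
            return super.getRecordReader(split);
        }
    }

    public static void main(String[] args) {
        Reader reader = new LineFormat().getRecordReader("part-0:0+64");
        System.out.println(reader.describe());
    }
}
```

With this layout, adding a new input format only requires naming a different reader class; the construction logic stays in one place.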


The rest of this article focuses on the implementation of ShapeLineRecordReader and SpatialRecordReader.

2. ShapeLineRecordReader

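ShapeLineRecordReader extends SpatialRecordReader. Its three most important methods are createKey, createValue, and next; in addition, it has three constructors. The constructors simply call the matching superclass constructors, and next delegates the actual reading to the inherited nextLine, so the real work is done in SpatialRecordReader.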

public class ShapeLineRecordReader
    extends SpatialRecordReader<Rectangle, Text> {

  public ShapeLineRecordReader(Configuration job, FileSplit split)
      throws IOException {
    super(job, split);
  }

  public ShapeLineRecordReader(CombineFileSplit split, Configuration conf,
      Reporter reporter, Integer index) throws IOException {
    super(split, conf, reporter, index);
  }

  public ShapeLineRecordReader(InputStream in, long offset, long endOffset)
      throws IOException {
    super(in, offset, endOffset);
  }

  @Override
  public boolean next(Rectangle key, Text shapeLine) throws IOException {
    boolean read_line = nextLine(shapeLine);
    key.set(cellMbr);
    return read_line;
  }

  @Override
  public Rectangle createKey() {
    return new Rectangle();
  }

  @Override
  public Text createValue() {
    return new Text();
  }
}


3. SpatialRecordReader (walked through in the context of the FileMBR MapReduce job)

3.1 Constructor

public SpatialRecordReader(Configuration job, long s, long l, Path p) throws IOException {
  this.start = s;
  this.end = s + l;
  this.path = p;
  this.fs = this.path.getFileSystem(job);
  this.directIn = fs.open(this.path);
  this.blockSize = fs.getFileStatus(this.path).getBlockSize();
  this.cellMbr = new Rectangle();

  LOG.info("Open a SpatialRecordReader to file: "+this.path);

  codec = new CompressionCodecFactory(job).getCodec(this.path);

  if (isCompressedInput()) {
    decompressor = CodecPool.getDecompressor(codec);
    if (codec instanceof SplittableCompressionCodec) {
      final SplitCompressionInputStream cIn =
          ((SplittableCompressionCodec)codec).createInputStream(
              directIn, decompressor, start, end,
              SplittableCompressionCodec.READ_MODE.BYBLOCK);
      in = cIn;
      start = cIn.getAdjustedStart();
      end = cIn.getAdjustedEnd();
      filePosition = cIn; // take pos from compressed stream
    } else {
      in = codec.createInputStream(directIn, decompressor);
      filePosition = directIn;
    }
  } else {
    directIn.seek(start);
    in = directIn;
    filePosition = directIn;
  }
  this.pos = start;
  this.maxShapesInOneRead = job.getInt(SpatialSite.MaxShapesInOneRead, 1000000);
  this.maxBytesInOneRead = job.getInt(SpatialSite.MaxBytesInOneRead, 32*1024*1024);

  initializeReader();
}


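The numbers below are line numbers within the listing above, counting the method signature as line 1 and including blank lines.

Lines 2-4: initialize the start offset (start), the end offset (end), and the path of the file that this input split belongs to.

Line 6: open the input file. If the file is compressed, this stream (directIn) carries the compressed bytes.

Lines 12-14: look up the codec for the file and check whether the input is compressed.

Lines 15-29: if the input is compressed, in becomes the decompressed input stream, and start, end, and filePosition are updated depending on whether the codec is splittable.

Lines 30-32: if the input is not compressed, seek to start; in and directIn are then the same stream.

Lines 34-36: set the current offset pos, and the maximum number of records and of bytes to read in one batch.

Line 38: initializeReader() first checks whether the input file has a global index; if it does, cellMbr is set, otherwise cellMbr is invalid. It then checks whether the input file is R-tree indexed and initializes the reader accordingly.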
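The stream selection in lines 12-32 can be sketched without Hadoop. The example below is only an illustration under stated assumptions: it uses java.util.zip's gzip classes in place of Hadoop's codecs, decides "compressed or not" from a ".gz" suffix (Hadoop's CompressionCodecFactory also selects the codec from the file name), keeps the file in a byte array, and covers only the non-splittable and uncompressed branches. The class StreamSelectionSketch and its methods are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical illustration of "pick the logical input stream"; not SpatialHadoop code.
class StreamSelectionSketch {

    /** Returns the stream that records should be read from for a split starting at 'start'. */
    static InputStream open(String fileName, byte[] fileBytes, long start) throws IOException {
        InputStream directIn = new ByteArrayInputStream(fileBytes); // raw bytes of the file
        if (fileName.endsWith(".gz")) {
            // Non-splittable compression: decompress from the beginning of the file.
            return new GZIPInputStream(directIn);
        }
        // Uncompressed input: position the raw stream at the split start and read it directly.
        long skipped = directIn.skip(start);
        if (skipped != start) {
            throw new IOException("Could not seek to offset " + start);
        }
        return directIn;
    }

    /** Reads everything from the selected stream as UTF-8 text. */
    static String readAll(String fileName, byte[] fileBytes, long start) {
        try (InputStream in = open(fileName, fileBytes, start)) {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] chunk = new byte[4096];
            int n;
            while ((n = in.read(chunk)) != -1) {
                out.write(chunk, 0, n);
            }
            return new String(out.toByteArray(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Helper that produces gzip-compressed bytes for the demo. */
    static byte[] gzip(String text) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bytes)) {
            gz.write(text.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) {
        byte[] plain = "line1\nline2\n".getBytes(StandardCharsets.UTF_8);
        System.out.print(readAll("data.txt", plain, 6));
        System.out.print(readAll("data.txt.gz", gzip("line1\nline2\n"), 0));
    }
}
```

In both cases the caller reads from a single logical stream and does not need to know whether decompression happens underneath, which is the role the in field plays in the constructor.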

3.2 The nextLine function

protected boolean nextLine(Text value) throws IOException {
  if (blockType == BlockType.RTREE && pos == 8) {
    // File is positioned at the RTree header
    // Skip the header and go to first data object in file
    pos += RTree.skipHeader(in);
    LOG.info("Skipped R-tree to position: "+pos);
    // Reinitialize record reader at the new position
    lineReader = new LineReader(in);
  }
  while (getFilePosition() <= end) {
    value.clear();
    int b = 0;
    if (buffer != null) {
      // Read the first line encountered in buffer
      int eol = RTree.skipToEOL(buffer, 0);
      b += eol;
      value.append(buffer, 0, eol);
      if (eol < buffer.length) {
        // There are still some bytes remaining in buffer
        byte[] tmp = new byte[buffer.length - eol];
        System.arraycopy(buffer, eol, tmp, 0, tmp.length);
        buffer = tmp;
      } else {
        buffer = null;
      }
      // Check if a complete line has been read from the buffer
      byte last_byte = value.getBytes()[value.getLength()-1];
      if (last_byte == '\n' || last_byte == '\r')
        return true;
    }

    // Read the first line from stream
    Text temp = new Text();
    b += lineReader.readLine(temp);
    if (b == 0) {
      // Indicates an end of stream
      return false;
    }
    pos += b;

    // Append the part read from stream to the part extracted from buffer
    value.append(temp.getBytes(), 0, temp.getLength());

    if (value.getLength() > 1) {
      // Read a non-empty line. Note that end-of-line character is included
      return true;
    }
  }
  // Reached end of file
  return false;
}


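The numbers below are line numbers within the nextLine listing, counting the method signature as line 1 and including blank lines.

Lines 2-9: special handling for R-tree indexed files. The R-tree header is skipped and the line reader is re-created at the new position.

Lines 13-30: the initializeReader function discussed above reads eight bytes ahead to determine the file's index type, so buffer is non-null on the first call. This branch handles that case by consuming the first line from buffer before touching the stream.

Lines 33-38: read one line from the stream and check whether the end of the stream has been reached.

Line 39: update pos.

Line 42: append the bytes read from the stream to value.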
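The way nextLine stitches the pre-read buffer together with the rest of the stream can be shown in a small stand-alone sketch. It is a simplification under stated assumptions: it does not use Hadoop's LineReader or SpatialHadoop's RTree.skipToEOL, it treats only '\n' as a line terminator, and it ignores the split end and the pos bookkeeping. The class BufferedLineSketch and its methods are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the buffer handling in nextLine; not SpatialHadoop code.
class BufferedLineSketch {
    private byte[] buffer;        // bytes already consumed from the stream (e.g. a signature probe)
    private final InputStream in; // the remainder of the input

    BufferedLineSketch(byte[] prefetched, InputStream in) {
        this.buffer = (prefetched != null && prefetched.length > 0) ? prefetched : null;
        this.in = in;
    }

    /** Returns the next line including its trailing '\n' (if present), or null at end of input. */
    String nextLine() throws IOException {
        ByteArrayOutputStream line = new ByteArrayOutputStream();
        if (buffer != null) {
            // Find the end of the first line inside the buffer (index just past '\n').
            int eol = 0;
            while (eol < buffer.length && buffer[eol] != '\n') {
                eol++;
            }
            if (eol < buffer.length) {
                eol++; // include the '\n' itself
            }
            line.write(buffer, 0, eol);
            boolean complete = eol > 0 && buffer[eol - 1] == '\n';
            // Keep whatever is left in the buffer for the next call.
            buffer = (eol < buffer.length) ? Arrays.copyOfRange(buffer, eol, buffer.length) : null;
            if (complete) {
                return new String(line.toByteArray(), StandardCharsets.UTF_8);
            }
        }
        // The buffer held only part of a line (or nothing): continue from the stream.
        int c;
        while ((c = in.read()) != -1) {
            line.write(c);
            if (c == '\n') {
                break;
            }
        }
        return line.size() == 0 ? null : new String(line.toByteArray(), StandardCharsets.UTF_8);
    }

    /** Convenience wrapper: reads all lines given the prefetched bytes and the remaining bytes. */
    static List<String> readAll(byte[] prefetched, byte[] rest) {
        BufferedLineSketch reader = new BufferedLineSketch(prefetched, new ByteArrayInputStream(rest));
        List<String> lines = new ArrayList<>();
        try {
            for (String l = reader.nextLine(); l != null; l = reader.nextLine()) {
                lines.add(l);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return lines;
    }

    public static void main(String[] args) {
        byte[] all = "POINT(1 2)\nPOINT(3 4)\n".getBytes(StandardCharsets.UTF_8);
        byte[] head = Arrays.copyOfRange(all, 0, 8);          // the eight bytes read ahead
        byte[] rest = Arrays.copyOfRange(all, 8, all.length); // what the stream still holds
        for (String l : readAll(head, rest)) {
            System.out.print(l);
        }
    }
}
```

The key point mirrored from the original is that the eight probed bytes are not lost: the first record is rebuilt from the buffered prefix plus the continuation read from the stream.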