spatialhadoop2.3源码阅读(九) ShapeLineInputFormat & ShapeLineRecordReader & SpatialRecordReader[FileMBR]
2015-12-22 17:23
501 查看
1.ShapeLineInputFormat
ShapeLineInputFormat的作用就是生成ShapeLineRecordReader,其具体实现和spatialhadoop2.1源码阅读(三) 自定义InputFormat(SpatialInputFormat & ShapeInputFormat)中介绍的ShapeInputFormat的实现基本相同,具体可看该文章。ShapeLineInputFormat的源码如下:
接下来重点介绍ShapeLineRecordReader和 SpatialRecordReader的实现。
2.ShapeLineRecordReader
ShapeLineRecordReader继承自SpatialRecordReader,ShapeLineRecordReader中最重要的三个方法为createKey,createValue和next。除此之外还有三个构造器。这六个方法的具体实现都是对父类相应方法的调用,所以实现重点在于SpatialRecordReader类。
3.SpatialRecordReader(按照FileMBR MapReduce进行介绍)
3.1 构造函数
2-4:初始化该输入分片所在文件名,起始位置和终止位置
6:打开输入文件,如果输入文件是压缩文件,则该输入流表示压缩输入流
12-14:判断输入文件是否是压缩文件
15-29:若输入文件是压缩文件,则in为解压缩后的输入流,同时根据压缩类型,更新参数。
30-32:若输入文件不是压缩文件,则in和directIn相等
34:设置偏移量,一次读入最多读入的记录数和字节数
38:首先判断输入文件是否有全局索引,如果存在则设置cellMbr,否则cellMbr无效;然后判断输入文件是否为R-tree indexed,根据不同情况初始化不同。
3.2 nextLine函数
2-9:对R-tree indexed进行特殊处理
13-30:在上面所讲的initializeReader函数中,会预先读取八个字节判断文件索引类型,buffer不为空,在这里对这种情况进行处理
33-38:每次读取一行数据,并判断是否到输入流的末尾
39:更新pos
42:对value进行赋值
ShapeLineInputFormat的作用就是生成ShapeLineRecordReader,其具体实现和spatialhadoop2.1源码阅读(三) 自定义InputFormat(SpatialInputFormat & ShapeInputFormat)中介绍的ShapeInputFormat的实现基本相同,具体可看该文章。ShapeLineInputFormat的源码如下:
public class ShapeLineInputFormat extends SpatialInputFormat<Rectangle, Text> { @Override public RecordReader<Rectangle, Text> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException { if (reporter != null) reporter.setStatus(split.toString()); this.rrClass = ShapeLineRecordReader.class; return super.getRecordReader(split, job, reporter); } }
接下来重点介绍ShapeLineRecordReader和 SpatialRecordReader的实现。
2.ShapeLineRecordReader
ShapeLineRecordReader继承自SpatialRecordReader,ShapeLineRecordReader中最重要的三个方法为createKey,createValue和next。除此之外还有三个构造器。这六个方法的具体实现都是对父类相应方法的调用,所以实现重点在于SpatialRecordReader类。
public class ShapeLineRecordReader extends SpatialRecordReader<Rectangle, Text> { public ShapeLineRecordReader(Configuration job, FileSplit split) throws IOException { super(job, split); } public ShapeLineRecordReader(CombineFileSplit split, Configuration conf, Reporter reporter, Integer index) throws IOException { super(split, conf, reporter, index); } public ShapeLineRecordReader(InputStream in, long offset, long endOffset) throws IOException { super(in, offset, endOffset); } @Override public boolean next(Rectangle key, Text shapeLine) throws IOException { boolean read_line = nextLine(shapeLine); key.set(cellMbr); return read_line; } @Override public Rectangle createKey() { return new Rectangle(); } @Override public Text createValue() { return new Text(); } }
3.SpatialRecordReader(按照FileMBR MapReduce进行介绍)
3.1 构造函数
public SpatialRecordReader(Configuration job, long s, long l, Path p) throws IOException { this.start = s; this.end = s + l; this.path = p; this.fs = this.path.getFileSystem(job); this.directIn = fs.open(this.path); this.blockSize = fs.getFileStatus(this.path).getBlockSize(); this.cellMbr = new Rectangle(); LOG.info("Open a SpatialRecordReader to file: "+this.path); codec = new CompressionCodecFactory(job).getCodec(this.path); if (isCompressedInput()) { decompressor = CodecPool.getDecompressor(codec); if (codec instanceof SplittableCompressionCodec) { final SplitCompressionInputStream cIn = ((SplittableCompressionCodec)codec).createInputStream( directIn, decompressor, start, end, SplittableCompressionCodec.READ_MODE.BYBLOCK); in = cIn; start = cIn.getAdjustedStart(); end = cIn.getAdjustedEnd(); filePosition = cIn; // take pos from compressed stream } else { in = codec.createInputStream(directIn, decompressor); filePosition = directIn; } } else { directIn.seek(start); in = directIn; filePosition = directIn; } this.pos = start; this.maxShapesInOneRead = job.getInt(SpatialSite.MaxShapesInOneRead, 1000000); this.maxBytesInOneRead = job.getInt(SpatialSite.MaxBytesInOneRead, 32*1024*1024); initializeReader(); }
2-4:初始化该输入分片所在文件名,起始位置和终止位置
6:打开输入文件,如果输入文件是压缩文件,则该输入流表示压缩输入流
12-14:判断输入文件是否是压缩文件
15-29:若输入文件是压缩文件,则in为解压缩后的输入流,同时根据压缩类型,更新参数。
30-32:若输入文件不是压缩文件,则in和directIn相等
34:设置偏移量,一次读入最多读入的记录数和字节数
38:首先判断输入文件是否有全局索引,如果存在则设置cellMbr,否则cellMbr无效;然后判断输入文件是否为R-tree indexed,根据不同情况初始化不同。
3.2 nextLine函数
protected boolean nextLine(Text value) throws IOException { if (blockType == BlockType.RTREE && pos == 8) { // File is positioned at the RTree header // Skip the header and go to first data object in file pos += RTree.skipHeader(in); LOG.info("Skipped R-tree to position: "+pos); // Reinitialize record reader at the new position lineReader = new LineReader(in); } while (getFilePosition() <= end) { value.clear(); int b = 0; if (buffer != null) { // Read the first line encountered in buffer int eol = RTree.skipToEOL(buffer, 0); b += eol; value.append(buffer, 0, eol); if (eol < buffer.length) { // There are still some bytes remaining in buffer byte[] tmp = new byte[buffer.length - eol]; System.arraycopy(buffer, eol, tmp, 0, tmp.length); buffer = tmp; } else { buffer = null; } // Check if a complete line has been read from the buffer byte last_byte = value.getBytes()[value.getLength()-1]; if (last_byte == '\n' || last_byte == '\r') return true; } // Read the first line from stream Text temp = new Text(); b += lineReader.readLine(temp); if (b == 0) { // Indicates an end of stream return false; } pos += b; // Append the part read from stream to the part extracted from buffer value.append(temp.getBytes(), 0, temp.getLength()); if (value.getLength() > 1) { // Read a non-empty line. Note that end-of-line character is included return true; } } // Reached end of file return false; }
2-9:对R-tree indexed进行特殊处理
13-30:在上面所讲的initializeReader函数中,会预先读取八个字节判断文件索引类型,buffer不为空,在这里对这种情况进行处理
33-38:每次读取一行数据,并判断是否到输入流的末尾
39:更新pos
42:对value进行赋值
相关文章推荐
- linux scp和rsyc限速传输
- 配置Sprintmvc4&tomcat解释中文乱码问题
- 快速掌握htop
- Window.open()方法参数详解
- Spark-shell例子
- linux vmstat命令总结
- 第一节jdk1.6+tomcat6搭建方法
- 老李推荐:第1章3节《MonkeyRunner源码剖析》概述:架构
- Linux基础优化
- Tomcat Server.xml配置文件
- linux 下查看机器是cpu是几核的
- linux vi编辑
- LR对Tuxedo/Weblogic/websphere/Apache/MQ的性能监控
- U盘安装中标麒麟linux V6.0教程
- tomcat中当点击Server Status的时候不能用
- LoadRunner监控Linux资源监控
- linux cache总结
- opencv静态配置
- Linux常用命令英文缩写
- U盘安装Linux红帽子6.4系统