
SpatialHadoop 2.3 Source Code Reading (5): Grid Index Generation (Part 1)

2015-12-15
SpatialHadoop's index-generation entry point is the class edu.umn.cs.spatialHadoop.operations.Repartition. Its main method is as follows:

public static void main(String[] args) throws Exception {
  OperationsParams params = new OperationsParams(new GenericOptionsParser(args));

  if (!params.checkInputOutput(true)) {
    printUsage();
    return;
  }
  if (params.get("sindex") == null) {
    System.err.println("Please specify type of index to build (grid, rtree, r+tree, str, str+)");
    printUsage();
    return;
  }
  Path inputPath = params.getInputPath();
  Path outputPath = params.getOutputPath();

  // The spatial index to use
  long t1 = System.currentTimeMillis();
  repartition(inputPath, outputPath, params);
  long t2 = System.currentTimeMillis();
  System.out.println("Total indexing time in millis "+(t2-t1));
}
The main method validates the input/output paths and the sindex parameter. sindex specifies the type of index to build; this post covers how the grid index is generated.
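Before diving in, here is one plausible way to invoke this entry point. The key:value argument style is SpatialHadoop's OperationsParams convention; the file names and the point shape are assumptions for illustration only:

// Hypothetical invocation (file names are made up for illustration).
Repartition.main(new String[] {
    "input.points",  // input path
    "output.grid",   // output path
    "sindex:grid",   // type of index to build
    "shape:point"    // geometry type of the records
});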

After these checks pass, main enters the repartition method:

public static void repartition(Path inFile, Path outputPath,
    OperationsParams params) throws IOException, InterruptedException {
  JobConf job = new JobConf(params, FileMBR.class);
  FileInputFormat.addInputPath(job, inFile);
  ShapeInputFormat<Shape> inputFormat = new ShapeInputFormat<Shape>();

  boolean autoLocal = inputFormat.getSplits(job, 1).length <= 3;
  boolean isLocal = params.is("local", autoLocal);

  if (isLocal)
    repartitionLocal(inFile, outputPath, params);
  else
    repartitionMapReduce(inFile, outputPath, null, params);
}

repartition decides between building the index locally and building it with MapReduce, based on the number of input splits and the local parameter: with at most three splits (roughly, at most three HDFS blocks of input) it defaults to local mode, and the local parameter overrides the default either way. The rest of this post focuses on the MapReduce path.

repartitionMapReduce consists of three parts. The first part is as follows:

String sindex = params.get("sindex");
boolean overwrite = params.is("overwrite");
Shape stockShape = params.getShape("shape");

FileSystem outFs = outPath.getFileSystem(params);

// Calculate number of partitions in output file
// Copy blocksize from source file if it's globally indexed
@SuppressWarnings("deprecation")
final long blockSize = outFs.getDefaultBlockSize();


The first part mainly gathers the input parameters: whether to overwrite the output file, the index type, the shape type, and the default HDFS block size.

The second part is as follows:

// Calculate the dimensions of each partition based on gindex type
if (cellInfos == null) {
  if (sindex.equals("grid")) {
    Rectangle input_mbr = FileMBR.fileMBR(inFile, params);
    long inFileSize = FileMBR.sizeOfLastProcessedFile;
    int num_partitions = calculateNumberOfPartitions(new Configuration(),
        inFileSize, outFs, outPath, blockSize);

    GridInfo gridInfo = new GridInfo(input_mbr.x1, input_mbr.y1,
        input_mbr.x2, input_mbr.y2);
    gridInfo.calculateCellDimensions(num_partitions);
    cellInfos = gridInfo.getAllCells();
  } else if (sindex.equals("rtree") || sindex.equals("r+tree") ||
      sindex.equals("str") || sindex.equals("str+")) {
    // Pack in rectangles using an RTree
    cellInfos = packInRectangles(inFile, outPath, params);
  } else {
    throw new RuntimeException("Unsupported spatial index: " + sindex);
  }
}


The second part computes the CellInfo[] array. Each CellInfo stores the (x1,y1)-(x2,y2) coordinates of one rectangular grid cell. In detail:

FileMBR.fileMBR(inFile, params): first runs a MapReduce job that computes the minimum bounding rectangle (MBR) of the data in the input file.

FileMBR.sizeOfLastProcessedFile: retrieves the size of the input file, recorded by the previous step.

calculateNumberOfPartitions(...): inflates the input size by an indexing-overhead percentage to estimate the index file size, then divides by the default block size and rounds up, so that in the ideal case every grid cell holds exactly one block's worth of data. The result is the number of grid cells. The code:

public static int calculateNumberOfPartitions(Configuration conf, long inFileSize,
    FileSystem outFs, Path outFile, long blockSize) throws IOException {
  final float IndexingOverhead =
      conf.getFloat(SpatialSite.INDEXING_OVERHEAD, 0.1f);
  long indexedFileSize = (long) (inFileSize * (1 + IndexingOverhead));
  if (blockSize == 0)
    blockSize = outFs.getDefaultBlockSize();
  return (int)Math.ceil((float)indexedFileSize / blockSize);
}
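As a quick sanity check on this formula, here is a worked example with assumed numbers (1 GB of input, the default 10% overhead, 128 MB blocks):

// Worked example of the formula above, with assumed example values.
long inFileSize = 1024L * 1024 * 1024;   // 1 GB input
float indexingOverhead = 0.1f;           // default overhead
long blockSize = 128L * 1024 * 1024;     // 128 MB HDFS block
long indexedFileSize = (long) (inFileSize * (1 + indexingOverhead));
// ceil(1.1 GB / 128 MB) = 9 partitions
int numPartitions = (int) Math.ceil((float) indexedFileSize / blockSize);
System.out.println(numPartitions);       // prints 9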


new GridInfo(...) / calculateCellDimensions(num_partitions): overlays a grid on the input data, choosing the row and column counts that best approximate the computed number of cells; the MBR and the row/column counts are stored together in the GridInfo, which then describes a bounded, gridded rectangle.

gridInfo.getAllCells(): from the GridInfo, computes the bottom-left and top-right corner of every cell (see the sketch below).
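Here is a minimal, self-contained sketch of this idea. It is NOT the exact GridInfo implementation; the MBR and partition count are assumed example values:

// A minimal sketch (not the exact GridInfo code) of turning a target
// partition count into a near-square grid over the input MBR.
public class GridSketch {
  public static void main(String[] args) {
    double x1 = 0, y1 = 0, x2 = 100, y2 = 50; // assumed input MBR
    int numPartitions = 9;                    // e.g. from the step above
    int cols = 1, rows = 1;
    // Grow one column or one row at a time, always splitting the longer
    // cell side, until there are at least numPartitions cells.
    while (cols * rows < numPartitions) {
      if ((x2 - x1) / cols > (y2 - y1) / rows) cols++;
      else rows++;
    }
    // Each cell's corners follow directly from its column/row index.
    double cellW = (x2 - x1) / cols, cellH = (y2 - y1) / rows;
    for (int r = 0; r < rows; r++) {
      for (int c = 0; c < cols; c++) {
        System.out.printf("cell(%d,%d): (%.1f,%.1f)-(%.1f,%.1f)%n",
            c, r, x1 + c * cellW, y1 + r * cellH,
            x1 + (c + 1) * cellW, y1 + (r + 1) * cellH);
      }
    }
  }
}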

The third part mostly configures the MapReduce job, as follows:

JobConf job = new JobConf(params, Repartition.class);

job.setJobName("Repartition");

// Overwrite output file
if (outFs.exists(outPath)) {
  if (overwrite)
    outFs.delete(outPath, true);
  else
    throw new RuntimeException("Output file '" + outPath
        + "' already exists and overwrite flag is not set");
}

// Decide which map function to use depending on the type of global index
if (sindex.equals("rtree") || sindex.equals("str")) {
  // Repartition without replication
  job.setMapperClass(RepartitionMapNoReplication.class);
} else {
  // Repartition with replication (grid, str+, and r+tree)
  job.setMapperClass(RepartitionMap.class);
}
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(stockShape.getClass());
ShapeInputFormat.setInputPaths(job, inFile);
job.setInputFormat(ShapeInputFormat.class);

ClusterStatus clusterStatus = new JobClient(job).getClusterStatus();
job.setNumMapTasks(10 * Math.max(1, clusterStatus.getMaxMapTasks()));

FileOutputFormat.setOutputPath(job, outPath);
if (sindex.equals("grid") || sindex.equals("str") || sindex.equals("str+")) {
  job.setOutputFormat(GridOutputFormat.class);
} else if (sindex.equals("rtree") || sindex.equals("r+tree")) {
  // For now, the two types of local index are the same
  job.setOutputFormat(RTreeGridOutputFormat.class);
} else {
  throw new RuntimeException("Unsupported spatial index: " + sindex);
}

SpatialSite.setCells(job, cellInfos);
job.setBoolean(SpatialSite.OVERWRITE, overwrite);

// Set reduce function
job.setReducerClass(RepartitionReduce.class);
job.setNumReduceTasks(Math.max(1, Math.min(cellInfos.length,
    (clusterStatus.getMaxReduceTasks() * 9 + 5) / 10)));

// Set output committer that combines output files together
job.setOutputCommitter(RepartitionOutputCommitter.class);

JobClient.runJob(job);
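The comments above distinguish repartitioning with and without replication. For grid (as well as str+ and r+tree) indexes, a shape that straddles a cell boundary is emitted once per overlapping cell. Below is a conceptual sketch of that map-side behavior, not the actual RepartitionMap code:

import java.util.ArrayList;
import java.util.List;

// Conceptual sketch (not the actual RepartitionMap code) of repartitioning
// WITH replication: a shape is emitted once per grid cell it overlaps.
public class ReplicationSketch {
  // Minimal stand-in for a shape's (or cell's) bounding rectangle.
  static class Rect {
    double x1, y1, x2, y2;
    Rect(double x1, double y1, double x2, double y2) {
      this.x1 = x1; this.y1 = y1; this.x2 = x2; this.y2 = y2;
    }
    boolean overlaps(Rect o) {
      return x1 < o.x2 && o.x1 < x2 && y1 < o.y2 && o.y1 < y2;
    }
  }

  // Returns the ids of all cells the shape would be emitted to; in the real
  // job these ids become the IntWritable map output keys.
  static List<Integer> overlappingCells(Rect shape, Rect[] cells) {
    List<Integer> ids = new ArrayList<Integer>();
    for (int cellId = 0; cellId < cells.length; cellId++)
      if (cells[cellId].overlaps(shape))
        ids.add(cellId);
    return ids;
  }

  public static void main(String[] args) {
    Rect[] cells = { new Rect(0, 0, 50, 50), new Rect(50, 0, 100, 50) };
    Rect shape = new Rect(40, 10, 60, 20); // straddles both cells
    System.out.println(overlappingCells(shape, cells)); // [0, 1]
  }
}

Without replication (rtree, str), each shape is instead assigned to exactly one cell, so no record is duplicated.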