Job流程:决定map个数的因素
2015-08-21 01:05
381 查看
此文紧接Job流程:提交MR-Job过程。上一篇分析可以看出,MR-Job提交过程的核心代码在于 JobSubmitter 类的 submitJobInternal()方法。本文就由此方法的这一句代码开始分析:
1.查看writeSplits()方法的实现过程:
2.查看writeNewSplits()方法的实现过程:
3.查看getSplits()方法的实现
此方法是InputFormat 类的一个抽象方法。在其子类 FileInputFormat 类中为文件格式输入类型提供了统一的 getSplits()方法实现。
4.查看computeSplitSize()方法的具体实现:
所以,影响 map 任务数量的因素在于以上三个参数的设置:
默认情况 splitSize=blockSize,即一个 map 任务读取一个 block 块。
增加 minSize 超过 128M,则增加 splitSize,即 map 任务个数变小。
减小 maxSize 小于 128M,则减小 splitSize,即 map 任务个数变多。
Map 个数 = 文件大小 / 128M Reduce 个数 = 分区 Partitioner 个数 = 最终输出文件个数
注意:特殊压缩的 map 切分【即不可切分文件】
例题:假设HDFS上有一个大小75MB的文件,当客户端设置Block大小为64MB。则运行MR任务读取该文件时InputSplit大小为多少?
1) 如果该文件是普通文件,则应该是两个InputSplit分片:64MB 和 11MB。
2) 如果该文件是 gzip等压缩包格式的文件,则只有一个InputSplit分片:75MB。
//计算并确定 map 的个数,以及各个输入切片 Splits 的相关信息 int maps = writeSplits(job, submitJobDir);
1.查看writeSplits()方法的实现过程:
private int writeSplits(org.apache.hadoop.mapreduce.JobContext job, Path jobSubmitDir) throws IOException, InterruptedException, ClassNotFoundException { JobConf jConf = (JobConf)job.getConfiguration(); int maps; if (jConf.getUseNewMapper()) { //决定map个数的关键性方法 maps = writeNewSplits(job, jobSubmitDir); } else { maps = writeOldSplits(jConf, jobSubmitDir); } //返回map个数 return maps; }
2.查看writeNewSplits()方法的实现过程:
//此方法返回int类型,即map的个数 //此方法实现倒着分析为好 private <T extends InputSplit> int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException, InterruptedException, ClassNotFoundException { Configuration conf = job.getConfiguration(); InputFormat<?, ?> input = ReflectionUtils.newInstance(job.getInputFormatClass(), conf); //List集合是由getSplits()方法生成 -->【核心代码】 List<InputSplit> splits = input.getSplits(job); //array数组是由List<InputSplit>集合转化而来 -->查看List集合的定义 T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]); // sort the splits into order based on size, so that the biggest // go first Arrays.sort(array, new SplitComparator()); JobSplitWriter.createSplitFiles(jobSubmitDir, conf, jobSubmitDir.getFileSystem(conf), array); //array数组的长度,即map的个数 -->查看array数组的定义 return array.length; }
3.查看getSplits()方法的实现
此方法是InputFormat 类的一个抽象方法。在其子类 FileInputFormat 类中为文件格式输入类型提供了统一的 getSplits()方法实现。
public List<InputSplit> getSplits(JobContext job) throws IOException { //第一个参数返回值为 1; //第二个参数是读取配置文件中的 mapreduce.input.fileinputformat.split.minsize 属性(默认值为 0),如果没有配置则返回 1. //所以 minSize=Math(1,0),即值是 1 long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job)); //读取配置文件中的 mapreduce.input.fileinputformat.split.maxsize 属性(默认没有配置) //如果没有配置则返回 long.MAX_VALUE long maxSize = getMaxSplitSize(job); //定义 List 集合,用来存储输入分片 InputSplit List<InputSplit> splits = new ArrayList<InputSplit>(); //变量 files 存储的是 "输入路径中所有的文件集合" List<FileStatus> files = listStatus(job); //循环处理每一个输入文件 for (FileStatus file: files) { //获得文件路径 Path path = file.getPath(); //获得文件总长度 long length = file.getLen(); //判断文件是否为空 if (length != 0) { BlockLocation[] blkLocations; if (file instanceof LocatedFileStatus) { //获得文件对应的 所有Block块的 位置 blkLocations = ((LocatedFileStatus) file).getBlockLocations(); } else { FileSystem fs = path.getFileSystem(job.getConfiguration()); blkLocations = fs.getFileBlockLocations(file, 0, length); } //如果文件大小非空,并且文件允许被分割 if (isSplitable(job, path)) { //获取配置文件中Block块的大小,默认128MB long blockSize = file.getBlockSize(); //计算输入切片的大小【核心代码】 long splitSize = computeSplitSize(blockSize, minSize, maxSize); //将bytesRemaining(剩余未分片字节数)设置为整个文件的长度 long bytesRemaining = length; //while()循环体,按照 splitSize 对每个输入文件进行【逻辑切分】,得到 Splits 集合 while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining); //参数列表:文件所在路径、切片起始的位置、切片大小、切片所在节点 splits.add(makeSplit(path, length-bytesRemaining, splitSize, blkLocations[blkIndex].getHosts())); bytesRemaining -= splitSize; } //如果block中剩下的一小段数据量小于splitSize,还是认为它是独立的分片 if (bytesRemaining != 0) { int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining); splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining, blkLocations[blkIndex].getHosts())); } } else { //文件不能切分则将整个文件作为一个输入分片 InputSplit splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts())); } } else { //输入文件为空,则对应的 Block块 所在节点也应该为空 splits.add(makeSplit(path, 0, length, new String[0])); } } // Save the number of input files for metrics/loadgen job.getConfiguration().setLong(NUM_INPUT_FILES, files.size()); LOG.debug("Total # of splits: " + splits.size()); return splits; }
4.查看computeSplitSize()方法的具体实现:
protected long computeSplitSize(long blockSize, long minSize, long maxSize) { //对于默认情况,三个参数分别为:1,long.MAX_VALUE,128MB //所以,表达式整体返回 128MB return Math.max(minSize, Math.min(maxSize, blockSize)); }
所以,影响 map 任务数量的因素在于以上三个参数的设置:
默认情况 splitSize=blockSize,即一个 map 任务读取一个 block 块。
增加 minSize 超过 128M,则增加 splitSize,即 map 任务个数变小。
减小 maxSize 小于 128M,则减小 splitSize,即 map 任务个数变多。
Map 个数 = 文件大小 / 128M Reduce 个数 = 分区 Partitioner 个数 = 最终输出文件个数
注意:特殊压缩的 map 切分【即不可切分文件】
例题:假设HDFS上有一个大小75MB的文件,当客户端设置Block大小为64MB。则运行MR任务读取该文件时InputSplit大小为多少?
1) 如果该文件是普通文件,则应该是两个InputSplit分片:64MB 和 11MB。
2) 如果该文件是 gzip等压缩包格式的文件,则只有一个InputSplit分片:75MB。
相关文章推荐
- iOS开发小白学习体验-3
- Android listView中点击item或Item中控件跳转对应的详情页面的实现(商品详情查看)
- 【8-20】java学习笔记02
- ubuntu各版本工具下载链接
- ReSharper 配置及用法
- 软件测试典型问题记录-修改窗口页面存在两个页签,修改后在某个页签点确定,另一页签内容未修改
- 百钱买百鸡之数学优化
- PHP MySQL
- 03_Nginx添加新模块
- Firebug折腾记_(1)安装及简介
- 03_Nginx添加新模块
- Firebug折腾记_(1)安装及简介
- 阻塞套接字返回EAGAIN
- HDU 1010 Tempter of the Bone(DFS)
- 七夕,单身的我是如何在敲完代码之后收到12束玫瑰的
- 让结构体类型frame的某个属性可以直接修改
- 堆排序
- 使用spring-boot 中遇到的一个小坑
- yum方式安装mysql报错找不到mysql.sock
- hdu 5410 CRB and His Birthday(混合背包)