MapReduce——客户端提交任务源码分析
2021-06-09 16:24
267 查看
计算向数据移动
MR程序并不会在客户端执行任何的计算操作,它是为计算工作做好准备,例如计算出切片信息,直接影响到Map任务的并行度。
在Driver中提交任务时,会写到这样的语句:
boolean result = job.waitForCompletion(true);
进入到
waitForCompletion中:
public boolean waitForCompletion(boolean verbose) throws IOException, InterruptedException, ClassNotFoundException { if (state == JobState.DEFINE) { // 提交任务语句 submit(); } ..............
继续跟进
submit():
public voidsubmit() throws IOException, InterruptedException, ClassNotFoundException { ensureState(JobState.DEFINE); setUseNewAPI(); connect(); final JobSubmitter submitter = getJobSubmitter(cluster.getFileSystem(), cluster.getClient()); status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() { public JobStatus run() throws IOException, InterruptedException, ClassNotFoundException { // 执行提交任务 return submitter.submitJobInternal(Job.this, cluster); } }); .............. }
上面代码可以看出,客户端经过连接集群,获得任务提交器
submitter后执行了
submitJobInternal(Job.this, cluster)方法,进入看(其实我只想看切片方法)
/** * Internal method for submitting jobs to the system. * The job submission process involves: * 1、Checking the input and output specifications of the job. * 2、Computing the InputSplits for the job. * 3、Setup the requisite accounting information for the * DistributedCacheof the job, if necessary. * 4、Copying the job's jar and configuration to the map-reduce system * directory on the distributed file-system. * 5、Submitting the job to the JobTracker and optionally * monitoring it's status. */ .............. // Create the splits for the job LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir)); int maps = writeSplits(job, submitJobDir); conf.setInt(MRJobConfig.NUM_MAPS, maps); LOG.info("number of splits:" + maps); ..............
从这个方法头上的注释信息可以看到,在真正执行任务之前,客户端做了这么5件事,稍微翻译一下:
- 检查作业的输入和输出规范;
- 计算输入切片的数量;
- 如有必要,为作业的
DistributedCache
设置必要的记帐信息; - 将作业的 jar 和配置复制到分布式文件系统上的 map-reduce system 目录;
- 将作业提交给
JobTracker
并可选择监控它的状态
可以看到执行切片的方法时
writeSplits(job, submitJobDir)
private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,Path jobSubmitDir) throws IOException,InterruptedException, ClassNotFoundException { JobConf jConf = (JobConf)job.getConfiguration(); int maps; if (jConf.getUseNewMapper()) { maps = writeNewSplits(job, jobSubmitDir); } else { maps = writeOldSplits(jConf, jobSubmitDir); } return maps; }
也有新旧API的区分,看新的
writeNewSplits(job, jobSubmitDir)
private <T extends InputSplit> int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException, InterruptedException, ClassNotFoundException { .................. // 只看切片方法 List<InputSplit> splits = input.getSplits(job); T[] array = (T[]) splits. ad8 toArray(new InputSplit[splits.size()]); .............. // 返回值是数组的长度,也就是切片的个数,也就是mapTask的并行度 return array.length; }
进入切片方法,方法太长了,删除部分,留下核心业务逻辑。这个得好好说说
public List<InputSplit> getSplits(JobContext job) throws IOException { // 如果没有指定的话,minSize = 1 long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job)); // 如果没有指定的话,maxSize = Long.Max long maxSize = getMaxSplitSize(job); // generate splits List<InputSplit> splits = new ArrayList<InputSplit>(); // FileStatus这个概念来自于HDFS,存储客户端提交文件的元数据 List<FileStatus> files = listStatus(job); for (FileStatus file: files) { // 获取到文件的路径 Path path = file.getPath(); // 获取到文件的长度 long length = file.getLen(); if (length != 0) { // 数据块位置数组,用于存储该文件对应的数据块的位置 BlockLocation[] blkLocations; if (file instanceof LocatedFileStatus) { blkLocations = ((LocatedFileStatus) file).getBlockLocations(); } else { FileSystem fs = path.getFileSystem(job.getConfiguration()); blkLocations = fs.getFileBlockLocations(file, 0, length); } if (isSplitable(job, path)) { // 没有指定,默认是可分片的 long blockSize = file.getBlockSize(); // 返回默认值:切片大小 = 块大小 long splitSize = computeSplitSize(blockSize, minSize, maxSize); // 获取整个文件的长度,用于计算切片的偏移量 long bytesRemaining = length; // SPLIT_SLOP 的大小是1.1 // 这个判断表达式的含义是如果剩余的块体积大大于1.1倍的切片大小,继续切片 while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { // 在这计算了一步块索引 int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining); //-----------getBlockIndex() begin-------------------------------------------- protected int getBlockIndex(BlockLocation[] blkLocations, long offset) { for (int i = 0 ; i < blkLocations.length; i++) { // is the offset inside this block? if ((blkLocations[i].getOffset() <= offset) && (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())){ // 代码逻辑非常简单,就是返回当前offset是在哪个block里面 return i; } } ....... ad0 ............. //-----------getBlockIndex() end---------------------------------------------- // 计算完成之后加入切片集合 // 切片信息包括:路径,偏移量,切片大小,服务器节点【支撑计算向数据移动】 splits.add(makeSplit(path, length-bytesRemaining, splitSize, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts())); bytesRemaining -= splitSize; } // 计算剩余数据块的切片信息 if (bytesRemaining != 0) { int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining); splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts())); } } else { // not splitable :不能切片,那就是一片 splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(), blkLocations[0].getCachedHosts())); } } ...... // 返回切片文件的集合。根据集合中数据的个数,就可以计算出有多少个maptask return splits; }
相关文章推荐
- Mapreduce 任务提交源码分析1
- MapReduce执行过程源码分析(一)——Job任务的提交
- MapReduce任务提交源码分析
- spark core源码分析1 集群启动及任务提交过程
- MapReduce的ReduceTask任务的运行源码级分析
- MapReduce作业提交源码分析第一讲【小二讲堂】
- MapReduce的MapTask任务的运行源码级分析
- MapReduce新版客户端API源码分析
- Hadoop源码分析23:MapReduce的Job提交过程
- Hadoop-2.7.3源码分析:MapReduce作业提交源码跟踪
- Hadoop-2.7.3源码分析:MapReduce作业提交源码跟踪
- Hadoop0.21.0源码流程分析(1)-客户端提交作业
- MapReduce作业提交源码分析
- 客户端用java api 远程操作HDFS以及远程提交MR任务(源码和异常处理
- Hive生成MapReduce任务源码分析
- Hadoop0.21.0源码流程分析(1)-客户端提交作业
- 八、MapReduce--job提交源码分析
- MapReduce源码分析(一):总过程及Job提交过程
- jstorm源码分析:提交任务过程
- MapReduce Job本地提交过程源码跟踪及分析