您的位置:首页 > 移动开发

Hadoop(17) MR 决定Mapper数量因素

2016-10-03 19:49 239 查看

源码分析

// 提交任务到集群, 并等待结束
job.waitForCompletion(true);
进入 org.apache.hadoop.mapreduce.Job下的waitForCompletion()
———————————————————————————————> submit();

Job中的waitForCompletion内的submit()
----------------------------------->内有connect() ===> client与server建立RPC通信, client拿到server的代理对象

进入submit中的connect()
-----------------------> cluster ===> 得到cluster, 并作为Job的成员变量(Job持有cluster引用)
其中cluster由来:
new Cluster(getConfiguration()); 通过构造器调用initialize, initialize使用RPC得到server的代理对象
(其中通过ClientProtocol接口来接收, 并且这个接口中有一个versionID)

回到Job中的waitForCompletion内的submit()
org.apache.hadoop.mapreduce.JobStatus
——————————————————————> submitter.submitJobInternal(Job.this, cluster); 将作业提交到系统的内部方法
其中首先检查输出路径, 若输出路径已存在以异常
checkSpecs(job)

然后通过RPC得到存储jar包路径和jobId
Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
JobID jobId = submitClient.getNewJobID();

然后将jobStagingArea和jobId拼接起来作为存放jar的唯一路径
Path submitJobDir = new Path(jobStagingArea, jobId.toString());

然后再将jar进行拷贝并存放在hdfs中
copyAndConfigureFiles(job, submitJobDir);

然后再为作业分割切片(splits):
若是第一次执行则writeNewSplits(job, jobSubmitDir); ===> 得到TextInputFormat实例
InputFormat<?, ?> input = ReflectionUtils.newInstance(job.getInputFormatClass(), conf);

计算出切片个数:List<InputSplit> splits = input.getSplits(job);
进入getSplits(JobContext job)
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job)); // 1
long maxSize = getMaxSplitSize(job); // 2^63 - 1

long splitSize = computeSplitSize(blockSize, minSize, maxSize);
protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
return Math.max(minSize, Math.min(maxSize, blockSize)); // 默认情况下切片大小与block一样大134217728
}

例:hdfs下的/aaa目录下有一个129M的a.txt和一个1k的b.txt, 运行MapReduce将aaa作为输入目录,
会读取该目录下不是以_开设的文件。问会启动几个Mapper。
答:启动3个, 看个几个block

然后再真正提交作业
submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials()); ==>submitClient也是server的代理对象
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  hadoop