
Source-code analysis of the path from WordCount job submission to the split algorithm and host selection algorithm in the FileInputFormat class

2018-01-15 15:29
References: "Introduction to the split algorithm and host selection algorithm in the FileInputFormat class", "Analysis of how FileInputFormat splits tasks in Hadoop 2.6.0 (i.e., how to control the number of FileInputFormat map tasks)", "The flow of FileInputFormat's getSplits method for computing InputSplits in Hadoop", and "Hadoop job split handling and task locality analysis (source-code analysis, part 1)".

Some background before the analysis:

(Note that a Block's hosts and a Split's hosts are not the same thing: a Split's hosts are derived, by a certain rule, from the hosts of the Block(s) it covers. If one Block corresponds to one Split (the common case), the two host lists are identical. If the mapping is not one-to-one (a Split spans more than one Block), the Split has to select its hosts by a certain method.

A Split and a MapTask correspond one to one: each Split is processed by exactly one MapTask. Data locality is therefore determined by the Split's hosts.

BlocksMap stores the mapping from Block to BlockInfo. A Block mainly contains three fields: long blockId (the unique identifier, i.e. the ID, of the block); long numBytes (the amount of file data the block holds); and long generationStamp (the block's generation stamp, i.e. its version or timestamp). BlockInfo (BlockInfoContiguous in Hadoop 2.7.3) holds the hostnames of all replicas.)
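
To make the Block-hosts versus Split-hosts point concrete, here is a minimal sketch (a hypothetical helper of mine, not Hadoop code) of the one-Block-per-Split case, where the split simply inherits the hosts of the block it covers:

import java.io.IOException;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

// Hypothetical helper: when one Block maps to exactly one Split, the split's
// hosts are exactly the hosts holding that block's replicas.
class SingleBlockSplitExample {
  static FileSplit splitForSingleBlock(Path file, BlockLocation blk) throws IOException {
    return new FileSplit(file, blk.getOffset(), blk.getLength(), blk.getHosts());
  }
}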

Now, the analysis (all code below is from hadoop-2.7.3-src).

We start from WordCount: org.apache.hadoop.examples.WordCount.main() internally calls org.apache.hadoop.mapreduce.Job.waitForCompletion(boolean).
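
For orientation, here is a condensed sketch of that driver path, modeled on the stock WordCount example (the GenericOptionsParser handling is omitted and the wrapper class name is mine):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.examples.WordCount;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriverSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(WordCount.TokenizerMapper.class);   // the example's nested Mapper
    job.setCombinerClass(WordCount.IntSumReducer.class);
    job.setReducerClass(WordCount.IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));  // new-API FileInputFormat
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    // waitForCompletion(true) -> Job.submit() -> JobSubmitter.submitJobInternal(job, cluster)
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}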

// The following code is in org.apache.hadoop.mapreduce.JobSubmitter

// ......

/**
 * Internal method for submitting jobs to the system.
 *
 * <p>The job submission process involves:
 * <ol>
 *   <li>
 *   Checking the input and output specifications of the job.
 *   </li>
 *   <li>
 *   Computing the {@link InputSplit}s for the job.
 *   </li>
 *   <li>
 *   Setup the requisite accounting information for the
 *   {@link DistributedCache} of the job, if necessary.
 *   </li>
 *   <li>
 *   Copying the job's jar and configuration to the map-reduce system
 *   directory on the distributed file-system.
 *   </li>
 *   <li>
 *   Submitting the job to the <code>JobTracker</code> and optionally
 *   monitoring it's status.
 *   </li>
 * </ol></p>
 * @param job the configuration to submit
 * @param cluster the handle to the Cluster
 * @throws ClassNotFoundException
 * @throws InterruptedException
 * @throws IOException
 */
JobStatus submitJobInternal(Job job, Cluster cluster)
    throws ClassNotFoundException, InterruptedException, IOException {

  //validate the jobs output specs
  checkSpecs(job);

  Configuration conf = job.getConfiguration();
  addMRFrameworkToDistributedCache(conf);

  Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
  //configure the command line options correctly on the submitting dfs
  InetAddress ip = InetAddress.getLocalHost();
  if (ip != null) {
    submitHostAddress = ip.getHostAddress();
    submitHostName = ip.getHostName();
    conf.set(MRJobConfig.JOB_SUBMITHOST,submitHostName);
    conf.set(MRJobConfig.JOB_SUBMITHOSTADDR,submitHostAddress);
  }
  JobID jobId = submitClient.getNewJobID();
  job.setJobID(jobId);
  Path submitJobDir = new Path(jobStagingArea, jobId.toString());
  JobStatus status = null;
  try {
    conf.set(MRJobConfig.USER_NAME,
        UserGroupInformation.getCurrentUser().getShortUserName());
    conf.set("hadoop.http.filter.initializers",
        "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
    conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());
    LOG.debug("Configuring job " + jobId + " with " + submitJobDir
        + " as the submit dir");
    // get delegation token for the dir
    TokenCache.obtainTokensForNamenodes(job.getCredentials(),
        new Path[] { submitJobDir }, conf);

    populateTokenCache(conf, job.getCredentials());

    // generate a secret to authenticate shuffle transfers
    if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
      KeyGenerator keyGen;
      try {
        keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
        keyGen.init(SHUFFLE_KEY_LENGTH);
      } catch (NoSuchAlgorithmException e) {
        throw new IOException("Error generating shuffle secret key", e);
      }
      SecretKey shuffleKey = keyGen.generateKey();
      TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),
          job.getCredentials());
    }
    if (CryptoUtils.isEncryptedSpillEnabled(conf)) {
      conf.setInt(MRJobConfig.MR_AM_MAX_ATTEMPTS, 1);
      LOG.warn("Max job attempts set to 1 since encrypted intermediate" +
          "data spill is enabled");
    }

    copyAndConfigureFiles(job, submitJobDir);

    Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);

    // Create the splits for the job
    LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
    int maps = writeSplits(job, submitJobDir);  // compute the input splits for the job
    conf.setInt(MRJobConfig.NUM_MAPS, maps);
    LOG.info("number of splits:" + maps);

    // write "queue admins of the queue to which job is being submitted"
    // to job file.
    String queue = conf.get(MRJobConfig.QUEUE_NAME,
        JobConf.DEFAULT_QUEUE_NAME);
    AccessControlList acl = submitClient.getQueueAdmins(queue);
    conf.set(toFullPropertyName(queue,
        QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());

    // removing jobtoken referrals before copying the jobconf to HDFS
    // as the tasks don't need this setting, actually they may break
    // because of it if present as the referral will point to a
    // different job.
    TokenCache.cleanUpTokenReferral(conf);

    if (conf.getBoolean(
        MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED,
        MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) {
      // Add HDFS tracking ids
      ArrayList<String> trackingIds = new ArrayList<String>();
      for (Token<? extends TokenIdentifier> t :
          job.getCredentials().getAllTokens()) {
        trackingIds.add(t.decodeIdentifier().getTrackingId());
      }
      conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS,
          trackingIds.toArray(new String[trackingIds.size()]));
    }

    // Set reservation info if it exists
    ReservationId reservationId = job.getReservationId();
    if (reservationId != null) {
      conf.set(MRJobConfig.RESERVATION_ID, reservationId.toString());
    }

    // Write job file to submit dir
    writeConf(conf, submitJobFile);

    //
    // Now, actually submit the job (using the submit name)
    //
    printTokens(jobId, job.getCredentials());
    status = submitClient.submitJob(
        jobId, submitJobDir.toString(), job.getCredentials());  // submit the job
    if (status != null) {
      return status;
    } else {
      throw new IOException("Could not launch job");
    }
  } finally {
    if (status == null) {
      LOG.info("Cleaning up the staging area " + submitJobDir);
      if (jtFs != null && submitJobDir != null)
        jtFs.delete(submitJobDir, true);
    }
  }
}

// ......

private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,
    Path jobSubmitDir) throws IOException,
    InterruptedException, ClassNotFoundException {
  JobConf jConf = (JobConf)job.getConfiguration();
  int maps;
  if (jConf.getUseNewMapper()) {
    maps = writeNewSplits(job, jobSubmitDir);
  } else {
    maps = writeOldSplits(jConf, jobSubmitDir);
  }
  return maps;
}
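
Which branch is taken depends on JobConf.getUseNewMapper(). For a new-API job such as WordCount, Job.submit() calls setUseNewAPI() before handing off to JobSubmitter, which (as far as I can tell) sets the mapred.mapper.new-api flag to true, so writeNewSplits() is the path we follow. A small illustration, not Hadoop code:

import org.apache.hadoop.mapred.JobConf;

// Illustration only (my sketch): the new-vs-old decision boils down to a boolean
// in the job configuration, which getUseNewMapper() reads. For a new-API job such
// as WordCount it ends up true, so writeSplits() goes through writeNewSplits().
class NewApiFlagExample {
  static boolean usesNewMapper(JobConf jConf) {
    return jConf.getUseNewMapper();  // backed by "mapred.mapper.new-api"
  }
}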

// ......

@SuppressWarnings("unchecked")
private <T extends InputSplit>
int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
    InterruptedException, ClassNotFoundException {
  Configuration conf = job.getConfiguration();
  InputFormat<?, ?> input =
      ReflectionUtils.newInstance(job.getInputFormatClass(), conf);

  List<InputSplit> splits = input.getSplits(job);
  T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);

  // sort the splits into order based on size, so that the biggest
  // go first
  Arrays.sort(array, new SplitComparator());
  JobSplitWriter.createSplitFiles(jobSubmitDir, conf,
      jobSubmitDir.getFileSystem(conf), array);
  return array.length;
}
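
The number of map tasks is therefore simply the number of splits returned by the InputFormat. With the new-API FileInputFormat you can influence that count through the min/max split size settings; a sketch (the sizes are arbitrary examples, and how they interact with the HDFS block size is covered in the referenced split-algorithm article):

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

// Sketch: nudging the split size used by the new-API FileInputFormat.getSplits(),
// which roughly computes splitSize = max(minSize, min(maxSize, blockSize)).
class SplitSizeTuningExample {
  static void tune(Job job) {
    FileInputFormat.setMinInputSplitSize(job, 64L * 1024 * 1024);   // mapreduce.input.fileinputformat.split.minsize
    FileInputFormat.setMaxInputSplitSize(job, 256L * 1024 * 1024);  // mapreduce.input.fileinputformat.split.maxsize
  }
}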

// ......

//method to write splits for old api mapper.
private int writeOldSplits(JobConf job, Path jobSubmitDir)
    throws IOException {
  org.apache.hadoop.mapred.InputSplit[] splits =
      job.getInputFormat().getSplits(job, job.getNumMapTasks());
  // sort the splits into order based on size, so that the biggest
  // go first
  Arrays.sort(splits, new Comparator<org.apache.hadoop.mapred.InputSplit>() {
    public int compare(org.apache.hadoop.mapred.InputSplit a,
                       org.apache.hadoop.mapred.InputSplit b) {
      try {
        long left = a.getLength();
        long right = b.getLength();
        if (left == right) {
          return 0;
        } else if (left < right) {
          return 1;
        } else {
          return -1;
        }
      } catch (IOException ie) {
        throw new RuntimeException("Problem getting input split size", ie);
      }
    }
  });
  JobSplitWriter.createSplitFiles(jobSubmitDir, job,
      jobSubmitDir.getFileSystem(job), splits);
  return splits.length;
}
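
Note that in the old API the second argument, job.getNumMapTasks(), is only a hint: to my understanding, the old-API FileInputFormat derives a goal size from it (total input size divided by the requested number of maps) and then clamps it with the minimum split size and the block size. A small, hedged sketch of that knob:

import org.apache.hadoop.mapred.JobConf;

// Sketch (old API): the requested number of maps is a hint, not a hard setting;
// the minimum split size and the HDFS block size still bound the actual splits.
class OldApiSplitHintExample {
  static void hint(JobConf jobConf) {
    jobConf.setNumMapTasks(10);  // hint: goalSize = totalSize / 10
  }
}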


The code above is from JobSubmitter.

(1) For the new API, the method being called is the abstract org.apache.hadoop.mapreduce.InputFormat.getSplits(JobContext), and the implementation that actually runs is org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits(JobContext). (There are two classes named FileInputFormat, org.apache.hadoop.mapreduce.lib.input.FileInputFormat and org.apache.hadoop.mapred.FileInputFormat; we follow org.apache.hadoop.mapreduce.lib.input.FileInputFormat here for two reasons: first, it is the subclass of the new-API abstract class InputFormat whose getSplits is invoked by writeNewSplits above; second, the FileInputFormat used in WordCount is precisely org.apache.hadoop.mapreduce.lib.input.FileInputFormat.)

A quick note on a concept we will need: the old and new MapReduce APIs. Since release 0.20.0, Hadoop has shipped both APIs side by side. The new API wraps the old one and improves extensibility and ease of use. The old API lives in the org.apache.hadoop.mapred package, while the new API lives in org.apache.hadoop.mapreduce and its subpackages. The two split-computation entry points are compared right below.
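
Condensed stub declarations of the two entry points (for comparison only; the real classes have more members):

import java.io.IOException;
import java.util.List;

// Old API: the caller passes a hint for the number of splits.
interface OldApiInputFormatStub<K, V> {
  org.apache.hadoop.mapred.InputSplit[] getSplits(org.apache.hadoop.mapred.JobConf job, int numSplits)
      throws IOException;
}

// New API: the InputFormat alone decides how many splits to produce.
abstract class NewApiInputFormatStub<K, V> {
  public abstract List<org.apache.hadoop.mapreduce.InputSplit> getSplits(org.apache.hadoop.mapreduce.JobContext context)
      throws IOException, InterruptedException;
}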

// The following code is in org.apache.hadoop.mapreduce.InputFormat

/**
 * Logically split the set of input files for the job.
 *
 * <p>Each {@link InputSplit} is then assigned to an individual {@link Mapper}
 * for processing.</p>
 *
 * <p><i>Note</i>: The split is a <i>logical</i> split of the inputs and the
 * input files are not physically split into chunks. For e.g. a split could
 * be <i><input-file-path, start, offset></i> tuple. The InputFormat
 * also creates the {@link RecordReader} to read the {@link InputSplit}.
 *
 * @param context job configuration.
 * @return an array of {@link InputSplit}s for the job.
 */
public abstract
  List<InputSplit> getSplits(JobContext context
                             ) throws IOException, InterruptedException;


The split computation itself happens in org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits(JobContext); the full split algorithm and host selection algorithm are covered in the referenced articles, and a simplified sketch of the core loop follows.
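
This is my own simplified sketch (not the verbatim hadoop-2.7.3 source; unsplittable files, cached hosts and error handling are omitted) of what getSplits does for a single file: cut it into fixed-size logical splits and give each split the hosts of the block that contains its first byte.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

// Simplified sketch of the heart of the new-API FileInputFormat.getSplits().
class GetSplitsSketch {
  private static final double SPLIT_SLOP = 1.1;  // same constant name as in FileInputFormat

  static List<InputSplit> splitOneFile(Path path, long length, long blockSize,
                                       long minSize, long maxSize,
                                       BlockLocation[] blkLocations) throws IOException {
    List<InputSplit> splits = new ArrayList<InputSplit>();
    long splitSize = Math.max(minSize, Math.min(maxSize, blockSize));  // cf. computeSplitSize()
    long bytesRemaining = length;
    while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
      // The block containing the split's first byte provides the split's hosts
      // -- this is the host selection for the common one-block-per-split case.
      int blkIndex = blockIndex(blkLocations, length - bytesRemaining);
      splits.add(new FileSplit(path, length - bytesRemaining, splitSize,
                               blkLocations[blkIndex].getHosts()));
      bytesRemaining -= splitSize;
    }
    if (bytesRemaining != 0) {  // the leftover tail becomes a last, smaller split
      int blkIndex = blockIndex(blkLocations, length - bytesRemaining);
      splits.add(new FileSplit(path, length - bytesRemaining, bytesRemaining,
                               blkLocations[blkIndex].getHosts()));
    }
    return splits;
  }

  // Index of the block that contains the byte at the given offset (cf. getBlockIndex()).
  private static int blockIndex(BlockLocation[] blkLocations, long offset) {
    for (int i = 0; i < blkLocations.length; i++) {
      long start = blkLocations[i].getOffset();
      if (start <= offset && offset < start + blkLocations[i].getLength()) {
        return i;
      }
    }
    return blkLocations.length - 1;  // fall back to the last block
  }
}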

(2) For the old API, the method being called is org.apache.hadoop.mapred.InputFormat.getSplits(JobConf, int), and the implementation actually invoked is org.apache.hadoop.mapred.FileInputFormat.getSplits(JobConf, int); here org.apache.hadoop.mapred.FileInputFormat is the class implementing the old-API InputFormat. For the details, see the referenced article on the split algorithm and host selection algorithm in the FileInputFormat class.