您的位置:首页 > 运维架构

Hadoop源码解析-作业执行流程-本地模式

2011-12-02 17:12 579 查看
执行一个作业有很多方法,这看两种方法:

1. JobClient.runJob(conf); //JobConf

2. job.waitForCompletion(true);//Job

runJob接受一个作业配置对象JobConf,然后初始化一个JobClient,作业的提交最终有依靠该对象。

public static RunningJob runJob(JobConf job) throws IOException {

JobClient jc = new JobClient(job);

RunningJob rj = jc.submitJob(job);

try {

if (!jc.monitorAndPrintJob(job, rj)) {

throw new IOException("Job failed!");

}

} catch (InterruptedException ie) {

Thread.currentThread().interrupt();

}

return rj;

}

submitJob函数直接通过内部接口submitJobInternal进行作业的提交。第二种方法一样会调用该方法。

public RunningJob submitJob(JobConf job) throws FileNotFoundException,

IOException {

try {

return submitJobInternal(job);

} catch (InterruptedException ie) {

throw new IOException("interrupted", ie);

} catch (ClassNotFoundException cnfe) {

throw new IOException("class not found", cnfe);

}

}

waitForCompletion函数的参数表示是否一直打印作业运行的log信息,默认都是开启的。

public boolean waitForCompletion(boolean verbose

) throws IOException, InterruptedException,

ClassNotFoundException {

if (state == JobState.DEFINE) {

submit();

}

if (verbose) {

jobClient.monitorAndPrintJob(conf, info);

} else {

info.waitForCompletion();

}

return isSuccessful();

}

可以开到两种方法都会调用monitorAndPrintJob,该函数定时参看Hadoop的状态并输出log信息。

/**

* Monitor a job and print status in real-time as progress is made and tasks

* fail.

* @param conf the job's configuration

* @param job the job to track

* @return true if the job succeeded

* @throws IOException if communication to the JobTracker fails

*/

public boolean monitorAndPrintJob(JobConf conf, RunningJob job) throws IOException, InterruptedException;

submitJobInternal函数是向系统提交作业,主要步骤包括配置运行环境、验证输出、将文件分片、分发文件,最后才做实际上的作业提交操作。

/**

* Internal method for submitting jobs to the system.

* @param job the configuration to submit

* @return a proxy object for the running job

* @throws FileNotFoundException

* @throws ClassNotFoundException

* @throws InterruptedException

* @throws IOException

*/

public

RunningJob submitJobInternal(JobConf job

) throws FileNotFoundException,

ClassNotFoundException,

InterruptedException,

IOException {

/*

* configure the command line options correctly on the submitting dfs

*/

JobID jobId = jobSubmitClient.getNewJobId();

Path submitJobDir = new Path(getSystemDir(), jobId.toString());

Path submitJarFile = new Path(submitJobDir, "job.jar");

Path submitSplitFile = new Path(submitJobDir, "job.split");

configureCommandLineOptions(job, submitJobDir, submitJarFile);

Path submitJobFile = new Path(submitJobDir, "job.xml");

int reduces = job.getNumReduceTasks();

JobContext context = new JobContext(job, jobId);

// Check the output specification

if (reduces == 0 ? job.getUseNewMapper() : job.getUseNewReducer()) {

org.apache.hadoop.mapreduce.OutputFormat<?,?> output =

ReflectionUtils.newInstance(context.getOutputFormatClass(), job);

output.checkOutputSpecs(context);

} else {

job.getOutputFormat().checkOutputSpecs(fs, job);

}

// Create the splits for the job

LOG.debug("Creating splits at " + fs.makeQualified(submitSplitFile));

int maps;

if (job.getUseNewMapper()) {

maps = writeNewSplits(context, submitSplitFile);

} else {

maps = writeOldSplits(job, submitSplitFile);

}

job.set("mapred.job.split.file", submitSplitFile.toString());

job.setNumMapTasks(maps);

// Write job file to JobTracker's fs

FSDataOutputStream out =

FileSystem.create(fs, submitJobFile,

new FsPermission(JOB_FILE_PERMISSION));

try {

job.writeXml(out);

} finally {

out.close();

}

//

// Now, actually submit the job (using the submit name)

//

JobStatus status = jobSubmitClient.submitJob(jobId);

if (status != null) {

return new NetworkedJob(status);

} else {

throw new IOException("Could not launch job");

}

}

jobSubmitClient对象的类型是JobSubmissionProtocol,它是一个接口,用以JobClient和JobTracker之间进行通信。JobClient可以利用类提供的方法来提交一个作业,也可以获取当前系统的信息。LocalJobRunner和JobTracker提供了实现。jobSubmitClient在JobClient初始化函数init中赋值。如果当前是local模式,用LocalJobRunner。否则为集群模式,创建一个RPC代理。详细的暂不讨论。

/**

* Connect to the default {@link JobTracker}.

* @param conf the job configuration.

* @throws IOException

*/

public void init(JobConf conf) throws IOException {

String tracker = conf.get("mapred.job.tracker", "local");

if ("local".equals(tracker)) {

this.jobSubmitClient = new LocalJobRunner(conf);

} else {

this.jobSubmitClient = createRPCProxy(JobTracker.getAddress(conf), conf);

}

}

Local方式调用LocalJobRunner的

public JobStatus submitJob(JobID jobid) throws IOException {

return new Job(jobid, this.conf).status;

}

否则调用JobTracker的

/**

* JobTracker.submitJob() kicks off a new job.

*

* Create a 'JobInProgress' object, which contains both JobProfile

* and JobStatus. Those two sub-objects are sometimes shipped outside

* of the JobTracker. But JobInProgress adds info that's useful for

* the JobTracker alone.

*/

public synchronized JobStatus submitJob(JobID jobId) throws IOException {

if(jobs.containsKey(jobId)) {

//job already running, don't start twice

return jobs.get(jobId).getStatus();

}

JobInProgress job = new JobInProgress(jobId, this, this.conf);

String queue = job.getProfile().getQueueName();

if(!(queueManager.getQueues().contains(queue))) {

new CleanupQueue().addToQueue(conf,getSystemDirectoryForJob(jobId));

throw new IOException("Queue \"" + queue + "\" does not exist");

}

// check for access

try {

checkAccess(job, QueueManager.QueueOperation.SUBMIT_JOB);

} catch (IOException ioe) {

LOG.warn("Access denied for user " + job.getJobConf().getUser()

+ ". Ignoring job " + jobId, ioe);

new CleanupQueue().addToQueue(conf, getSystemDirectoryForJob(jobId));

throw ioe;

}

// Check the job if it cannot run in the cluster because of invalid memory

// requirements.

try {

checkMemoryRequirements(job);

} catch (IOException ioe) {

new CleanupQueue().addToQueue(conf, getSystemDirectoryForJob(jobId));

throw ioe;

}

return addJob(jobId, job);

}

现在只是考虑Local方式下作业的执行。作业运行的核心是Job对象,该对象继承自Thread。Job是LocalJobRunner的一个私有类。LocalJobRunner实现MapReduce的本地模式。

public Job(JobID jobid, JobConf conf) throws IOException {

this.file = new Path(getSystemDir(), jobid + "/job.xml");

this.id = jobid;

this.mapoutputFile = new MapOutputFile(jobid);

this.mapoutputFile.setConf(conf);

this.localFile = new JobConf(conf).getLocalPath(jobDir+id+".xml");

this.localFs = FileSystem.getLocal(conf);

fs.copyToLocalFile(file, localFile);

this.job = new JobConf(localFile);

profile = new JobProfile(job.getUser(), id, file.toString(),

"http://localhost:8080/", job.getJobName());

status = new JobStatus(id, 0.0f, 0.0f, JobStatus.RUNNING);

jobs.put(id, this);

this.start();

}

创建Job后线程开始执行,执行时先将作业分为任务MapTask做Map操作,然后得到ReduceTask做Reduce操作。详细见Job的run函数。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: