Hadoop 2.7.3 Source Analysis: Tracing MapReduce Job Submission
1. Submitting the Job
For the MapReduce program skeleton used here, see
https://www.geek-share.com/detail/2707064106.html
@Override
public int run(String[] args) throws Exception {
    // Load the configuration
    Configuration conf = getConf();
    // Set parameters
    conf.set("fs.defaultFS", "hdfs://192.168.11.81:9000");
    // Custom separator between key and value (the default is a tab)
    conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ",");
    // A Job represents one MapReduce job; the second argument is the job name.
    Job job = Job.getInstance(conf, "MapReduceTemplate");
    job.setJarByClass(MapReduceTemplate.class); // main class
    Path in = new Path(args[0]);  // input path
    Path out = new Path(args[1]); // output path
    FileSystem hdfs = out.getFileSystem(conf);
    if (hdfs.isDirectory(out)) { // delete the output directory if it already exists
        hdfs.delete(out, true);
    }
    FileInputFormat.setInputPaths(job, in);   // job input
    FileOutputFormat.setOutputPath(job, out); // job output
    job.setMapperClass(MapTemplate.class);     // custom Mapper
    job.setReducerClass(ReduceTemplate.class); // custom Reducer
    job.setInputFormatClass(KeyValueTextInputFormat.class); // input format
    job.setOutputFormatClass(TextOutputFormat.class);       // output format
    job.setOutputKeyClass(Text.class);   // class of the job's output key
    job.setOutputValueClass(Text.class); // class of the job's output value
    return job.waitForCompletion(true) ? 0 : 1; // wait for the job to finish, then exit
}
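The separator setting in the driver above is easier to picture with a small model. The sketch below is plain Java, not Hadoop code: it assumes the input format splits each line at the first occurrence of a single separator character and treats a line without a separator as a key with an empty value. The class and method names are made up for illustration.

```java
// Simplified model (not Hadoop code) of splitting a text line into a key
// and a value at the first occurrence of a separator character.
final class KeyValueSplitSketch {

    // Returns {key, value}. When the separator does not occur, the whole
    // line becomes the key and the value is the empty string.
    static String[] split(String line, char separator) {
        int pos = line.indexOf(separator);
        if (pos < 0) {
            return new String[] { line, "" };
        }
        return new String[] { line.substring(0, pos), line.substring(pos + 1) };
    }

    public static void main(String[] args) {
        String[] kv = split("user42,click,2017-03-01", ',');
        // Only the first comma splits; later commas stay in the value.
        System.out.println(kv[0] + " -> " + kv[1]);
    }
}
```

With "," as the separator, everything after the first comma ends up in the value, which is why the mapper in such a job usually parses the value further.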
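2. Inside Job.waitForCompletion()
Once the state check confirms that the job may be submitted (the state is DEFINE), submit() is called. monitorAndPrintJob() then keeps fetching the job's progress and prints it. The boolean parameter verbose controls this: true prints the progress while the job runs; false only waits for the job to finish, polling isComplete() at a fixed interval without printing a running log.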
/**
 * Submit the job to the cluster and wait for it to finish.
 * @param verbose print the progress to the user
 * @return true if the job succeeded
 * @throws IOException thrown if the communication with the
 *         <code>JobTracker</code> is lost
 */
public boolean waitForCompletion(boolean verbose
                                 ) throws IOException, InterruptedException,
                                          ClassNotFoundException {
  // Submit only when the state is DEFINE
  if (state == JobState.DEFINE) {
    submit(); // the submission itself happens in submit()
  }
  // verbose is the flag passed to waitForCompletion(verbose); when it is true,
  // monitorAndPrintJob() tracks the job and prints its progress
  if (verbose) {
    monitorAndPrintJob();
  } else {
    // get the completion poll interval from the client.
    int completionPollIntervalMillis =
      Job.getCompletionPollInterval(cluster.getConf());
    while (!isComplete()) {
      try {
        Thread.sleep(completionPollIntervalMillis);
      } catch (InterruptedException ie) {
      }
    }
  }
  // Returns whether the job finished successfully (not merely whether it was submitted)
  return isSuccessful();
}
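The non-verbose branch is a plain poll-and-sleep loop. Here is a minimal stand-alone sketch of that pattern, with a BooleanSupplier standing in for isComplete(). The names are hypothetical, and unlike the original, which swallows the interrupt, this version restores the interrupt flag and stops waiting; that difference is my own choice, not Hadoop's behavior.

```java
import java.util.function.BooleanSupplier;

// Stand-alone model of the "sleep until complete" loop; not Hadoop code.
final class CompletionPollSketch {

    // Sleeps pollIntervalMillis between checks until isComplete reports true.
    // Returns how many times the loop slept.
    static int waitUntilComplete(BooleanSupplier isComplete, long pollIntervalMillis) {
        int sleeps = 0;
        while (!isComplete.getAsBoolean()) {
            try {
                Thread.sleep(pollIntervalMillis);
                sleeps++;
            } catch (InterruptedException ie) {
                // Assumption: give up waiting and keep the interrupt visible to the caller.
                Thread.currentThread().interrupt();
                break;
            }
        }
        return sleeps;
    }

    public static void main(String[] args) {
        // A fake job that reports "not complete" for the first three checks.
        int[] checksLeft = {3};
        int sleeps = waitUntilComplete(() -> checksLeft[0]-- <= 0, 10L);
        System.out.println("slept " + sleeps + " times before completion");
    }
}
```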
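3. Inside submit()
submit() first makes sure the job is still in the DEFINE state; otherwise it cannot be submitted. It then calls setUseNewAPI() to switch to the new API, that is, the Mapper and Reducer under org.apache.hadoop.mapreduce. connect() builds the Cluster instance, whose client is what talks to the ResourceManager in YARN mode. Two pieces of code in submit() matter most: the call to connect(), and the lines that obtain a JobSubmitter instance and call its submitJobInternal() method to submit the job.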
/**
 * Submit the job to the cluster and return immediately.
 * @throws IOException
 */
public void submit()
       throws IOException, InterruptedException, ClassNotFoundException {
  // Check the job state once more
  ensureState(JobState.DEFINE);
  // There are two sets of APIs; the new one is used here
  setUseNewAPI();
  // Initialization: assigns cluster. Its client is the actual submitter, either
  // the local one or the YARN one, as decided by the configuration
  connect();
  final JobSubmitter submitter =
      getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
  status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
    public JobStatus run() throws IOException, InterruptedException,
    ClassNotFoundException {
      return submitter.submitJobInternal(Job.this, cluster); // submit
    }
  });
  state = JobState.RUNNING;
  LOG.info("The url to track the job: " + getTrackingURL());
}
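The state guard at the top of submit() is what makes a second submit() fail while a later waitForCompletion() still works. A stripped-down sketch of that guard follows; the class is hypothetical and keeps only the two states that appear in the code above.

```java
// Minimal model of the DEFINE -> RUNNING guard; not the real Job class.
final class JobStateSketch {

    enum State { DEFINE, RUNNING }

    private State state = State.DEFINE;

    // Rejects any call made while the job is in a different state.
    private void ensureState(State expected) {
        if (state != expected) {
            throw new IllegalStateException(
                "Job in state " + state + " instead of " + expected);
        }
    }

    void submit() {
        ensureState(State.DEFINE);
        // connect() and the hand-off to the submitter would happen here.
        state = State.RUNNING;
    }

    State state() {
        return state;
    }

    // Returns true when a second submit() on the same job is rejected.
    static boolean secondSubmitRejected() {
        JobStateSketch job = new JobStateSketch();
        job.submit();
        try {
            job.submit();
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("second submit rejected: " + secondSubmitRejected());
    }
}
```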
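4. First, the connect() method
When a MapReduce job is submitted, the connection to the cluster is made by the connect() method of the Job class. What it actually does is build cluster, an instance of Cluster.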
private synchronized void connect()
        throws IOException, InterruptedException, ClassNotFoundException {
  if (cluster == null) { // build the Cluster instance only if it does not exist yet
    // Cluster is the handle for connecting to the MapReduce cluster and
    // provides remote access to cluster information
    cluster =
      ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
                 public Cluster run()
                        throws IOException, InterruptedException,
                               ClassNotFoundException {
                   return new Cluster(getConfiguration());
                 }
               });
  }
}
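connect() is an instance of synchronized lazy initialization: the expensive object is built on first use and reused afterwards. A generic sketch of the same idea is below. The class is hypothetical, and for convenience its connect() returns the instance, whereas the real method returns void and stores the result in a field.

```java
import java.util.function.Supplier;

// Generic model of "create once under a lock, reuse afterwards"; not Hadoop code.
final class LazyConnectSketch<T> {

    private final Supplier<T> factory;
    private T cluster;   // stays null until the first connect()
    private int created; // counts how often the factory actually ran

    LazyConnectSketch(Supplier<T> factory) {
        this.factory = factory;
    }

    // synchronized so that two threads cannot both see null and build twice
    synchronized T connect() {
        if (cluster == null) {
            cluster = factory.get();
            created++;
        }
        return cluster;
    }

    synchronized int timesCreated() {
        return created;
    }

    public static void main(String[] args) {
        LazyConnectSketch<Object> job = new LazyConnectSketch<>(Object::new);
        Object first = job.connect();
        Object second = job.connect();
        System.out.println("same instance: " + (first == second));
    }
}
```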
5. Inside the Cluster(getConfiguration()) constructor
This brings us to the Cluster class. Let's look at its members first.
package org.apache.hadoop.mapreduce;

import ...

/**
 * Provides a way to access information about the map/reduce cluster.
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class Cluster {

  @InterfaceStability.Evolving
  public static enum JobTrackerStatus {INITIALIZING, RUNNING};

  private ClientProtocolProvider clientProtocolProvider; // client protocol provider
  private ClientProtocol client;        // client protocol instance
  private UserGroupInformation ugi;     // user and group information
  private Configuration conf;           // configuration
  private FileSystem fs = null;         // file system instance
  private Path sysDir = null;           // system directory
  private Path stagingAreaDir = null;   // staging directory for job resources
  private Path jobHistoryDir = null;    // job history directory
  private static final Log LOG = LogFactory.getLog(Cluster.class); // logger

  // Loader for the client protocol providers
  private static ServiceLoader<ClientProtocolProvider> frameworkLoader =
      ServiceLoader.load(ClientProtocolProvider.class);

  static {
    ConfigUtil.loadResources();
  }

  public Cluster(Configuration conf) throws IOException {
    this(null, conf);
  }

  public Cluster(InetSocketAddress jobTrackAddr, Configuration conf)
      throws IOException {
    this.conf = conf;                                 // store the configuration
    this.ugi = UserGroupInformation.getCurrentUser(); // current user
    initialize(jobTrackAddr, conf);                   // finish initialization
  }

  ...
}
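The two most important fields of Cluster are clientProtocolProvider, an instance of ClientProtocolProvider, and client, an instance of ClientProtocol; the latter is produced by the former's create() method. Cluster provides two constructors, and the single-argument one simply delegates to the two-argument one with a null address.
6. Inside initialize()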
private void initialize(InetSocketAddress jobTrackAddr, Configuration conf)
    throws IOException {

  synchronized (frameworkLoader) {
    // Take each ClientProtocolProvider in turn and build a ClientProtocol with its create() method
    for (ClientProtocolProvider provider : frameworkLoader) {
      LOG.debug("Trying ClientProtocolProvider : "
          + provider.getClass().getName());
      ClientProtocol clientProtocol = null;
      try {
        // If the configuration does not select YARN, a LocalJobRunner is built and the MR job runs locally
        // If the configuration selects YARN, a YARNRunner is built and the MR job runs on the YARN cluster
        if (jobTrackAddr == null) {
          clientProtocol = provider.create(conf);
        } else {
          clientProtocol = provider.create(jobTrackAddr, conf);
        }
        // Record clientProtocolProvider and client, then leave the loop
        if (clientProtocol != null) {
          clientProtocolProvider = provider;
          client = clientProtocol;
          LOG.debug("Picked " + provider.getClass().getName()
              + " as the ClientProtocolProvider");
          break;
        } else {
          LOG.debug("Cannot pick " + provider.getClass().getName()
              + " as the ClientProtocolProvider - returned null protocol");
        }
      } catch (Exception e) {
        LOG.info("Failed to use " + provider.getClass().getName()
            + " due to error: ", e);
      }
    }
  }

  // If clientProtocolProvider or client is still null, throw an IOException
  if (null == clientProtocolProvider || null == client) {
    throw new IOException(
        "Cannot initialize Cluster. Please check your configuration for "
            + MRConfig.FRAMEWORK_NAME
            + " and the correspond server addresses.");
  }
}
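The loop in initialize() follows a "first provider that returns non-null wins" rule, and a provider that throws is logged and skipped. The sketch below models only that control flow with plain Java functions in place of ClientProtocolProvider; every name in it is hypothetical, and it throws an unchecked IllegalStateException where the original throws an IOException.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Model of the provider-selection loop; not Hadoop code.
final class ProviderLoopSketch {

    // Each provider maps a configuration to a client name, returns null to
    // decline, or throws when it cannot be used at all.
    static String pickFirst(List<Function<Map<String, String>, String>> providers,
                            Map<String, String> conf) {
        for (Function<Map<String, String>, String> provider : providers) {
            try {
                String client = provider.apply(conf);
                if (client != null) {
                    return client; // first non-null result ends the search
                }
            } catch (RuntimeException e) {
                // Like the original: report the failure and try the next provider.
                System.err.println("Failed to use provider: " + e.getMessage());
            }
        }
        throw new IllegalStateException("Cannot initialize: no provider accepted the configuration");
    }

    // One provider throws, one declines, the third accepts.
    static String demo() {
        Function<Map<String, String>, String> broken = conf -> {
            throw new IllegalArgumentException("misconfigured");
        };
        Function<Map<String, String>, String> declines = conf -> null;
        Function<Map<String, String>, String> accepts = conf -> "client-from-third-provider";
        return pickFirst(Arrays.asList(broken, declines, accepts),
                         Collections.<String, String>emptyMap());
    }

    // Returns true when an empty provider list leads to the final exception.
    static boolean failsWhenNobodyAccepts() {
        try {
            pickFirst(Collections.<Function<Map<String, String>, String>>emptyList(),
                      Collections.<String, String>emptyMap());
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```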
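7. The ClientProtocolProvider implementation LocalClientProtocolProvider
The comments around the create() call above mention two ClientProtocolProvider implementations.
In MapReduce, the abstract class ClientProtocolProvider has two implementations, YarnClientProtocolProvider and LocalClientProtocolProvider; the former is for YARN mode and the latter for Local mode.
Let's look at Local mode first. The create() method of LocalClientProtocolProvider looks like this: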
package org.apache.hadoop.mapred;

import java.io.IOException;
import java.net.InetSocketAddress;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider;

@InterfaceAudience.Private
public class LocalClientProtocolProvider extends ClientProtocolProvider {

  @Override
  public ClientProtocol create(Configuration conf) throws IOException {
    // The two constants are "mapreduce.framework.name" and "local"
    String framework =
        conf.get(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME);
    // If framework is local, set the number of map tasks to 1 and return a
    // LocalJobRunner; otherwise return null
    if (!MRConfig.LOCAL_FRAMEWORK_NAME.equals(framework)) {
      return null;
    }
    conf.setInt(JobContext.NUM_MAPS, 1);

    return new LocalJobRunner(conf);
  }

  @Override
  public ClientProtocol create(InetSocketAddress addr, Configuration conf) {
    return null; // LocalJobRunner doesn't use a socket
  }

  @Override
  public void close(ClientProtocol clientProtocol) {
    // no clean up required
  }

}
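As the code shows, MapReduce reads the parameter mapreduce.framework.name to decide the connection mode, and the default is Local mode.
8. The ClientProtocolProvider implementation YarnClientProtocolProvider
Now for YARN mode. Here is the create() method of YarnClientProtocolProvider: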
package org.apache.hadoop.mapred;

import java.io.IOException;
import java.net.InetSocketAddress;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider;

public class YarnClientProtocolProvider extends ClientProtocolProvider {

  @Override
  public ClientProtocol create(Configuration conf) throws IOException {
    // If mapreduce.framework.name is set to yarn, build and return a
    // YARNRunner instance; otherwise return null
    if (MRConfig.YARN_FRAMEWORK_NAME.equals(conf.get(MRConfig.FRAMEWORK_NAME))) {
      return new YARNRunner(conf);
    }
    return null;
  }

  @Override
  public ClientProtocol create(InetSocketAddress addr, Configuration conf)
      throws IOException {
    return create(conf);
  }

  @Override
  public void close(ClientProtocol clientProtocol) throws IOException {
    if (clientProtocol instanceof YARNRunner) {
      ((YARNRunner)clientProtocol).close();
    }
  }

}
At this point we know something important: the ClientProtocol instance inside Cluster is either a YARNRunner (YARN mode) or a LocalJobRunner (Local mode).
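The two create() methods differ in one detail that is easy to miss: the local provider falls back to "local" when the parameter is absent, while the YARN provider requires an explicit "yarn". The sketch below models just that decision with a plain Map as the configuration. It is not Hadoop code, the returned strings merely name the runner that would be built, and the "classic" value is used only as an example of a name neither provider accepts.

```java
import java.util.HashMap;
import java.util.Map;

// Model of how mapreduce.framework.name selects the runner; not Hadoop code.
final class FrameworkSelectionSketch {

    static final String FRAMEWORK_NAME = "mapreduce.framework.name";

    // Local provider: an absent parameter counts as "local".
    static String localCreate(Map<String, String> conf) {
        String framework = conf.getOrDefault(FRAMEWORK_NAME, "local");
        return "local".equals(framework) ? "LocalJobRunner" : null;
    }

    // YARN provider: only an explicit "yarn" is accepted.
    static String yarnCreate(Map<String, String> conf) {
        return "yarn".equals(conf.get(FRAMEWORK_NAME)) ? "YARNRunner" : null;
    }

    // Returns the runner name for a framework setting, or null when neither
    // provider accepts it. Pass null to model an unset parameter.
    static String runnerForName(String frameworkName) {
        Map<String, String> conf = new HashMap<>();
        if (frameworkName != null) {
            conf.put(FRAMEWORK_NAME, frameworkName);
        }
        String runner = yarnCreate(conf);
        return runner != null ? runner : localCreate(conf);
    }

    public static void main(String[] args) {
        System.out.println("unset   -> " + runnerForName(null));
        System.out.println("yarn    -> " + runnerForName("yarn"));
        System.out.println("classic -> " + runnerForName("classic"));
    }
}
```

In the real Cluster.initialize(), the last case is the one that ends in the "Cannot initialize Cluster" IOException.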
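9. YARNRunner
We analyze the cluster connection in YARN mode, so let's look at the implementation of YARNRunner.
The most important field is resMgrDelegate, of type ResourceMgrDelegate, which acts as a proxy for the ResourceManager. In YARN mode the whole MapReduce client relies on it to communicate with the YARN cluster: submitting jobs, querying job status, and fetching cluster information. Internally it holds a YarnClient instance that does the actual communication with YARN, along with application-specific members such as the ApplicationId and the ApplicationSubmissionContext. I plan to cover this object in detail when time permits. The other important field is clientCache, an instance of ClientCache.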
package org.apache.hadoop.mapred;

import ...

/**
 * This class enables the current JobClient (0.22 hadoop) to run on YARN.
 */
@SuppressWarnings("unchecked")
public class YARNRunner implements ClientProtocol {

  private static final Log LOG = LogFactory.getLog(YARNRunner.class);

  // RecordFactory instance
  private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
  private ResourceMgrDelegate resMgrDelegate;   // proxy for the ResourceManager
  private ClientCache clientCache;              // client cache
  private Configuration conf;                   // configuration
  private final FileContext defaultFileContext; // file context

  /**
   * Yarn runner incapsulates the client interface of
   * yarn
   * @param conf the configuration object for the client
   */
  // Builds the ResourceMgrDelegate first, then calls the two-argument constructor
  public YARNRunner(Configuration conf) {
    this(conf, new ResourceMgrDelegate(new YarnConfiguration(conf)));
  }

  /**
   * Similar to {@link #YARNRunner(Configuration)} but allowing injecting
   * {@link ResourceMgrDelegate}. Enables mocking and testing.
   * @param conf the configuration object for the client
   * @param resMgrDelegate the resourcemanager client handle.
   */
  // Builds the ClientCache first, then calls the three-argument constructor
  public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate) {
    this(conf, resMgrDelegate, new ClientCache(conf, resMgrDelegate));
  }

  /**
   * Similar to {@link YARNRunner#YARNRunner(Configuration, ResourceMgrDelegate)}
   * but allowing injecting {@link ClientCache}. Enable mocking and testing.
   * @param conf the configuration object
   * @param resMgrDelegate the resource manager delegate
   * @param clientCache the client cache object.
   */
  public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate,
      ClientCache clientCache) {
    this.conf = conf; // assign the fields
    try {
      this.resMgrDelegate = resMgrDelegate;
      this.clientCache = clientCache;
      // Obtain the FileContext instance defaultFileContext
      this.defaultFileContext = FileContext.getFileContext(this.conf);
    } catch (UnsupportedFileSystemException ufe) {
      throw new RuntimeException("Error in instantiating YarnClient", ufe);
    }
  }

  ...
}
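YARNRunner provides three constructors. When a job (a WordCount job, for example) is submitted, the single-argument constructor is the one that gets called. It first builds the ResourceMgrDelegate that acts as the ResourceManager proxy and calls the two-argument constructor, which in turn builds the ClientCache and calls the three-argument constructor. That final constructor only assigns the fields and then obtains the FileContext instance defaultFileContext through the static FileContext.getFileContext() method.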
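This constructor chain is a common way to keep production code simple while still letting tests inject collaborators, as the Javadoc in the listing hints ("Enables mocking and testing"). A compact sketch of the pattern follows, with hypothetical Delegate and Cache classes standing in for ResourceMgrDelegate and ClientCache and a String standing in for the configuration.

```java
// Model of the three-step constructor chain; not the real YARNRunner.
final class RunnerChainSketch {

    static final class Delegate {
        final String conf;
        Delegate(String conf) { this.conf = conf; }
    }

    static final class Cache {
        final Delegate delegate;
        Cache(String conf, Delegate delegate) { this.delegate = delegate; }
    }

    final String conf;
    final Delegate delegate;
    final Cache cache;

    // Production entry point: builds the delegate itself.
    RunnerChainSketch(String conf) {
        this(conf, new Delegate(conf));
    }

    // Lets a caller (typically a test) inject the delegate.
    RunnerChainSketch(String conf, Delegate delegate) {
        this(conf, delegate, new Cache(conf, delegate));
    }

    // Lets a caller inject both collaborators; only assigns fields.
    RunnerChainSketch(String conf, Delegate delegate, Cache cache) {
        this.conf = conf;
        this.delegate = delegate;
        this.cache = cache;
    }

    public static void main(String[] args) {
        RunnerChainSketch runner = new RunnerChainSketch("demo-conf");
        // The cache built by the chain shares the delegate built one step earlier.
        System.out.println("cache shares delegate: " + (runner.cache.delegate == runner.delegate));
    }
}
```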
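10. Summary of connect()
When a MapReduce job is submitted, the connection to the cluster is made by Job's connect() method, which builds cluster, an instance of Cluster. Cluster is the handle for connecting to the MapReduce cluster and offers a way to obtain information about it. Inside Cluster there is client, a ClientProtocol instance that communicates with the cluster; it is built by the create() method of a ClientProtocolProvider. Hadoop 2.x ships ClientProtocol implementations for two modes: YARNRunner for YARN mode and LocalJobRunner for Local mode, and Cluster relies on one of them for all communication. In YARN mode the YARNRunner holds resMgrDelegate, a ResourceMgrDelegate that acts as a proxy for the ResourceManager. The whole MapReduce client uses it to talk to the YARN cluster, for example to submit jobs, query job status, and fetch cluster information.
11. The submitJobInternal() method
Back in submit() (section 3): connect() has been covered, so next comes the other important method, submitJobInternal().
This method belongs to the JobSubmitter class which, as the name suggests, is the job submitter in MapReduce. Apart from its constructor, the only non-private method JobSubmitter offers is submitJobInternal() (package-private, as the listing below shows). It is the internal method for submitting a job and implements the whole submission logic.
JobSubmitter has four instance fields:
1. jtFs, a FileSystem instance: used to work with the files the job needs at run time;
2. submitClient, a ClientProtocol instance: used to interact with the cluster for job submission, job status queries and so on, as described above;
3. submitHostName: the name of the host submitting the job;
4. submitHostAddress: the address of the host submitting the job.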
@InterfaceAudience.Private
@InterfaceStability.Unstable
class JobSubmitter {
  protected static final Log LOG = LogFactory.getLog(JobSubmitter.class);
  private static final String SHUFFLE_KEYGEN_ALGORITHM = "HmacSHA1";
  private static final int SHUFFLE_KEY_LENGTH = 64;
  private FileSystem jtFs;             // FileSystem instance
  private ClientProtocol submitClient; // ClientProtocol instance
  private String submitHostName;       // name of the submitting host
  private String submitHostAddress;    // address of the submitting host

  // Constructor
  JobSubmitter(FileSystem submitFs, ClientProtocol submitClient)
  throws IOException {
    this.submitClient = submitClient;
    this.jtFs = submitFs;
  }

  ...
}
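The two SHUFFLE_* constants in this class are used later in submitJobInternal() to generate the secret that authenticates shuffle transfers. The key-generation step needs nothing beyond the JDK, so it can be tried in isolation. The sketch below repeats the same javax.crypto calls with the same algorithm name and key length; the wrapper class is hypothetical, and it rethrows as IllegalStateException where the original wraps the failure in an IOException.

```java
import java.security.NoSuchAlgorithmException;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;

// Stand-alone version of the shuffle-secret generation step.
final class ShuffleKeySketch {

    static final String ALGORITHM = "HmacSHA1"; // same value as SHUFFLE_KEYGEN_ALGORITHM
    static final int KEY_LENGTH_BITS = 64;      // same value as SHUFFLE_KEY_LENGTH

    // Generates a fresh random secret and returns its raw bytes.
    static byte[] newShuffleSecret() {
        try {
            KeyGenerator keyGen = KeyGenerator.getInstance(ALGORITHM);
            keyGen.init(KEY_LENGTH_BITS);
            SecretKey shuffleKey = keyGen.generateKey();
            return shuffleKey.getEncoded();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("Error generating shuffle secret key", e);
        }
    }

    public static void main(String[] args) {
        byte[] secret = newShuffleSecret();
        System.out.println("secret length in bytes: " + secret.length);
    }
}
```

Note that the key length is given in bits, so the encoded secret is short; with the standard JDK provider a 64-bit request should produce 8 bytes.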
Next comes the most important code: submitJobInternal(), the one core method JobSubmitter offers to its callers, which submits the job to the cluster.
/**
 * Internal method for submitting jobs to the system.
 *
 * <p>The job submission process involves:
 * <ol>
 *   <li>
 *   Checking the input and output specifications of the job.
 *   </li>
 *   <li>
 *   Computing the {@link InputSplit}s for the job.
 *   </li>
 *   <li>
 *   Setup the requisite accounting information for the
 *   {@link DistributedCache} of the job, if necessary.
 *   </li>
 *   <li>
 *   Copying the job's jar and configuration to the map-reduce system
 *   directory on the distributed file-system.
 *   </li>
 *   <li>
 *   Submitting the job to the <code>JobTracker</code> and optionally
 *   monitoring it's status.
 *   </li>
 * </ol></p>
 * @param job the configuration to submit
 * @param cluster the handle to the Cluster
 * @throws ClassNotFoundException
 * @throws InterruptedException
 * @throws IOException
 */
JobStatus submitJobInternal(Job job, Cluster cluster)
throws ClassNotFoundException, InterruptedException, IOException {

  //validate the jobs output specs
  // Checks that the output path is configured and whether it already exists.
  // The valid case is: configured, and not yet existing.
  // The output path is read from mapreduce.output.fileoutputformat.outputdir
  checkSpecs(job);

  // Get the configuration
  Configuration conf = job.getConfiguration();
  // Add the MapReduce framework path to the distributed cache
  addMRFrameworkToDistributedCache(conf);

  // The static getStagingDir() returns the directory that holds the job's resources.
  // When not configured it defaults to /tmp/hadoop-yarn/staging/<submitting user>/.staging
  Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
  //configure the command line options correctly on the submitting dfs
  InetAddress ip = InetAddress.getLocalHost(); // address of the current host
  if (ip != null) { // record the submitting host's address and name, and store both in conf
    submitHostAddress = ip.getHostAddress();
    submitHostName = ip.getHostName();
    conf.set(MRJobConfig.JOB_SUBMITHOST,submitHostName);
    conf.set(MRJobConfig.JOB_SUBMITHOSTADDR,submitHostAddress);
  }
  JobID jobId = submitClient.getNewJobID(); // obtain a new job ID
  job.setJobID(jobId);                      // store the job ID in the job
  // Build the submit directory: jobStagingArea followed by /jobID
  Path submitJobDir = new Path(jobStagingArea, jobId.toString());
  JobStatus status = null;
  try { // set a few job parameters
    conf.set(MRJobConfig.USER_NAME,
        UserGroupInformation.getCurrentUser().getShortUserName());
    conf.set("hadoop.http.filter.initializers",
        "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
    conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());
    LOG.debug("Configuring job " + jobId + " with " + submitJobDir
        + " as the submit dir");
    // get delegation token for the dir
    TokenCache.obtainTokensForNamenodes(job.getCredentials(),
        new Path[] { submitJobDir }, conf);

    // Collect secret keys and tokens and store them in the TokenCache
    populateTokenCache(conf, job.getCredentials());

    // generate a secret to authenticate shuffle transfers
    if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
      KeyGenerator keyGen;
      try {
        keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
        keyGen.init(SHUFFLE_KEY_LENGTH);
      } catch (NoSuchAlgorithmException e) {
        throw new IOException("Error generating shuffle secret key", e);
      }
      SecretKey shuffleKey = keyGen.generateKey();
      TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),
          job.getCredentials());
    }
    if (CryptoUtils.isEncryptedSpillEnabled(conf)) {
      conf.setInt(MRJobConfig.MR_AM_MAX_ATTEMPTS, 1);
      LOG.warn("Max job attempts set to 1 since encrypted intermediate" +
              "data spill is enabled");
    }

    // Copy and configure the related files
    copyAndConfigureFiles(job, submitJobDir);

    // Path of the job configuration file
    Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);

    // Create the splits for the job
    LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
    // writeSplits() writes the split data file job.split and the split
    // metadata file job.splitmetainfo, and returns the number of map tasks
    int maps = writeSplits(job, submitJobDir);
    // Set the number of maps
    conf.setInt(MRJobConfig.NUM_MAPS, maps);
    LOG.info("number of splits:" + maps);

    // write "queue admins of the queue to which job is being submitted"
    // to job file.
    // The queue name comes from mapreduce.job.queuename and defaults to "default"
    String queue = conf.get(MRJobConfig.QUEUE_NAME,
        JobConf.DEFAULT_QUEUE_NAME);
    AccessControlList acl = submitClient.getQueueAdmins(queue);
    conf.set(toFullPropertyName(queue,
        QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());

    // removing jobtoken referrals before copying the jobconf to HDFS
    // as the tasks don't need this setting, actually they may break
    // because of it if present as the referral will point to a
    // different job.
    TokenCache.cleanUpTokenReferral(conf); // drop the token referral from conf

    // Decide from the configuration whether token tracking IDs are needed
    if (conf.getBoolean(
        MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED,
        MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) {
      // Add HDFS tracking ids
      // Collect the token IDs from the job into the trackingIds list
      ArrayList<String> trackingIds = new ArrayList<String>();
      for (Token<? extends TokenIdentifier> t :
          job.getCredentials().getAllTokens()) {
        trackingIds.add(t.decodeIdentifier().getTrackingId());
      }
      conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS,
          trackingIds.toArray(new String[trackingIds.size()]));
    }

    // Set reservation info if it exists
    ReservationId reservationId = job.getReservationId();
    if (reservationId != null) {
      conf.set(MRJobConfig.RESERVATION_ID, reservationId.toString());
    }

    // Write job file to submit dir
    writeConf(conf, submitJobFile);

    //
    // Now, actually submit the job (using the submit name)
    //
    printTokens(jobId, job.getCredentials());
    // Submit the job through submitClient.submitJob() and get the JobStatus back.
    // From the context above, submitClient is a YARNRunner or a LocalJobRunner
    status = submitClient.submitJob(
        jobId, submitJobDir.toString(), job.getCredentials());
    if (status != null) { // return the status if there is one, otherwise throw an IOException
      return status;
    } else {
      throw new IOException("Could not launch job");
    }
  } finally {
    // If no status was obtained (for example just before the "Could not launch job"
    // IOException propagates), delete the job's submit directory submitJobDir
    // through the FileSystem instance jtFs
    if (status == null) {
      LOG.info("Cleaning up the staging area " + submitJobDir);
      if (jtFs != null && submitJobDir != null)
        jtFs.delete(submitJobDir, true);

    }
  }
}
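The try/finally shape of submitJobInternal() is easy to lose in the long listing: resources are staged into a per-job directory, the job is handed to the client, and the directory is removed again whenever no status comes back. The sketch below reproduces only that shape. It uses java.nio.file on the local disk as a stand-in for HDFS, a Function as a stand-in for submitClient, and unchecked exceptions instead of IOException; all names are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.function.Function;
import java.util.stream.Stream;

// Model of "stage, submit, clean up on failure"; not Hadoop code.
final class StagingCleanupSketch {

    // Stages a placeholder job file under stagingArea/jobId, then asks the
    // client for a status. A null status counts as a failed launch.
    static String submit(Path stagingArea, String jobId, Function<Path, String> client) {
        Path submitDir = stagingArea.resolve(jobId);
        String status = null;
        try {
            Files.createDirectories(submitDir);
            Files.write(submitDir.resolve("job.xml"), new byte[0]);
            status = client.apply(submitDir);
            if (status == null) {
                throw new IllegalStateException("Could not launch job");
            }
            return status;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            // Runs on every exit path; cleans up only when no status was obtained.
            if (status == null) {
                deleteRecursively(submitDir);
            }
        }
    }

    // Deletes children before parents by visiting paths in reverse order.
    static void deleteRecursively(Path dir) {
        if (!Files.exists(dir)) {
            return;
        }
        try (Stream<Path> paths = Files.walk(dir)) {
            paths.sorted(Comparator.reverseOrder()).forEach(p -> p.toFile().delete());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Runs one submission in a temp directory and reports whether the
    // submit directory still exists afterwards.
    static boolean submitDirExistsAfter(Function<Path, String> client) {
        try {
            Path area = Files.createTempDirectory("staging");
            try {
                submit(area, "job_0001", client);
            } catch (IllegalStateException launchFailure) {
                // Expected when the client returns null.
            }
            boolean exists = Files.exists(area.resolve("job_0001"));
            deleteRecursively(area);
            return exists;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("kept after success: " + submitDirExistsAfter(dir -> "RUNNING"));
        System.out.println("kept after failure: " + submitDirExistsAfter(dir -> null));
    }
}
```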
That completes the overview of how a MapReduce job is submitted.