Mapreduce 任务提交源码分析1
2017-12-27 10:59
483 查看
提交过程
一般我们mapreduce任务是通过如下命令进行提交的$HADOOP_HOME/bin/hadoop jar $MR_JAR $MAIN_CLASS
hadoop脚本中有如下代码
elif [ "$COMMAND" = "jar" ] ; then CLASS=org.apache.hadoop.util.RunJar //... 略 exec "$JAVA" $JAVA_HEAP_MAX $HADOOP_OPTS $CLASS "$@"
可以看到hadoop命令提交mapreduce其实就是执行了org.apache.hadoop.util.RunJar类的main方法,接下来我们来看下这个main方法,只关注最核心的逻辑,其他不重要的部分略去。
public static void main(String[] args) throws Throwable { String usage = "RunJar jarFile [mainClass] args..."; // 第一个参数是jar文件路径,第二个参数是主类名(可选),后续跟其他参数 // ... int firstArg = 0; String fileName = args[firstArg++]; File file = new File(fileName); // 构建jar文件对象 // ... // --: 这部分逻辑是获取主类名 // 优先从jar文件的Manifest信息中获取主类名; (只有当打包jar时采用可运行的jar文件的方式才有这个信息,否则普通的jar文件中不包含该信息) // 如果无法获取到,则采用第二参数值作为主类名; // --------------------------------------------------------------------------- String mainClassName = null; JarFile jarFile; try { jarFile = new JarFile(fileName); } catch(IOException io) { throw new IOException("Error opening job jar: " + fileName) .initCause(io); } Manifest manifest = jarFile.getManifest(); if (manifest != null) { mainClassName = manifest.getMainAttributes().getValue("Main-Class"); } jarFile.close(); if (mainClassName == null) { if (args.length < 2) { System.err.println(usage); System.exit(-1); } mainClassName = args[firstArg++]; } mainClassName = mainClassName.replaceAll("/", "."); // --------------------------------------------------------------------------- // 获取Hadoop临时目录,指的是本地操作系统的一个目录 // hadoop.tmp.dir配置可在core-site.xml配置文件中配置 File tmpDir = new File(new Configuration().get("hadoop.tmp.dir")); ensureDirectory(tmpDir); // --:为这个任务在临时目录下面创建一个临时的工作目录,目录名的格式为:“hadoop-unjar + Long型随机数” //------------------------------------------------------------------------------ final File workDir; try { workDir = File.createTempFile("hadoop-unjar", "", tmpDir); } catch (IOException ioe) { System.err.println("Error creating temp dir in hadoop.tmp.dir " + tmpDir + " due to " + ioe.getMessage()); System.exit(-1); return; } if (!workDir.delete()) { System.err.println("Delete failed for " + workDir); System.exit(-1); } ensureDirectory(workDir); //------------------------------------------------------------------------------ // 为Java进程添加一个钩子程序,当程序shutdown时清楚临时工作目录 ShutdownHookManager.get().addShutdownHook( new Runnable() { @Override public void run() { FileUtil.fullyDelete(workDir); } }, SHUTDOWN_HOOK_PRIORITY); // 将jar文件里面的内容解压到这个临时的工作目录 unJar(file, workDir); // -- 将: // workDir/, workDir/classes/, workDir/lib/${allfiles} 添加到classpath //------------------------------------------------------------------------------ ArrayList<URL> classPath = new ArrayList<URL>(); classPath.add(new File(workDir+"/").toURI().toURL()); classPath.add(file.toURI().toURL()); classPath.add(new File(workDir, "classes/").toURI().toURL()); File[] libs = new File(workDir, "lib").listFiles(); if (libs != null) { for (int i = 0; i < libs.length; i++) { classPath.add(libs[i].toURI().toURL()); } } //------------------------------------------------------------------------------ // --: 以上面的classpath创建一个URLClassLoader,作为主线程的classLoader ClassLoader loader = new URLClassLoader(classPath.toArray(new URL[0])); Thread.currentThread().setContextClassLoader(loader); // --:通过反射的机制去调用主类的main方法,并将除jar文件和[mainClass]以外的所有参数传递给该main方法 Class<?> mainClass = Class.forName(mainClassName, true, loader); Method main = mainClass.getMethod("main", new Class[] { Array.newInstance(String.class, 0).getClass() }); String[] newArgs = Arrays.asList(args) .subList(firstArg, args.length).toArray(new String[0]); try { main.invoke(null, new Object[] { newArgs }); } catch (InvocationTargetException e) { throw e.getTargetException(); } }
总结
hadoop命令做的事情就是将jar文件的内容解压到临时工作目录,并将解压后的workDir/, workDir/classes/, workDir/lib/${allfiles} 一系列路径加入到自定义的ClassLoader中,并通过反射的机制去执行jar文件中Manifest中的主类或是用户指定的主类。相关文章推荐
- MapReduce执行过程源码分析(一)——Job任务的提交
- Spark1.3从创建到提交:10)任务提交源码分析
- MapReduce作业提交源码分析
- Hadoop2.*源码分析之Job任务提交与执行
- Yarn任务提交流程(源码分析)
- Hadoop-2.7.3源码分析:MapReduce作业提交源码跟踪
- Spark2.2源码之Task任务提交源码分析
- jstorm源码分析:提交任务过程
- spark core源码分析1 集群启动及任务提交过程
- mapreduce源码分析作业提交、初始化、分配、计算过程之提交篇
- MapReduce的MapTask任务的运行源码级分析
- spark源码分析之Executor启动与任务提交篇
- MapReduce的ReduceTask任务的运行源码级分析
- MapReduce Job本地提交过程源码跟踪及分析
- mapreduce提交job源码分析
- mapreduce源码分析作业提交、初始化、分配、计算过程之初始化篇
- Hive生成MapReduce任务源码分析
- mapreduce源码分析之Reduce任务的运行
- spark 1.6.0 core源码分析1 集群启动及任务提交过程
- MapReduce源码分析之新API作业提交(二):连接集群