
Hadoop Source Code Analysis 1: Client Job Submission

2014-05-28 08:47
1. Testing wordcount; its source is as follows:

public class WordCount {

  public static class TokenizerMapper
      extends Mapper<Object, Text, Text, IntWritable> {
    ............
  }

  public static class IntSumReducer
      extends Reducer<Text, IntWritable, Text, IntWritable> {
    ...............
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println("Usage: wordcount <in> <out>");
      System.exit(2);
    }
    Job job = new Job(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
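A usage note: job.waitForCompletion(true) both submits the job and blocks until it finishes, printing progress because the argument is true. In Hadoop 1.x the actual submission is delegated to JobClient.submitJobInternal(), which stages the job jar, input splits, and configuration into HDFS and then submits the job to the JobTracker over RPC; the sections below trace how control reaches this driver code in the first place, starting from the hadoop jar command.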

2. Modify bin/hadoop to set the debug parameters

Change its last line to:

exec "$JAVA" -Xdebug-Xrunjdwp:transport=dt_socket,address=8888,server=y,suspend=y -Dproc_$COMMAND $JAVA_HEAP_MAX $HADOOP_OPTS-classpath "$CLASSPATH" $CLASS "$@"

Run the command:
hadoop jar /opt/hadoop-1.0.0/hadoop-examples-1.0.0.jar wordcount in out2

What actually runs is:
exec /opt/jdk1.7.0_07/bin/java -Xdebug -Xrunjdwp:transport=dt_socket,address=8888,server=y,suspend=y -Dproc_jar -Xmx1000m -Dhadoop.log.dir=/opt/hadoop-1.0.0/libexec/../logs -Dhadoop.log.file=hadoop.log -Dhadoop.home.dir=/opt/hadoop-1.0.0/libexec/..
-Dhadoop.id.str= -Dhadoop.root.logger=INFO,console -Dhadoop.security.logger=INFO,NullAppender -Djava.library.path=/opt/hadoop-1.0.0/libexec/../lib/native/Linux-amd64-64 -Dhadoop.policy.file=hadoop-policy.xml -classpath /opt/hadoop-1.0.0/libexec/../conf:/opt/jdk1.7.0_07/lib/tools.jar:/opt/hadoop-1.0.0/libexec/..:/opt/hadoop-1.0.0/libexec/../hadoop-core-1.0.0.jar:/opt/hadoop-1.0.0/libexec/../lib/asm-3.2.jar:/opt/hadoop-1.0.0/libexec/../lib/aspectjrt-1.6.5.jar:/opt/hadoop-1.0.0/libexec/../lib/aspectjtools-1.6.5.jar:/opt/hadoop-1.0.0/libexec/../lib/commons-beanutils-1.7.0.jar:/opt/hadoop-1.0.0/libexec/../lib/commons-beanutils-core-1.8.0.jar:/opt/hadoop-1.0.0/libexec/../lib/commons-cli-1.2.jar:/opt/hadoop-1.0.0/libexec/../lib/commons-codec-1.4.jar:/opt/hadoop-1.0.0/libexec/../lib/commons-collections-3.2.1.jar:/opt/hadoop-1.0.0/libexec/../lib/commons-configuration-1.6.jar:/opt/hadoop-1.0.0/libexec/../lib/commons-daemon-1.0.1.jar:/opt/hadoop-1.0.0/libexec/../lib/commons-digester-1.8.jar:/opt/hadoop-1.0.0/libexec/../lib/commons-el-1.0.jar:/opt/hadoop-1.0.0/libexec/../lib/commons-httpclient-3.0.1.jar:/opt/hadoop-1.0.0/libexec/../lib/commons-lang-2.4.jar:/opt/hadoop-1.0.0/libexec/../lib/commons-logging-1.1.1.jar:/opt/hadoop-1.0.0/libexec/../lib/commons-logging-api-1.0.4.jar:/opt/hadoop-1.0.0/libexec/../lib/commons-math-2.1.jar:/opt/hadoop-1.0.0/libexec/../lib/commons-net-1.4.1.jar:/opt/hadoop-1.0.0/libexec/../lib/core-3.1.1.jar:/opt/hadoop-1.0.0/libexec/../lib/hadoop-capacity-scheduler-1.0.0.jar:/opt/hadoop-1.0.0/libexec/../lib/hadoop-fairscheduler-1.0.0.jar:/opt/hadoop-1.0.0/libexec/../lib/hadoop-thriftfs-1.0.0.jar:/opt/hadoop-1.0.0/libexec/../lib/hsqldb-1.8.0.10.jar:/opt/hadoop-1.0.0/libexec/../lib/jackson-core-asl-1.0.1.jar:/opt/hadoop-1.0.0/libexec/../lib/jackson-mapper-asl-1.0.1.jar:/opt/hadoop-1.0.0/libexec/../lib/jasper-compiler-5.5.12.jar:/opt/hadoop-1.0.0/libexec/../lib/jasper-runtime-5.5.12.jar:/opt/hadoop-1.0.0/libexec/../lib/jdeb-0.8.jar:/opt/hadoop-1.0.0/libexec/../lib/jersey-core-1.8.jar:/opt/hadoop-1.0.0/libexec/../lib/jersey-json-1.8.jar:/opt/hadoop-1.0.0/libexec/../lib/jersey-server-1.8.jar:/opt/hadoop-1.0.0/libexec/../lib/jets3t-0.6.1.jar:/opt/hadoop-1.0.0/libexec/../lib/jetty-6.1.26.jar:/opt/hadoop-1.0.0/libexec/../lib/jetty-util-6.1.26.jar:/opt/hadoop-1.0.0/libexec/../lib/jsch-0.1.42.jar:/opt/hadoop-1.0.0/libexec/../lib/junit-4.5.jar:/opt/hadoop-1.0.0/libexec/../lib/kfs-0.2.2.jar:/opt/hadoop-1.0.0/libexec/../lib/log4j-1.2.15.jar:/opt/hadoop-1.0.0/libexec/../lib/mockito-all-1.8.5.jar:/opt/hadoop-1.0.0/libexec/../lib/oro-2.0.8.jar:/opt/hadoop-1.0.0/libexec/../lib/servlet-api-2.5-20081211.jar:/opt/hadoop-1.0.0/libexec/../lib/slf4j-api-1.4.3.jar:/opt/hadoop-1.0.0/libexec/../lib/slf4j-log4j12-1.4.3.jar:/opt/hadoop-1.0.0/libexec/../lib/xmlenc-0.52.jar:/opt/hadoop-1.0.0/libexec/../lib/jsp-2.1/jsp-2.1.jar:/opt/hadoop-1.0.0/libexec/../lib/jsp-2.1/jsp-api-2.1.jar org.apache.hadoop.util.RunJar
/opt/hadoop-1.0.0/hadoop-examples-1.0.0.jar wordcount in out

With the source attached in Eclipse and the environment configured, connect a remote debugger to port 8888 to start debugging.
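If you prefer a command-line debugger to Eclipse, the JDK's jdb can attach to the same socket using the standard JDI connector syntax (adjust hostname and port to your setup):

jdb -connect com.sun.jdi.SocketAttach:hostname=localhost,port=8888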

3. The submission process

(1) Entry point: org.apache.hadoop.util.RunJar.main(String[] args)

public static void main(String[] args) throws Throwable {
  String usage = "RunJar jarFile [mainClass] args...";

  if (args.length < 1) {
    System.err.println(usage);
    System.exit(-1);
  }

  int firstArg = 0;
  String fileName = args[firstArg++];
  File file = new File(fileName);
  String mainClassName = null;

  // Open the jar file
  JarFile jarFile;
  try {
    jarFile = new JarFile(fileName);
  } catch (IOException io) {
    throw new IOException("Error opening job jar: " + fileName)
      .initCause(io);
  }

  // Read the entry-point class name from the jar's Main-Class manifest attribute
  Manifest manifest = jarFile.getManifest();
  if (manifest != null) {
    mainClassName = manifest.getMainAttributes().getValue("Main-Class");
  }
  jarFile.close();

  if (mainClassName == null) {
    if (args.length < 2) {
      System.err.println(usage);
      System.exit(-1);
    }
    mainClassName = args[firstArg++];
  }
  mainClassName = mainClassName.replaceAll("/", ".");

  // Create a temporary working directory under hadoop.tmp.dir
  File tmpDir = new File(new Configuration().get("hadoop.tmp.dir"));
  tmpDir.mkdirs();
  if (!tmpDir.isDirectory()) {
    System.err.println("Mkdirs failed to create " + tmpDir);
    System.exit(-1);
  }
  final File workDir = File.createTempFile("hadoop-unjar", "", tmpDir);
  workDir.delete();
  workDir.mkdirs();
  if (!workDir.isDirectory()) {
    System.err.println("Mkdirs failed to create " + workDir);
    System.exit(-1);
  }

  // Register a shutdown hook that deletes the temporary directory on exit
  Runtime.getRuntime().addShutdownHook(new Thread() {
    public void run() {
      try {
        FileUtil.fullyDelete(workDir);
      } catch (IOException e) {
      }
    }
  });

  // Unpack the jar into the temporary directory
  unJar(file, workDir);

  // Add the unpacked classes and lib jars to the classpath
  ArrayList<URL> classPath = new ArrayList<URL>();
  classPath.add(new File(workDir + "/").toURL());
  classPath.add(file.toURL());
  classPath.add(new File(workDir, "classes/").toURL());
  File[] libs = new File(workDir, "lib").listFiles();
  if (libs != null) {
    for (int i = 0; i < libs.length; i++) {
      classPath.add(libs[i].toURL());
    }
  }

  // Load the main class through a URLClassLoader and invoke its main method
  ClassLoader loader =
    new URLClassLoader(classPath.toArray(new URL[0]));

  Thread.currentThread().setContextClassLoader(loader);
  Class<?> mainClass = Class.forName(mainClassName, true, loader);
  Method main = mainClass.getMethod("main", new Class[] {
    Array.newInstance(String.class, 0).getClass()
  });
  String[] newArgs = Arrays.asList(args)
    .subList(firstArg, args.length).toArray(new String[0]);
  try {
    main.invoke(null, new Object[] { newArgs });
  } catch (InvocationTargetException e) {
    throw e.getTargetException();
  }
}
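To isolate the pattern RunJar relies on, here is a minimal, self-contained sketch (MiniRunJar is a hypothetical name, not part of Hadoop) that reads Main-Class from a jar's manifest, loads it through a URLClassLoader, and invokes its static main reflectively, without Hadoop's temp-directory and unpacking machinery:

import java.io.File;
import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Arrays;
import java.util.jar.JarFile;

// Hypothetical demo class, not part of Hadoop: the reflective-launch
// pattern of RunJar reduced to its core.
public class MiniRunJar {
  public static void main(String[] args) throws Throwable {
    File jar = new File(args[0]); // path to an executable jar
    String mainClassName;
    try (JarFile jf = new JarFile(jar)) {
      // Same manifest lookup RunJar performs
      mainClassName = jf.getManifest().getMainAttributes().getValue("Main-Class");
    }
    // Load the jar's classes in a child classloader
    ClassLoader loader = new URLClassLoader(new URL[] { jar.toURI().toURL() });
    Thread.currentThread().setContextClassLoader(loader);
    Class<?> mainClass = Class.forName(mainClassName, true, loader);
    Method main = mainClass.getMethod("main", String[].class);
    // Forward the remaining arguments, as RunJar does with firstArg
    String[] rest = Arrays.copyOfRange(args, 1, args.length);
    main.invoke(null, (Object) rest); // cast so varargs doesn't unpack the array
  }
}

Usage would be java MiniRunJar some-app.jar arg1 arg2; the target jar's own dependencies must of course already be on the classpath or inside the jar. Unlike this sketch, RunJar also adds the unpacked classes/ and lib/ directories to the classloader, which is what lets a Hadoop job jar bundle its own libraries.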