您的位置:首页 > 运维架构

Hadoop 研发之远程调试详细剖析--WordCount V2.0

2016-08-20 17:25 561 查看
希望本篇博文能帮助到那些想快速开始Hadoop程序开发调试的朋友们~

前言

之前学习Hadoop时,曾经错误的以为开发的Hadoop程序必须在运行的Hadoop集群上才能运行和调试,基于这个错误认识,花费了较多的时间在mac上搭建伪分布式Hadoop集群和IDE集成开发环境,虽然走了不少弯路,但现在回头想想还是很值得了,至少对Hadoop的认识加深了不少。

之所花费了较多的时间是因为,是因为:

需要自己编译Hadoop native lib,有兴趣可以参考附录1。

需要伪分布式配置Hadoop,其中包括HDFS,MR,YARN配置,配置过程中要解决端口号占用冲突,有兴趣的可以参考附录2。

需要搭建IDE开发环境,有兴趣的可以参看附录3。

随着知识的积累,发现关于Hadoop开发调试的一些认识其实是错误的,譬如:

调试Hadoop程序时,不必要求有运行着的Hadoop集群。其实调试Hadoop程序时,只要mac上有能支撑Hadoop程序运行的基本环境即可,具体要求也既是只要有Hadoop的完整jar包即可。

使用Eclipse或IntelliJ IDEA开发Hadoop程序时,不是必须安装用于创建Hadoop工程的插件的。使用本博文中的方法,在开发Hadoop程序时,只要创建普通的Java project,再在build path中添加Hadoop jar依赖即可。

有了上面的认识,再结合一下Java远程调试就可以非常方便的开发调试Hadoop程序了。

以下还是以Hadoop官网WordCount为例,详细分析Hadoop研发远程调试;以下源码中除了改造部分WordCount源码外,还增加了一个工具类RunJobTool,方便在Terminal下调试Hadoop程序,也因为这些原因,所以称本示例为WordCount V2.0。

本次调试输入的文件内容为:

a c b d
d b c a
a c d b
c a r s
d s g h


一、源码展示

本分源码分为两个部分:

WordCountV2.java

RunJobTool.java

WordCountV2源码

package hadoopTest;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountV2 {
private static boolean debugFlag = false;

public static void setDebug(boolean debugMode){
debugFlag = debugMode;
}

/**
* 定义MR Mapper.
* */
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{

private final static IntWritable one = new IntWritable(1);
private Text word = new Text();

public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);

if(debugFlag){
System.out.println("raw data: key= " + key + ", value= " + value + " maper output : " + word + " " + one);
}

}
}
}

/**
* 定义MR Reducer
* */
public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();

public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
StringBuffer sb = new StringBuffer();
for (IntWritable val : values) {
sum += val.get();
sb.append(val + " ");
}

if(debugFlag){
System.out.println("raw data: key= " + key + ", value= " + sb.toString());
}

result.set(sum);
context.write(key, result);
}
}

/**
* 配置job的详细属性,并提交执行完成。
* */
public static void run(Configuration conf, String jobName, Path inputPath, Path outputPath, boolean debug) throws IOException, ClassNotFoundException, InterruptedException{
Job job = Job.getInstance(conf, jobName);
job.setJobName(jobName);

job.setJarByClass(WordCountV2.class);
WordCountV2.setDebug(debug);

job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

FileInputFormat.addInputPath(job, inputPath);
FileOutputFormat.setOutputPath(job, outputPath);

job.submit();
System.exit(job.waitForCompletion(true) ? 0 : 1);

System.out.println("execute success!");
}
}


源码分析:

和官网源码相比:去掉了其中的
main函数
,增加了
run函数
,其实它们的作用基本相同,都完成job创建,mapper/reducer设置,以及job提交等,只是
run函数
可以更好的和类RunJobTool结合使用。

RunJobTool源码

package hadoopTest;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobPriority;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.lexicalscope.jewel.cli.CliFactory;
import com.lexicalscope.jewel.cli.Option;

public class RunJobTool extends Configured implements Tool{

/**
* 使用com.lexicalscope.jewel.cli.CliFactory接收和解析命令行参数
* */
public interface MapReduceCommandLineOptions{
@Option(longName = "job-priority", description = "作业优先级", defaultValue = "normal")
String jobPriority();

@Option(longName = "job-queueu", description = "作业队列", defaultValue= "default")
String jobQueue();
}

public interface CommandLineOptions extends MapReduceCommandLineOptions{
@Option(longName = "input-dir")
String inputDir();

@Option(longName = "output-dir")
String outputDir();
}

/**
* 主要用于从命令行解析参数
* */
@Override
public int run(String [] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = getConf();
CommandLineOptions cli = CliFactory.parseArguments(CommandLineOptions.class, args);
configureMapReduce(conf, cli);

Path inputPath = new Path(cli.inputDir());
Path outputPath = new Path(cli.outputDir());
boolean debug = true;
String jobName = "WordCountV2";

//用于submit job
WordCountV2.run(conf, jobName, inputPath, outputPath, debug);

return 0;
}

/**
* 配置JobTracker属性
* */
public static void configureMapReduce(Configuration conf, MapReduceCommandLineOptions cli) {
// configure JobTracker to local run mode
System.out.println("This job will be running locally.");
conf.set("mapreduce.framework.name", "local");
conf.set("fs.defaultFS", "file:///");

// configure job priority
JobPriority jobPriority = parseJobPriority(cli.jobPriority());
conf.set("mapreduce.job.priority", jobPriority.name());

// configure job queue
conf.set("mapreduce.job.queuename", cli.jobQueue());
}

/**
* 配置JobTracker属性
* */
private static JobPriority parseJobPriority(String s) {
for (JobPriority p : JobPriority.values()) {
if (p.name().equalsIgnoreCase(s)) {
return p;
}
}
System.out.println("Unknown job priority: " + s + ". Fallback to NORMAL.");
return JobPriority.NORMAL;
}

/**
* Job入口函数
* */
public static void main(String[] args) {
Configuration conf = new Configuration();
RunJobTool stJobTool = new RunJobTool();
try {
//重要:用Hadoop ToolRunner执行job任务。
int exitCode = ToolRunner.run(conf, stJobTool, args);
if(exitCode == 0){
System.out.println("Job Success Completed!");
}else{
System.out.println("Job Failed!");
}
} catch (Exception e) {
e.printStackTrace();
}
}
}


源码分析:

源码中使用
com.lexicalscope.jewel.cli.CliFactory
接收和解析Terminal下远程调试Hadoop程序时的参数。

使用Hadoop
ToolRunner.run(Configuration conf, Tool tool, String[] args)
方法执行类
RunJobTool


源码中使用以下代码,使用Hadoop名为local的MapReduce jobs执行框架(重要)。

conf.set("mapreduce.framework.name", "local");
conf.set("fs.defaultFS", "file:///");


The runtime framework for executing MapReduce jobs Can be one of local, classic or yarn.

二、终端下启动远程debugee

在终端下使用以下命令启动远程debugee:

java -cp `hadoop classpath`:/Users/zq/jars/jewelcli-0.8.6.jar:/Users/zq/Documents/workspace/hadoopLocal/bin/hadoopTest.jar -agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=6789 -Djava.library.path=/Users/zq/big_data_workshop/usr_big_data/hadoop/hadoop-2.6.0/lib/native  hadoopTest/RunJobTool --input-dir=/Users/zq/tmp/word.txt --output-dir=/Users/zq/tmp/tmp


该命令分为5个部分:

-cp
:class search path of directories and zip/jar files。

这里需要注意的有两点:

(1)把Hadoop自带jar包添加到
-cp
路径中,这里获得Hadoop自带jar的路径,使用了命令
hadoop classpath


(2)把需要调试的Hadoopx程序打成jar包添加到
-cp
路径中,这里要调试的Hadoop程序jar包具体为
:/Users/zq/Documents/workspace/hadoopLocal/bin/hadoopTest.jar


打jar包只要使用命令
jar -cfv + *.jar + 程序目录
即可。

(3)把用到的ewelcli-0.8.6.jar添加到
-cp
路径中。

-agentlib
:实现Hadoop本地远程调试关键参数,详细分析参见博文Java 远程调试参数说明

-Djava.library.path
:用于加载Hadoop native lib,调试Hadoop程序时,会报警告
WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform… using builtin-java classes where applicable。


要调试的Hadoop程序入口类,这里为
hadoopTest/RunJobTool


要调试的Hadoop程序所需参数,这里为
--input-dir=/Users/zq/tmp/word.txt --output-dir=/Users/zq/tmp/tmp


三、IDE中启动本地debuger

详细分析过程参见博文Java远程调试

四、远程debugee部分打印结果

map部分打印结果

raw data: key= 0, value= a c b d maper output : a 1
raw data: key= 0, value= a c b d maper output : c 1
raw data: key= 0, value= a c b d maper output : b 1
raw data: key= 0, value= a c b d maper output : d 1
raw data: key= 8, value= d b c a maper output : d 1
raw data: key= 8, value= d b c a maper output : b 1
raw data: key= 8, value= d b c a maper output : c 1
raw data: key= 8, value= d b c a maper output : a 1
raw data: key= 16, value= a c d b maper output : a 1
raw data: key= 16, value= a c d b maper output : c 1
raw data: key= 16, value= a c d b maper output : d 1
raw data: key= 16, value= a c d b maper output : b 1
raw data: key= 24, value= c a r s maper output : c 1
raw data: key= 24, value= c a r s maper output : a 1
raw data: key= 24, value= c a r s maper output : r 1
raw data: key= 24, value= c a r s maper output : s 1
raw data: key= 32, value= d s g h maper output : d 1
raw data: key= 32, value= d s g h maper output : s 1
raw data: key= 32, value= d s g h maper output : g 1
raw data: key= 32, value= d s g h maper output : h 1


reduce部分打印结果

raw data: key= a, value= 1 1 1 1
raw data: key= b, value= 1 1 1
raw data: key= c, value= 1 1 1 1
raw data: key= d, value= 1 1 1 1
raw data: key= g, value= 1
raw data: key= h, value= 1
raw data: key= r, value= 1
raw data: key= s, value= 1 1


raw data: key= a, value= 4
raw data: key= b, value= 3
raw data: key= c, value= 4
raw data: key= d, value= 4
raw data: key= g, value= 1
raw data: key= h, value= 1
raw data: key= r, value= 1
raw data: key= s, value= 2


四、附录

详细编译过程参见博文mac下hadoop 2.6.0编译native library ,资源下载参见链接mac hadoop2.6.0 lib/native

示例配置文件参见链接hadoop2.6及hbase0.96伪分布式安装配置文件

利用Eclipse搭建IDE环境时,需要安装Eclipse插件,并且该插件需要根据Hadoop版本进行编译,然后利用Eclipse集成环境创建Map/Reduce Project,然后进行开发和调试,详细过程参见博文hadoop eclipse 程序调试

so tired ><…终于整理完了,希望能帮助到读到本篇博客的你~
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  hadoop 调试