
Hadoop 6: The First MapReduce Program, WordCount

2016-02-27 21:55
[b]1. Program code[/b]

Map:

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.StringUtils;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    // Input key: byte offset of the line in the file; input value: the line itself.
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Split the line on single spaces and emit (word, 1) for every word.
        String[] words = StringUtils.split(value.toString(), ' ');
        for (String word : words) {
            context.write(new Text(word), new IntWritable(1));
        }
    }
}
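To see what the mapper emits without a cluster, the sketch below reproduces its tokenizing step in plain Java. The names `MapSketch` and `emitPairs` are hypothetical, `String.split` stands in for Hadoop's `StringUtils.split`, and skipping empty tokens is an added assumption, so treat it as an approximation of the mapper rather than the Hadoop API.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for WordCountMapper: no Hadoop types involved.
class MapSketch {

    // Returns one (word, 1) pair per space-separated token,
    // playing the role of context.write(word, 1) in the real mapper.
    static List<Map.Entry<String, Integer>> emitPairs(String line) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String word : line.split(" ")) {
            if (!word.isEmpty()) { // assumption: ignore empty tokens from repeated spaces
                pairs.add(new SimpleEntry<>(word, 1));
            }
        }
        return pairs;
    }

    public static void main(String[] args) {
        // First line of the sample input from section 3 of this post.
        for (Map.Entry<String, Integer> pair : emitPairs("hadoop java text hdfs")) {
            System.out.println(pair.getKey() + "\t" + pair.getValue());
        }
    }
}
```

Every word gets the value 1 at this stage; no counting happens until the pairs reach the reducer.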


Reduce:

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class wordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    // Called once per distinct word, with all the 1s emitted for it by the mappers.
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        // Emit (word, total count).
        context.write(key, new IntWritable(sum));
    }

}
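Between map and reduce, the framework groups the emitted values by key and hands each group to `reduce` with the keys in sorted order (within one reducer). A minimal plain-Java sketch of that grouping and summing follows; `ReduceSketch`, `shuffle`, and `reduce` are hypothetical names for illustration, and a `TreeMap` is only a simplified model of the real shuffle.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for the shuffle and reduce steps: no Hadoop types involved.
class ReduceSketch {

    // Collects a 1 for every occurrence of a key, keeping keys in sorted order.
    static Map<String, List<Integer>> shuffle(List<String> mapOutputKeys) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (String key : mapOutputKeys) {
            grouped.computeIfAbsent(key, k -> new ArrayList<>()).add(1);
        }
        return grouped;
    }

    // Mirrors the body of reduce(): add up all the values for one key.
    static int reduce(Iterable<Integer> values) {
        int sum = 0;
        for (int value : values) {
            sum += value;
        }
        return sum;
    }

    public static void main(String[] args) {
        List<String> mapOutputKeys = Arrays.asList("tom", "java", "tom", "text");
        shuffle(mapOutputKeys).forEach(
                (word, ones) -> System.out.println(word + "\t" + reduce(ones)));
    }
}
```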


Main:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class RunJob {

    public static void main(String[] args) {
        Configuration config = new Configuration();
        try {
            FileSystem fs = FileSystem.get(config);
            Job job = Job.getInstance(config);
            job.setJobName("wordCount");
            job.setJarByClass(RunJob.class);
            job.setMapperClass(WordCountMapper.class);
            job.setReducerClass(wordCountReducer.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);

            // Input directory in HDFS.
            FileInputFormat.addInputPath(job, new Path("/usr/input/"));
            // The job fails if the output directory already exists, so delete it first.
            Path outPath = new Path("/usr/output/wc/");
            if (fs.exists(outPath)) {
                fs.delete(outPath, true);
            }
            FileOutputFormat.setOutputPath(job, outPath);

            // Submit the job and block until it finishes, printing progress.
            boolean result = job.waitForCompletion(true);
            if (result) {
                System.out.println("Job completed successfully!");
            } else {
                System.out.println("Job failed!");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}


[b]2. Package the program[/b]

Package the Java program as a JAR and upload it to a Hadoop server (any node with a running NameNode will do).



[b]3. Data source[/b]

The input data is as follows:

hadoop java text hdfs
tom jack java text
job hadoop abc lusi
hdfs tom text


Save this content in a txt file and put it under /usr/input in HDFS (the HDFS path, not the local Linux filesystem). You can upload it with the Eclipse plugin.



[b]4. Run the JAR[/b]

# hadoop jar <path to jar> <fully qualified class name>  (the Hadoop environment variables must be configured)
$ hadoop jar wc.jar com.raphael.wc.RunJob


After the job finishes, a new output directory appears under /usr in HDFS (the job writes to /usr/output/wc).



View the result:

abc	1
hadoop	2
hdfs	2
jack	1
java	2
job	1
lusi	1
text	3
tom	2


The word count is complete.
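As a cross-check on the result above, the same counts can be reproduced locally with Java streams. This is only a sketch: `LocalWordCountCheck` and `count` are hypothetical names, the four input lines are copied from section 3, and nothing here touches HDFS or runs a Hadoop job.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical local cross-check of the job output; it does not use Hadoop.
class LocalWordCountCheck {

    // Counts space-separated words across all lines, with keys in sorted order.
    static Map<String, Long> count(List<String> lines) {
        return lines.stream()
                .flatMap(line -> Arrays.stream(line.split(" ")))
                .filter(word -> !word.isEmpty())
                .collect(Collectors.groupingBy(
                        word -> word, TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        List<String> sample = Arrays.asList(
                "hadoop java text hdfs",
                "tom jack java text",
                "job hadoop abc lusi",
                "hdfs tom text");
        // Print in the same "word<TAB>count" layout as the job output.
        count(sample).forEach((word, n) -> System.out.println(word + "\t" + n));
    }
}
```

If the printed lines differ from the job output, the input file in HDFS most likely differs from the sample text, for example through extra spaces or blank lines.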