您的位置:首页 > 其它

基于【八股文】格式编写WordCount程序

2016-11-01 21:59 330 查看
项目配置

将配置文件拷贝到项目中去:
/opt/tools/workspace/bigdata-hdfs/src/main/reources


cp /opt/modules/hadoop-2.5.0/etc/hadoop/core-site.xml /opt/tools/workspace/bigdata-hdfs/src/main/reources

cp /opt/modules/hadoop-2.5.0/etc/hadoop/hdfs-site.xml /opt/tools/workspace/bigdata-hdfs/src/main/reources

cp /opt/modules/hadoop-2.5.0/etc/hadoop/log4j.properties /opt/tools/workspace/bigdata-hdfs/src/main/reources




创建了一个输入目录
bin/hdfs dfs -mkdir input




将测试数据上传到hdfs目录中去:
bin/hdfs dfs -put /opt/datas/wc.input /user/beifeng/input


编写代码

package com.ibeifeng.bigdata.senior.hadoop.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountMapReduce {

// step 1 : Mapper Class
public static class WordCountMapper extends
Mapper<LongWritable, Text, Text, IntWritable> {

// 输出单词
private Text mapOutputKey = new Text();
// 出现一次就记作一次
private IntWritable mapOutputValue = new IntWritable(1);

@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {

System.out.println("map-in-0-key: " + key.get() + " -- "
+ "map-in-value: " + value.toString());
// line value
// 获取文件每一行的<key,value>
String lineValue = value.toString();

// split
// 分割单词,以空格分割
String[] strs = lineValue.split(" ");

// iterator
// 将数组里面的每一个单词拿出来,一个个组成<key,value>
// 生成1
for (String str : strs) {
// set map output key
// 设置key
mapOutputKey.set(str);

// output
// 最终输出
context.write(mapOutputKey, mapOutputValue);
}
}

}

// step 2 : Reducer Class
public static class WordCountReducer extends
Reducer<Text, IntWritable, Text, IntWritable> {

private IntWritable outputValue = new IntWritable();

@Override
protected void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
// temp : sum
// 定义一个临时变量
int sum = 0;

// iterator
// 对于迭代器中的值进行迭代累加,最后sum加完以后就是统计的次数
for (IntWritable value : values) {
// total
sum += value.get();
}

// set output value
outputValue.set(sum);

// output
context.write(key, outputValue);
}

}

// step 3 : Driver
public int run(String[] args) throws Exception {

Configuration configuration = new Configuration();

Job job = Job.getInstance(configuration, this.getClass()
.getSimpleName());
job.setJarByClass(WordCountMapReduce.class);

// set job
// input
Path inpath = new Path(args[0]);
FileInputFormat.addInputPath(job, inpath);

// output
Path outpath = new Path(args[1]);
FileOutputFormat.setOutputPath(job, outpath);

// Mapper
job.setMapperClass(WordCountMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);

// Reducer
job.setReducerClass(WordCountReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

// submit job
boolean isSuccess = job.waitForCompletion(true);

return isSuccess ? 0 : 1;

}

public static void main(String[] args) throws Exception {

// 传递两个参数,设置路径
args = new String[] {
// 参数1:输入路径
"hdfs://hadoop-senior01.ibeifeng.com:8020/user/beifeng/input",
// 参数2:输出路径
"hdfs://hadoop-senior01.ibeifeng.com:8020/user/beifeng/output3" };

// run job
int status = new WordCountMapReduce().run(args);

System.exit(status);
}
}


打jar包







在yarn上运行

命令:
bin/yarn jar jars/mr-wc.jar /user/beifeng/input /user/beifeng/output3




以WordCount程序为例,理解MapReduce如何并行分析数据

使用三个Map任务并行读取三行文件中的内容,对读取的单词进行map操作,每个单词都以
<key, value>
形式生成

input:

hadoop mapreduce
hadoop yarn
hadoop hdfs


Map output:

<hadoop,1>
<mapreduce,1>

<hadoop,1>
<yarn,1>

<hadoop,1>
<hdfs,1>


Reduce操作是对Map的结果进行排序、合并等操作最后得出词频

Sort:

<hadoop,1>
<hadoop,1>
<hadoop,1>
<hdfs,1>
<mapreduce,1>
<yarn,1>


Combiner:

<hadoop, list(1,1,1)>
<mapreduce, list(1)>
<yarn, list(1)>
<hdfs, list(1)>


Reduce output:

<hadoop,3>
<hdfs,1>
<mapreduce,1>
<yarn,1>


MapReduce源码分析的两篇文章

http://blog.csdn.net/recommender_system/article/details/42029311

http://www.tuicool.com/articles/v6VNza
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: