您的位置:首页 > 移动开发 > 微信开发

对hadoop第一个小程序WordCount的简单解释.

2013-06-17 17:25 513 查看
package com.test;

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

/**
*
* 描述:需要的jar包在你下载hadoop里都有
* 输入目录为:/home/dev/hadooptest/in
* 输出目录为:/home/dev/hadooptest/in/out   注:out目录自动创建,如果存在请删除
* 文件个数2个,为:file1  file2
* file1文件内容:Hello World Bye World
* file2文件内容:Hello Hadoop Goodbye Hadoop
对Map Reduce简单的分析一下:
***-----------------------------------------***
对于文件file2
***---Map统计第一次出现的每个单词---***
Map--->键(文本):->Hello   值(个数):->1
Map--->键(文本):->Hadoop   值(个数):->1
Map--->键(文本):->Goodbye   值(个数):->1
Map--->键(文本):->Hadoop   值(个数):->1

***---Reduce 合并Map统计第一次出现的每个单词重复的次数
Reduce--->键(文本):->Goodbye   值(个数):->1
Reduce--->键(文本):->Hadoop   值(个数):->2
Reduce--->键(文本):->Hello   值(个数):->1

***-----------------------------------------***
对于文件file1
***---Map统计第一次出现的每个单词---***
Map--->键(文本):->Hello   值(个数):->1
Map--->键(文本):->World   值(个数):->1
Map--->键(文本):->Bye   值(个数):->1
Map--->键(文本):->World   值(个数):->1

***---Reduce 合并Map统计第一次出现的每个单词重复的次数---***
Reduce--->键(文本):->Bye   值(个数):->1
Reduce--->键(文本):->Hello   值(个数):->1
Reduce--->键(文本):->World   值(个数):->2

***---最后  Reduce 合并最终的结果---***
Reduce--->键(文本):->Bye   值(个数):->1
Reduce--->键(文本):->Goodbye   值(个数):->1
Reduce--->键(文本):->Hadoop   值(个数):->2
Reduce--->键(文本):->Hello   值(个数):->2
Reduce--->键(文本):->World   值(个数):->2
*/
public class WordCount {

public static final String inputPath = "/home/dev/hadooptest/in";
public static final String outputPath = "/home/dev/hadooptest/in/out";

/**
* MapReduceBase类:实现了Mapper和Reducer接口的基类(其中的方法只是实现接口,而未作任何事情) Mapper接口:
* WritableComparable接口:实现WritableComparable的类可以相互比较。所有被用作key的类应该实现此接口。
* Reporter 则可用于报告整个应用的运行进度,本例中未使用。
*
*/
public static class Map extends MapReduceBase implements
Mapper<LongWritable, Text, Text, IntWritable> {
/**
* LongWritable, IntWritable, Text 均是 Hadoop 中实现的用于封装 Java
* 数据类型的类,这些类实现了WritableComparable接口,
* 都能够被串行化从而便于在分布式环境中进行数据交换,你可以将它们分别视为long,int,String 的替代品。
*/
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();

/**
* Mapper接口中的map方法: void map(K1 key, V1 value, OutputCollector<K2,V2>
* output, Reporter reporter) 映射一个单个的输入k/v对到一个中间的k/v对
* 输出对不需要和输入对是相同的类型,输入对可以映射到0个或多个输出对。
* OutputCollector接口:收集Mapper和Reducer输出的<k,v>键值对。
* OutputCollector接口的collect(k, v)方法:增加一个(k,v)键值对到output
*/
public void map(LongWritable key, Text value,
OutputCollector<Text, IntWritable> output, Reporter reporter)
throws IOException {
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line);
while (tokenizer.hasMoreTokens()) {
// 设置分割的字符放到
word.set(tokenizer.nextToken());
/**
* OutputCollector接口:收集Mapper和Reducer输出的<k,v>键值对。
* OutputCollector接口的collect(k, v)方法:增加一个(k,v)键值对到output
*/
System.out.println("Map--->键(文本):->"+String.valueOf(word)+"   "+"值(个数):->"+one);
output.collect(word, one);
}
}
}

public static class Reduce extends MapReduceBase implements
Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterator<IntWritable> values,
OutputCollector<Text, IntWritable> output, Reporter reporter)
throws IOException {
int sum = 0;
while (values.hasNext()) {
sum += values.next().get();
}
System.out.println("Reduce--->键(文本):->"+String.valueOf(key)+"   "+"值(个数):->"+sum);
output.collect(key, new IntWritable(sum));
}
}

public static void main(String[] args) throws Exception {

/**
* JobConf:map/reduce的job配置类,向hadoop框架描述map-reduce执行的工作
* 构造方法:JobConf()、JobConf(Class exampleClass)、JobConf(Configuration
* conf)等
*/

JobConf conf = new JobConf(WordCount.class);
conf.setJobName("wordcount"); // 设置一个用户定义的job名称

conf.setOutputKeyClass(Text.class); // 为job的输出数据设置Key类
conf.setOutputValueClass(IntWritable.class); // 为job输出设置value类

conf.setMapperClass(Map.class); // 为job设置Mapper类
conf.setCombinerClass(Reduce.class); // 为job设置Combiner类
conf.setReducerClass(Reduce.class); // 为job设置Reduce类

conf.setInputFormat(TextInputFormat.class); // 为map-reduce任务设置InputFormat实现类
conf.setOutputFormat(TextOutputFormat.class); // 为map-reduce任务设置OutputFormat实现类

/**
* InputFormat描述map-reduce中对job的输入定义 setInputPaths():为map-reduce
* job设置路径数组作为输入列表 setInputPath():为map-reduce job设置路径数组作为输出列表
*/
FileInputFormat.setInputPaths(conf, new Path(inputPath));
FileOutputFormat.setOutputPath(conf, new Path(outputPath));

JobClient.runJob(conf); // 运行一个job

}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: