hadoop基础----hadoop实战(五)-----myeclipse开发MapReduce---WordCount例子---解析MapReduce的写法
2016-09-19 18:05
495 查看
我们在上一章节已经了解了怎样在myeclipse中开发运行MapReduce
hadoop基础----hadoop实战(四)-----myeclipse开发MapReduce---myeclipse搭建hadoop开发环境并运行wordcount
也在很早的章节中了解了MapReduce的原理
hadoop基础----hadoop理论(四)-----hadoop分布式并行计算模型MapReduce详解
我们本章节来详细学习java代码中,是怎样配置实现MapReduce的。就以WordCount例子为例。
本章节的目的是 熟悉MapReduce的写法之后,我们能写出更多的业务处理,解决更多的其它问题。
Mapper接口的实现,Reducer接口的实现,Job的配置。
Mapper接口和Reducer接口的实现就是要分别编写两个类(例如分别叫做Map类和Reduce类)。
在Map类中规定如何将输入的<key, value>对转化为中间结果的<key, list of values>对。
在Reduce类中规定如何将Map输出的中间结果进一步处理,转化为最终的结果输出<key, value>对。
而对Job的配置是要在main函数中创建相关对象,调用其方法实现的。
file1.txt中是
hello world
file2.txt中是
hello hadoop
这段代码实现了Map的功能,我们声明了一个类TokenizerMapper(类名随意,我们也可以起名WordCountMap但是必须继承Mapper接口)继承了Mapper接口---接口的参数是固定的,也就是写其它功能的MapReduce也继承这个接口,用这几个参数或者适当调整。
熟悉java的同学会看到出现了一些新的数据类型:比如Text,IntWritable,Context。
LongWritable, IntWritable, Text 均是 Hadoop 中实现的用于封装 Java 数据类型的类,这些类实现了WritableComparable接口,它们 都能够被串行化从而便于在分布式环境中进行数据交换,你可以将它们分别视为long,int,String 的替代品。
Context则是负责收集键值对的中间结果或者最终结果,有些版本可以用OutputCollector<Text, IntWritable> output,但是用法都一样,都是用来收集结果。
参数中context负责收集键值对的中间结果传递给reduce。
我们的file1.txt和file2.txt在hadoop中会经过TextInputFormat,每个文件(或其一部分)都会单独地作为map的输入,而这是继承自FileInputFormat的。之后,每行数据都会生成一条记录,每条记录则表示为<key,value>形式:key值是每个数据的记录在数据分片中的字节偏移量,数据类型是LongWritable;value值是每行的内容,数据类型是Text。
也就是说 我们写在文本中的内容 就存在 Text value这个参数中。
然后通过while去遍历, 把单词放入word变量中。
然后把word变量和计数器1作为结果 存起来。
那么经过了map之后的context中的结果就是
<hello,1>
<world,1>
<hello,1>
<hadoop,1>
这个结果会自动传给reduce。
这段代码实现了Reduce的功能,我们声明了一个类IntSumReducer(类名随意,我们也可以起名WordCountReduce但是必须继承Reducer接口)继承了Reducer接口---接口的参数是固定的,也就是写其它功能的MapReduce也继承这个接口,用这几个参数或者适当调整参数类型。
定义一个变量,用来装每一组的计数结果。
参数中context负责收集键值对的最终结果。
key对应map传递过滤的key,values对应map传递过滤的value。
为什么这里是values呢。
因为进入reduce方法时会自动分组,只有key一样的数据才会同时进入一个reduce中。
map传递过来的结果中是
<hello,1>
<world,1>
<hello,1>
<hadoop,1>
也就是 这个例子中会进入三次reduce,
第一次 key 是 hello, values是[1,1]
第二次key 是 world,values是[1]
第三次key是 hadoop ,values是[1]
所以最终的结果是
<hello,2>
<world,1>
<hadoop,1>
mapreduce中需要一个main方法配置参数,向hadoop框架描述map-reduce执行的工作,并提交运行。
hadoop基础----hadoop实战(四)-----myeclipse开发MapReduce---myeclipse搭建hadoop开发环境并运行wordcount
也在很早的章节中了解了MapReduce的原理
hadoop基础----hadoop理论(四)-----hadoop分布式并行计算模型MapReduce详解
目标
MapReduce主要的流程是 map----》reduce。我们本章节来详细学习java代码中,是怎样配置实现MapReduce的。就以WordCount例子为例。
本章节的目的是 熟悉MapReduce的写法之后,我们能写出更多的业务处理,解决更多的其它问题。
MapReduce的结构
写一个MapReduce主要有三部分:Mapper接口的实现,Reducer接口的实现,Job的配置。
Mapper接口和Reducer接口的实现就是要分别编写两个类(例如分别叫做Map类和Reduce类)。
在Map类中规定如何将输入的<key, value>对转化为中间结果的<key, list of values>对。
在Reduce类中规定如何将Map输出的中间结果进一步处理,转化为最终的结果输出<key, value>对。
而对Job的配置是要在main函数中创建相关对象,调用其方法实现的。
完整代码
package org.apache.hadoop.examples; import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class WordCount { //编写完成Map任务的静态内部类,类的名字就叫TokenizerMapper,继承Mapper类 public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{ private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Context context ) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } } //编写完成Reduce任务的静态内部类,类的名字就叫IntSumReducer,继承Reducer类 public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } } //main函数中所要做的就是Job的配置和提交 public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: wordcount <in> <out>"); System.exit(2); } Job job = new Job(conf, "word count"); job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
文件中的内容
我们用来作测试的文件有2个,分别是file1.txt和file2.txt。file1.txt中是
hello world
file2.txt中是
hello hadoop
Mapper接口的实现分析
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{ private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Context context ) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } }
这段代码实现了Map的功能,我们声明了一个类TokenizerMapper(类名随意,我们也可以起名WordCountMap但是必须继承Mapper接口)继承了Mapper接口---接口的参数是固定的,也就是写其它功能的MapReduce也继承这个接口,用这几个参数或者适当调整。
熟悉java的同学会看到出现了一些新的数据类型:比如Text,IntWritable,Context。
LongWritable, IntWritable, Text 均是 Hadoop 中实现的用于封装 Java 数据类型的类,这些类实现了WritableComparable接口,它们 都能够被串行化从而便于在分布式环境中进行数据交换,你可以将它们分别视为long,int,String 的替代品。
Context则是负责收集键值对的中间结果或者最终结果,有些版本可以用OutputCollector<Text, IntWritable> output,但是用法都一样,都是用来收集结果。
private final static IntWritable one = new IntWritable(1);定义了一个int赋值1,作为计数器。
private Text word = new Text();定义一个变量,用来保存key。这个key会用来作为map区分数据。
public void map(Object key, Text value, Context context ) throws IOException, InterruptedException {Mapper接口中的必须有map方法实现功能,传入参数一般也是固定的。
参数中context负责收集键值对的中间结果传递给reduce。
我们的file1.txt和file2.txt在hadoop中会经过TextInputFormat,每个文件(或其一部分)都会单独地作为map的输入,而这是继承自FileInputFormat的。之后,每行数据都会生成一条记录,每条记录则表示为<key,value>形式:key值是每个数据的记录在数据分片中的字节偏移量,数据类型是LongWritable;value值是每行的内容,数据类型是Text。
也就是说 我们写在文本中的内容 就存在 Text value这个参数中。
StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); }StringTokenizer 是一个分词器。用来把句子拆分成一个个单词。 value是我们文本中的内容,这里是把内容分成一个个单词。
然后通过while去遍历, 把单词放入word变量中。
然后把word变量和计数器1作为结果 存起来。
那么经过了map之后的context中的结果就是
<hello,1>
<world,1>
<hello,1>
<hadoop,1>
这个结果会自动传给reduce。
Reducer接口的实现分析
public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } }
这段代码实现了Reduce的功能,我们声明了一个类IntSumReducer(类名随意,我们也可以起名WordCountReduce但是必须继承Reducer接口)继承了Reducer接口---接口的参数是固定的,也就是写其它功能的MapReduce也继承这个接口,用这几个参数或者适当调整参数类型。
private IntWritable result = new IntWritable();
定义一个变量,用来装每一组的计数结果。
public void reduce(Text key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException {Reducer接口中的必须有reduce方法实现功能,传入参数一般也是固定的。
参数中context负责收集键值对的最终结果。
key对应map传递过滤的key,values对应map传递过滤的value。
为什么这里是values呢。
因为进入reduce方法时会自动分组,只有key一样的数据才会同时进入一个reduce中。
map传递过来的结果中是
<hello,1>
<world,1>
<hello,1>
<hadoop,1>
也就是 这个例子中会进入三次reduce,
第一次 key 是 hello, values是[1,1]
第二次key 是 world,values是[1]
第三次key是 hadoop ,values是[1]
int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result);循环values列表,相加计数后放入最终结果容器context中。
所以最终的结果是
<hello,2>
<world,1>
<hadoop,1>
Job的配置--main方法
public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: wordcount <in> <out>"); System.exit(2); } Job job = new Job(conf, "word count"); job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); }
mapreduce中需要一个main方法配置参数,向hadoop框架描述map-reduce执行的工作,并提交运行。
Configuration conf = new Configuration();创建一个配置实例。
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: wordcount <in> <out>"); System.exit(2); }获取参数并判断是否合法。
Job job = new Job(conf, "word count");新建一个job任务。
job.setJarByClass(WordCount.class);设置运行的jar类,也就是mapreduce的主类名。
job.setMapperClass(TokenizerMapper.class);设置map类,也就是继承map接口的类名。
job.setCombinerClass(IntSumReducer.class);设置Combiner类,其实map到reduce还有一道工序是Combiner,如果有特殊需求可以新建一个类,没有的话直接使用继承reduce接口的类即可。
job.setReducerClass(IntSumReducer.class);设置reduce类,也就是继承reduce接口的类名。
job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class);设置输出结果的 key 和value的数据类型。
FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));设置输入输入结果的路径,我们可以把路径写死,也可以通过参数传进来,我们这里就是用的接受的参数的值。
System.exit(job.waitForCompletion(true) ? 0 : 1);提交运行,完成后退出程序。
相关文章推荐
- Mybatis3+Spring4+SpringMVC4 整合【转】
- Spring 注释 @Autowired 和@Resource 的区别
- XML和对象相互转换的方法
- JDK安装与环境变量配置
- Java对象和类
- java中Map按值排序
- java的继承
- java源码分析之集合框架SortedMap 、 NavigableMap 、Dictionary 09
- 初学Spring-XML文件配置Bean的一些知识点
- Java利用POI生成Excel强制换行
- spring加载bean实例化顺序
- Java进制相关的算法
- 关于Eclipse和STS里SVN报Subclipse talks to Subversion via a Java API that requires access to native错
- eclipse快捷键表
- Java 模拟并发操作压力测试代码
- Java集合简介
- 79. Word Search
- Google Java编程风格指南中文版
- 怎么学好java,给大家一些经验!
- Struts2动态结果集