对Hadoop自带程序WordCount的解读(转载,自用)
2016-07-22 16:49
405 查看
注:学习备忘自用。
原文链接:http://www.2cto.com/database/201403/287775.html
刚开始学习hadoop,对于Hadoop运行原理还不是特别熟悉,通过此例子可以对hadoop运行的原理有个初步的认知。
下面是把源代码拷到eclipse程序中,利用此代码(并未修改)测试一下实际的数据并得到结果。(注释是对上以一行的解释)
WourdCount程序中隐藏的秘密
1、具体流程:
1)文件拆分成splits,由于测试用的文件较小,所以每个文件为一个split,并将文件按行分割形成<key,value> 对,如下图。这一步由MapReduce框架自动完成,其中偏移量(即key值)包括了回车所占的字符数和Linux环境有关。
2)将分割好的<key,value>对交给用户定义的map方法进行处理,生成新的<key,value>对。
3)得到map方法输出的<key,value>对后,Mapper会将它们按照key值进行排序,并执行Combine过程,将key至相同value值累加,得到Mapper的最终输出结果。
可以概括为5个步骤:
1)Read:Map Task通过用户编写的RecordReader,从输入InputSplit中解析出一个个key/value。
2)Map:该阶段主要将解析出的key/value交给用户编写的map()函数处理,并产生一系列的key/value。
3)Collect:在用户编写的map()函数中,当数据处理完成后,一般会调用OutputCollector.collect()输入结果。在该函数内部,它会将生成的key/value分片(通过Partitioner),并写入一个环形内存缓冲区中。
4)Spill:即“溢写”,当环形缓冲区满后,MapReduce会将数据写到本地磁盘上,生成一个临时文件。将数据写入本地磁盘之前,先要对数据进行一次本地排序,并在必要时对数据进行合并,压缩等操作。
5)Combine:当所有数据处理完成后,Map Task对所有临时文件进行一次合并,以确保最终只会生成一个数据文件。
可以概括为5个步骤:
1)Shuffle:也称Copy阶段。Reduce Task从各个Map Task上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阀值,则写到磁盘上,否则直接放到内存中。
2)Merge:在远程拷贝的同时,Reduce Task启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或者磁盘上文件过多。
3)Sort:按照MapReduce语义,用户编写的reduce()函数输入数据是按key进行聚集的一组数据。为了将key相同的数据聚在一 起,Hadoop采用了基于排序的策略。由于各个Map Task已经实现了对自己的处理结果进行了局部排序,因此,Reduce Task只需对所有数据进行一次归并排序即可。
4)Reduce:在该阶段中,Reduce Task将每组数据依次交给用户编写的reduce()函数处理。
5)Write:reduce()函数将计算结果写到HDFS。
原文链接:http://www.2cto.com/database/201403/287775.html
原文链接:http://www.2cto.com/database/201403/287775.html
刚开始学习hadoop,对于Hadoop运行原理还不是特别熟悉,通过此例子可以对hadoop运行的原理有个初步的认知。
下面是把源代码拷到eclipse程序中,利用此代码(并未修改)测试一下实际的数据并得到结果。(注释是对上以一行的解释)
package com.wimang.test; import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class WordCount { public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> { // 规定map中用到的数据类型,这里的Text相当于jdk中的String IntWritable相当于jdk的int类型, // 这样做的原因主要是为了hadoop的数据序化而做的。 private final static IntWritable one = new IntWritable(1); // 声时一个IntWritable变量,作计数用,每出现一个key,给其一个value=1的值 private Text word = new Text();// 用来暂存map输出中的key值,Text类型的 public void map(Object key, Text value, Context context) throws IOException, InterruptedException { // 这就是map函数,它是和Mapper抽象类中的相对应的,此处的Object key,Text // value的类型和上边的Object, // Text是相对应的,而且最好一样,不然的话,多数情况运行时会报错。 StringTokenizer itr = new StringTokenizer(value.toString()); // Hadoop读入的value是以行为单位的,其key为该行所对应的行号,因为我们要计算每个单词的数目, // 默认以空格作为间隔,故用StringTokenizer辅助做字符串的拆分,也可以用string.split("")来作。 while (itr.hasMoreTokens()) { // 遍历一下每行字符串中的单词 word.set(itr.nextToken()); // 出现一个单词就给它设成一个key并将其值设为1 context.write(word, one); // 输出设成的key/value值 // 上面就是map打散的过程 } } } public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> { // reduce的静态类,这里和Map中的作用是一样的,设定输入/输出的值的类型 private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { // 由于map的打散,这里会得到如,{key,values}={"hello",{1,1,....}},这样的集合 sum += val.get(); // 这里需要逐一将它们的value取出来予以相加,取得总的出现次数,即为汇和 } result.set(sum); // 将values的和取得,并设成result对应的值 context.write(key, result); // 此时的key即为map打散之后输出的key,没有变化,变化的时result,以前得到的是一个数字的集合, // 已经给算出和了,并做为key/value输出。 } } @SuppressWarnings("deprecation") public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); // 取得系统的参数 String[] otherArgs = new GenericOptionsParser(conf, args) .getRemainingArgs(); if (otherArgs.length != 2) { // 判断一下命令行输入路径/输出路径是否齐全,即是否为两个参数 System.err.println("Usage: wordcount <in> <out>"); System.exit(2); // 若非两个参数,即退出 } Job job = new Job(conf, "word count"); // 此程序的执行,在hadoop看来是一个Job,故进行初始化job操作 job.setJarByClass(WordCount.class); // 可以认为成,此程序要执行MyWordCount.class这个字节码文件 job.setMapperClass(TokenizerMapper.class); // 在这个job中,我用TokenizerMapper这个类的map函数 job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); // 在这个job中,我用IntSumReducer这个类的reduce函数 job.setOutputKeyClass(Text.class); // 在reduce的输出时,key的输出类型为Text job.setOutputValueClass(IntWritable.class); // 在reduce的输出时,value的输出类型为IntWritable FileInputFormat.addInputPath(job, new Path(otherArgs[0])); // 初始化要计算word的文件的路径 FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); // 初始化要计算word的文件的之后的结果的输出路径 System.exit(job.waitForCompletion(true) ? 0 : 1); // 提交job到hadoop上去执行了,意思是指如果这个job真正的执行完了则主函数退出了,若没有真正的执行完就退出了。 } }
1)文件拆分成splits,由于测试用的文件较小,所以每个文件为一个split,并将文件按行分割形成<key,value> 对,如下图。这一步由MapReduce框架自动完成,其中偏移量(即key值)包括了回车所占的字符数和Linux环境有关。
2)将分割好的<key,value>对交给用户定义的map方法进行处理,生成新的<key,value>对。
3)得到map方法输出的<key,value>对后,Mapper会将它们按照key值进行排序,并执行Combine过程,将key至相同value值累加,得到Mapper的最终输出结果。
2、Map Task的整体流程:
可以概括为5个步骤:1)Read:Map Task通过用户编写的RecordReader,从输入InputSplit中解析出一个个key/value。
2)Map:该阶段主要将解析出的key/value交给用户编写的map()函数处理,并产生一系列的key/value。
3)Collect:在用户编写的map()函数中,当数据处理完成后,一般会调用OutputCollector.collect()输入结果。在该函数内部,它会将生成的key/value分片(通过Partitioner),并写入一个环形内存缓冲区中。
4)Spill:即“溢写”,当环形缓冲区满后,MapReduce会将数据写到本地磁盘上,生成一个临时文件。将数据写入本地磁盘之前,先要对数据进行一次本地排序,并在必要时对数据进行合并,压缩等操作。
5)Combine:当所有数据处理完成后,Map Task对所有临时文件进行一次合并,以确保最终只会生成一个数据文件。
3、Reduce的整体流程:
可以概括为5个步骤:1)Shuffle:也称Copy阶段。Reduce Task从各个Map Task上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阀值,则写到磁盘上,否则直接放到内存中。
2)Merge:在远程拷贝的同时,Reduce Task启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或者磁盘上文件过多。
3)Sort:按照MapReduce语义,用户编写的reduce()函数输入数据是按key进行聚集的一组数据。为了将key相同的数据聚在一 起,Hadoop采用了基于排序的策略。由于各个Map Task已经实现了对自己的处理结果进行了局部排序,因此,Reduce Task只需对所有数据进行一次归并排序即可。
4)Reduce:在该阶段中,Reduce Task将每组数据依次交给用户编写的reduce()函数处理。
5)Write:reduce()函数将计算结果写到HDFS。
原文链接:http://www.2cto.com/database/201403/287775.html
相关文章推荐
- ART世界探险(3) - ARM 64位CPU的架构快餐教程
- Linux消息队列
- 调用百度报Cannot read property 'lng' of null错误
- Linux上的SSH无法启动,报告/var/empty/sshd must be owned by root and not group or world-writable
- docker默认设置下访问私有docker hub遇到的https问题
- Hadoop2.0的HA介绍
- Linux初学者必备命令
- Linux命令--grep
- Nginx配置
- iOS常用功能的网站地址
- linux 操作系统中find文件搜索命令的使用
- CentOs7&zookeeper
- Best practice for Invoke other scripts or exe in PowerShell
- popuwindow
- Ubuntu下Apache+SVN+submin实现WEB管理SVN
- linux(CentOS7)中安装JDK7,部署jboss7
- zabbix监控nginx状态
- POJ3641-Pseudoprime numbers
- nginx 引入外部文件
- nginx 引入外部文件