您的位置:首页 > 大数据 > 云计算

云计算hadoop系列之二:Map/Reduce入门

2009-11-11 04:37 483 查看
 Map/Reduce是分布式系统的核心概念,分布式的任务都要在Map/Reduce框架下完成。正如前一篇所说,Map/Reduce是一个任务分发和回收的机制。

一个分布式任务的执行,分为以下几步:

1. 头结点接到任务请求,把它分解为多个子任务(Map)

2. 头结点将子任务(Map)分发到各个结点,并监控子结点的运行状态

3. 子结点接到Map子任务后,执行任务,产生子任务的中间结果,将结果提交头结点

4. 头结点对各子结点中间结果进行汇总(Reduce),生成最终结果。

 

从以上步骤中我们可以看出:

一个任务必须分解为子任务(Map),才有可能在分布式系统中运行

子任务(Map)应该是相互独立的,不能存在先后顺序的依赖

一个大型的任务,只有在Map阶段完成了多数工作才能利用分布式系统改善性能。

 

那么,什么样的任务可以分解为相互独立的子任务,又怎样分解呢?让我们从最简单的例子说起。

 

例2.1 一个文件夹里,有20个文本文件,求这20个文件中单词"the"的出现总次数。

呵呵,这个例子太简单了吧,每个Map算一个文件,然后Reduce中求和。

 

好,复杂一点的:

例2.2 还是这20个文件,求每一个单词的总出现次数,并求出出现频率最高的单词

这也不难嘛。还是每个Map算一个文件,只不过算出的结果不是一个数值,而是一组<键,值>对。键就是单词,值就是它的出现次数:

<hello, 10>

<how,  3>

<are,    25>

<you,   12>

........

在Reduce中,把相同键的值求和,然后找出最大值,就得出了最终结果。

 

到这里,我们做一个小小的总结。我们关注Map和Reduce的数据交换,是以<键,值>对为基础的。事实上,在hadoop中,Map部分和Reduce部分的输入和输出,都必须是键值对的形式。也就是说,一项普通的任务,想把它转化为Map/Reduce任务,需要满足两个条件:这个任务可以用键值对来描述,并且Map中的键值对互相独立。如果一个任务的确有先后步骤,则只能尝试对每步进行上述分解。

 

在hadoop的官方文档中,WordCount也是入门第一例,以下是Java代码及注释:

 

WordCount.java
1.package org.myorg;
2. 
3.import java.io.IOException;
4.import java.util.*;
5. 
6.import org.apache.hadoop.fs.Path;
7.import org.apache.hadoop.conf.*;
8.import org.apache.hadoop.io.*;
9.import org.apache.hadoop.mapred.*;
10.import org.apache.hadoop.util.*;
11. 
12.public class WordCount {
13. 
14.  

public static class Map extends MapReduceBase
implements Mapper<LongWritable, Text, Text, IntWritable> {   //这里LongWritable, Text是输入的Key和Value的类型,Text, IntWritable是输出的Key和Value类型
15.    

private final static IntWritable one = new IntWritable(1);

16.    
private Text word = new Text();
17. 
18.    

public void map(LongWritable key, Text value,
OutputCollector<Text, IntWritable> output,
Reporter reporter) throws IOException {

19.      
String line = value.toString();
20.      
StringTokenizer tokenizer = new StringTokenizer(line);
21.      
while (tokenizer.hasMoreTokens()) {
22.        
word.set(tokenizer.nextToken());
23.        
output.collect(word, one);
24.      
}
25.    
}
26.  
}
27. 
28.  

public static class Reduce extends MapReduceBase implements
Reducer<Text, IntWritable, Text, IntWritable> {

29.    

public void reduce(Text key, Iterator<IntWritable> values,
OutputCollector<Text, IntWritable> output,
Reporter reporter) throws IOException {

30.      
int sum = 0;
31.      
while (values.hasNext()) {
32.        
sum += values.next().get();    //求和操作
33.      
}
34.      
output.collect(key, new IntWritable(sum));
35.    
}
36.  
}

4000
37. 
38.  

public static void main(String[] args) throws Exception {

39.    

JobConf conf = new JobConf(WordCount.class);

40.    
conf.setJobName("wordcount");
41. 
42.    
conf.setOutputKeyClass(Text.class);    //设置输出Key类型
43.    
conf.setOutputValueClass(IntWritable.class);  //设置输出Value类型
44. 
45.    
conf.setMapperClass(Map.class);  //设置Mapper类
46.    
conf.setCombinerClass(Reduce.class);  //设置Combiner类 Combiner是一个小Reduce,它只Reduce一部分的Mapper,减少Reduce的处理数据量,提高效率
47.    
conf.setReducerClass(Reduce.class);  //设置Reduce类
48.//设置输入输出格式和文件路径
49.    
conf.setInputFormat(TextInputFormat.class);
50.    
conf.setOutputFormat(TextOutputFormat.class);
51. 
52.    
FileInputFormat.setInputPaths(conf, new Path(args[0]));
53.    
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
54. 
55.    
JobClient.runJob(conf);
57.  
}
58.}
59. 
 

在上面的例子中,加入了一个Combiner。Combiner是一个临时的Reduce,把一部分Mapper的结果临时合并,以减少Reduce的压力,提高系统的效率。要知道,hadoop公司人多了,就得设几个部门经理,要是所有几千人都归总经理一个人管,不疯才怪。

 

再出道题考考你:

例2.3 用hadoop分布式计算圆周率π的值。

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

这里给出答案,自己研究吧。这也是hadoop官方的一个例子,源代码在package org.apache.hadoop.examples包中

 

package org.apache.hadoop.examples;

/**

 * 用蒙特卡洛方法计算。就是在一个1x1的方格内随机撒点,然后计算落在圆内和圆外的点数比。

 */

public class PiEstimator extends Configured implements Tool {

 

  public static class PiMapper extends MapReduceBase

    implements Mapper<LongWritable, Writable, LongWritable, LongWritable> {

 

    static Random r = new Random();

   

    long numInside = 0L;

    long numOutside = 0L;

  

    public void map(LongWritable key,

                    Writable val, OutputCollector<LongWritable, LongWritable> out,

                    Reporter reporter) throws IOException {

      long nSamples = key.get();

      for(long idx = 0; idx < nSamples; idx++) {

        double x = r.nextDouble();

        double y = r.nextDouble();

        double d = (x-0.5)*(x-0.5)+(y-0.5)*(y-0.5);

        if (d > 0.25) {

          numOutside++;

        } else {

          numInside++;

        }

        if (idx%1000 == 1) {

          reporter.setStatus("Generated "+idx+" samples.");

        }

      }

      out.collect(new LongWritable(0), new LongWritable(numOutside));

      out.collect(new LongWritable(1), new LongWritable(numInside));

    }

  

  }

 

  public static class PiReducer extends MapReduceBase

    implements Reducer<LongWritable, LongWritable, WritableComparable, Writable> {

   

    long numInside = 0;

    long numOutside = 0;

    JobConf conf;

    

    public void reduce(LongWritable key,

                       Iterator<LongWritable> values,

                       OutputCollector<WritableComparable, Writable> output,

                       Reporter reporter) throws IOException {

      if (key.get() == 1) {

        while (values.hasNext()) {

          long num = values.next().get();

          numInside += num;

        }

      } else {

        while (values.hasNext()) {

          long num = values.next().get();

          numOutside += num;

        }

      }

    }

     

  /**

   * This is the main driver for computing the value of Pi using

   * monte-carlo method.

   */

  double launch(int numMaps, long numPoints, String jt, String dfs)

    throws IOException {

    JobConf jobConf = new JobConf(getConf(), PiEstimator.class);

    if (jt != null) { jobConf.set("mapred.job.tracker", jt); }

  以下任务配置部分省略。

 

 

有一基本的概念,我们现在已经知道hadoop是怎样调动小机群们协同工作的,下一讲我们要进入实战,讲解hadoop的安装和配置。

 
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息