您的位置:首页 > 运维架构

Hadoop(十六)之使用Combiner优化MapReduce【转载】

2017-11-24 15:50 337 查看
阅读目录(Content)

一、Combiner概述

1.1、为什么需要Combiner

1.2、Combiner介绍

二、使用Combiner优化Mapduce执行

2.1、使用前提

2.2、怎么使用

2.3、利用Combiner计算每一年的平均气温

2.4、计算每一年每个气象站的平均温度

前言

  前面的一篇给大家写了一些MapReduce的一些程序,像去重、词频统计、统计分数、共现次数等。这一篇给大家介绍的是关于Combiner优化操作。

一、Combiner概述

1.1、为什么需要Combiner

  我们map任务处理的结果是存放在运行map任务的节点上。
  map处理的数据的结果在进入reduce的时候,reduce会通过远程的方式去获取数据。
  在map处理完数据之后,数据量特别大的话。reduce再去处理数据它就要通过网络去获取很多的数据。
  这样会导致一个问题是:大量的数据会对网络带宽造成一定的影响。

  有没有一种方式能够类似reduce一样,在map端处理完数据之后,然后在reduce端进行一次简单的数据处理?
    MapReudce正常处理是:

      map处理完,中间结果存放在map节点上。reduce处理的数据通过网络形式拿到reduce所在的节点上。
      如果我们能够在map端进行一次类似于reduce的操作,这样会使进入reduce的数据就会少很多。

  我们把在map端所执行的类似于reduce的操作成为Combiner。

1.2、Combiner介绍

  1) 前提

    每一个map都可能会产生大量的本地输出
  2)Combiner功能

    对map端的输出先做一次合并
  3)目的

     减少在map和reduce节点之间的数据传输量, 以提高网络IO性能。

二、使用Combiner优化Mapduce执行

2.1、使用前提

  不能对最原始的map的数据流向reduce造成影响。也就是说map端进入reduce的数据不收Combiner的影响。

  数据输入的键值类型和数据输出的键值类型一样的reduce我们可以把它当做Combiner来使用

  

import com.briup.bd1702.hadoop.mapred.utils.WeatherRecordParser;
import com.briup.bd1702.hadoop.mapred.utils.YearStation;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class AverageTemperatureByYearStationWithCombiner_0010 extends Configured implements Tool{

static class AvgTempByYSWithCombMapper extends Mapper<LongWritable,Text,YearStation,AverageValue>{
private YearStation ys=new YearStation();
private AverageValue av=new AverageValue();
private WeatherRecordParser parser=new WeatherRecordParser();
@Override
protected void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException{
parser.parse(value);
if(parser.isValid()){
ys.set(parser.getYear(),parser.getStationId());
av.set(1,parser.getTemperature());
context.write(ys,av);
}
}
}

static class AvgTempByYSWithCombCombiner extends Reducer<YearStation,AverageValue,YearStation,AverageValue>{
private AverageValue av=new AverageValue();
@Override
protected void reduce(YearStation key,Iterable<AverageValue> values,Context context) throws IOException, InterruptedException{
int sum=0;
double count=0.0;
for(AverageValue av:values){
sum+=av.getNum().get();
count+=av.getAvgValue().get()*av.getNum().get();
}
av.set(sum,count/sum);
context.write(key,av);
}
}

static class AvgTempByYSWithCombReducer extends Reducer<YearStation,AverageValue,YearStation,DoubleWritable>{
private DoubleWritable result=new DoubleWritable();
@Override
protected void reduce(YearStation key,Iterable<AverageValue> values,Context context) throws IOException, InterruptedException{
int sum=0;
double count=0;
for(AverageValue av:values){
sum+=av.getNum().get();
count+=av.getAvgValue().get()*av.getNum().get();
}
result.set(count/sum);
context.write(key,result);
}
}

@Override
public int run(String[] args) throws Exception{
Configuration conf=getConf();
Path input=new Path(conf.get("input"));
Path output=new Path(conf.get("output"));

Job job=Job.getInstance(conf,this.getClass().getSimpleName());
job.setJarByClass(this.getClass());

job.setMapperClass(AvgTempByYSWithCombMapper.class);
job.setMapOutputKeyClass(YearStation.class);
job.setMapOutputValueClass(AverageValue.class);

job.setCombinerClass(AvgTempByYSWithCombCombiner.class);

job.setReducerClass(AvgTempByYSWithCombReducer.class);
job.setOutputKeyClass(YearStation.class);
job.setOutputValueClass(DoubleWritable.class);

job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);

TextInputFormat.addInputPath(job,input);
TextOutputFormat.setOutputPath(job,output);

return job.waitForCompletion(true)?0:1;
}

public static void main(String[] args) throws Exception{
System.exit(ToolRunner.run(new P00050_AverageTemperatureByYearStationWithCombiner_0010(),args));
}
}


   

原文地址:http://www.cnblogs.com/zhangyinhua/p/7739525.html
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: