Hadoop(十六)之使用Combiner优化MapReduce【转载】
2017-11-24 15:50
337 查看
阅读目录(Content)
一、Combiner概述
1.1、为什么需要Combiner
1.2、Combiner介绍
二、使用Combiner优化Mapduce执行
2.1、使用前提
2.2、怎么使用
2.3、利用Combiner计算每一年的平均气温
2.4、计算每一年每个气象站的平均温度
前言
前面的一篇给大家写了一些MapReduce的一些程序,像去重、词频统计、统计分数、共现次数等。这一篇给大家介绍的是关于Combiner优化操作。
map处理的数据的结果在进入reduce的时候,reduce会通过远程的方式去获取数据。
在map处理完数据之后,数据量特别大的话。reduce再去处理数据它就要通过网络去获取很多的数据。
这样会导致一个问题是:大量的数据会对网络带宽造成一定的影响。
有没有一种方式能够类似reduce一样,在map端处理完数据之后,然后在reduce端进行一次简单的数据处理?
MapReudce正常处理是:
map处理完,中间结果存放在map节点上。reduce处理的数据通过网络形式拿到reduce所在的节点上。
如果我们能够在map端进行一次类似于reduce的操作,这样会使进入reduce的数据就会少很多。
我们把在map端所执行的类似于reduce的操作成为Combiner。
每一个map都可能会产生大量的本地输出
2)Combiner功能
对map端的输出先做一次合并
3)目的
减少在map和reduce节点之间的数据传输量, 以提高网络IO性能。
数据输入的键值类型和数据输出的键值类型一样的reduce我们可以把它当做Combiner来使用
原文地址:http://www.cnblogs.com/zhangyinhua/p/7739525.html
一、Combiner概述
1.1、为什么需要Combiner
1.2、Combiner介绍
二、使用Combiner优化Mapduce执行
2.1、使用前提
2.2、怎么使用
2.3、利用Combiner计算每一年的平均气温
2.4、计算每一年每个气象站的平均温度
前言
前面的一篇给大家写了一些MapReduce的一些程序,像去重、词频统计、统计分数、共现次数等。这一篇给大家介绍的是关于Combiner优化操作。
一、Combiner概述
1.1、为什么需要Combiner
我们map任务处理的结果是存放在运行map任务的节点上。map处理的数据的结果在进入reduce的时候,reduce会通过远程的方式去获取数据。
在map处理完数据之后,数据量特别大的话。reduce再去处理数据它就要通过网络去获取很多的数据。
这样会导致一个问题是:大量的数据会对网络带宽造成一定的影响。
有没有一种方式能够类似reduce一样,在map端处理完数据之后,然后在reduce端进行一次简单的数据处理?
MapReudce正常处理是:
map处理完,中间结果存放在map节点上。reduce处理的数据通过网络形式拿到reduce所在的节点上。
如果我们能够在map端进行一次类似于reduce的操作,这样会使进入reduce的数据就会少很多。
我们把在map端所执行的类似于reduce的操作成为Combiner。
1.2、Combiner介绍
1) 前提每一个map都可能会产生大量的本地输出
2)Combiner功能
对map端的输出先做一次合并
3)目的
减少在map和reduce节点之间的数据传输量, 以提高网络IO性能。
二、使用Combiner优化Mapduce执行
2.1、使用前提
不能对最原始的map的数据流向reduce造成影响。也就是说map端进入reduce的数据不收Combiner的影响。数据输入的键值类型和数据输出的键值类型一样的reduce我们可以把它当做Combiner来使用
import com.briup.bd1702.hadoop.mapred.utils.WeatherRecordParser; import com.briup.bd1702.hadoop.mapred.utils.YearStation; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class AverageTemperatureByYearStationWithCombiner_0010 extends Configured implements Tool{ static class AvgTempByYSWithCombMapper extends Mapper<LongWritable,Text,YearStation,AverageValue>{ private YearStation ys=new YearStation(); private AverageValue av=new AverageValue(); private WeatherRecordParser parser=new WeatherRecordParser(); @Override protected void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException{ parser.parse(value); if(parser.isValid()){ ys.set(parser.getYear(),parser.getStationId()); av.set(1,parser.getTemperature()); context.write(ys,av); } } } static class AvgTempByYSWithCombCombiner extends Reducer<YearStation,AverageValue,YearStation,AverageValue>{ private AverageValue av=new AverageValue(); @Override protected void reduce(YearStation key,Iterable<AverageValue> values,Context context) throws IOException, InterruptedException{ int sum=0; double count=0.0; for(AverageValue av:values){ sum+=av.getNum().get(); count+=av.getAvgValue().get()*av.getNum().get(); } av.set(sum,count/sum); context.write(key,av); } } static class AvgTempByYSWithCombReducer extends Reducer<YearStation,AverageValue,YearStation,DoubleWritable>{ private DoubleWritable result=new DoubleWritable(); @Override protected void reduce(YearStation key,Iterable<AverageValue> values,Context context) throws IOException, InterruptedException{ int sum=0; double count=0; for(AverageValue av:values){ sum+=av.getNum().get(); count+=av.getAvgValue().get()*av.getNum().get(); } result.set(count/sum); context.write(key,result); } } @Override public int run(String[] args) throws Exception{ Configuration conf=getConf(); Path input=new Path(conf.get("input")); Path output=new Path(conf.get("output")); Job job=Job.getInstance(conf,this.getClass().getSimpleName()); job.setJarByClass(this.getClass()); job.setMapperClass(AvgTempByYSWithCombMapper.class); job.setMapOutputKeyClass(YearStation.class); job.setMapOutputValueClass(AverageValue.class); job.setCombinerClass(AvgTempByYSWithCombCombiner.class); job.setReducerClass(AvgTempByYSWithCombReducer.class); job.setOutputKeyClass(YearStation.class); job.setOutputValueClass(DoubleWritable.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); TextInputFormat.addInputPath(job,input); TextOutputFormat.setOutputPath(job,output); return job.waitForCompletion(true)?0:1; } public static void main(String[] args) throws Exception{ System.exit(ToolRunner.run(new P00050_AverageTemperatureByYearStationWithCombiner_0010(),args)); } }
原文地址:http://www.cnblogs.com/zhangyinhua/p/7739525.html
相关文章推荐
- Hadoop(十六)之使用Combiner优化MapReduce
- 针对ASP.NET页面实时进行GZIP压缩优化的几款压缩模块的使用简介及应用测试(转载)
- 如何在Hadoop中使用Streaming编写MapReduce(转帖)
- 在云中使用 MapReduce 和负载平衡 (转载)
- Hadoop使用常见问题以及解决方法(转载)
- 使用HADOOP演示MAPREDUCE配置总结
- 针对ASP.NET页面实时进行GZIP压缩优化的几款压缩模块的使用简介及应用测试!(转载)
- Hadoop MapReduce进阶 使用Chain
- 使用Hadoop MapReduce 排序
- Hadoop MapReduce进阶 使用分布式缓存进行replicated join
- 如何使用Python为Hadoop编写一个简单的MapReduce程序
- Hadoop MapReduce进阶 使用DataJoin包实现Join
- [转载]使用Vitamio打造自己的Android万能播放器(8)——细节优化
- 设计模式(1)-使用简单工厂优化代码(转载)
- 使用 Linux 和 Hadoop 进行分布式计算(转载)
- lucene、lucene.NET详细使用与优化(转载)
- Hadoop之MapReduce任务的优化
- 如何在Hadoop中使用Streaming编写MapReduce(转帖)
- Hadoop MapReduce进阶 使用DataJoin包实现Join
- 采用HADOOP的MAPREDUCE使用其他文件系统或数据库系统的方式