map-combine过程解
2015-09-09 14:42
549 查看
●read阶段:通过RecordReader从InputSplit分片中将数据解析成一个个key/value。
●map阶段:将由RecordReader解析出的key/value交给map()方法处理,并生成一个个新的key/value。
●collect阶段:将map()中新生成key/value由OutpCollector.collect()写入内存中的环形数据缓冲区。
●spill阶段:当环形缓冲区达到一定阀值后,会将数据写到本地磁盘上,生成一个spill文件。在写文件之前,会先将数据进行一次本地排序,必要的时候(按配置要求)还会对数据进行压缩。
●combine阶段:当所有数据处理完后,将所有的临时的spill文件进行一次合并,最终之生成一个数据文件。
-------任务少、数据量小时如此,其它情况不是;
最后,我们再来看一下Combiner的执行时机。我们之前已对map端的shuffle做过比较升入的了解,详情请看MapTask详解。那么,Combiner会在map端的那个时期执行呢?实际上,Conbiner函数的执行时机可能会在map的merge操作完成之前,也可能在merge之后执行,这个时机由配置参数min.num.spill.for.combine(该值默认为3),也就是说在map端产生的spill文件最少有min.num.spill.for.combine的时候,Conbiner函数会在merge操作合并最终的本机结果文件之前执行,否则在merge之后执行。通过这种方式,就可以在spill文件很多并且需要做conbine的时候,减少写入本地磁盘的数据量,同样也减少了对磁盘的读写频率,可以起到优化作业的目的。
eg:
MAP:
public void map(LongWritable key,Text
value,Context context) throws IOException, InterruptedException{
String valueStr = value.toString();
String[] fields = valueStr.trim().split(",");
context.write(new Text(fields[4].substring(4)),new Text(fields[37]));
}
COMBINE:
//
public static class combine extends Reducer<Text,Text,Text,Text>{
// public void reduce(Text key,Iterable<Text> values,Context context) throws IOException, InterruptedException{
// int num = 0;
// float moneyCount = 0;
// for(Text value : values){
// num=num+1;
// moneyCount=Float.parseFloat(value.toString())+moneyCount;
// }
// context.write(key, new Text(String.valueOf(num)+","+String.valueOf(moneyCount)));
// }
// }
因为不是所有的map结束才执行combine,而是递归的去执行(有可能某个combine的结果和map在一起执行combine),所以map的结果必须和combine的结果完全一致,所以上述代码有误。
●map阶段:将由RecordReader解析出的key/value交给map()方法处理,并生成一个个新的key/value。
●collect阶段:将map()中新生成key/value由OutpCollector.collect()写入内存中的环形数据缓冲区。
●spill阶段:当环形缓冲区达到一定阀值后,会将数据写到本地磁盘上,生成一个spill文件。在写文件之前,会先将数据进行一次本地排序,必要的时候(按配置要求)还会对数据进行压缩。
●combine阶段:当所有数据处理完后,将所有的临时的spill文件进行一次合并,最终之生成一个数据文件。
-------任务少、数据量小时如此,其它情况不是;
最后,我们再来看一下Combiner的执行时机。我们之前已对map端的shuffle做过比较升入的了解,详情请看MapTask详解。那么,Combiner会在map端的那个时期执行呢?实际上,Conbiner函数的执行时机可能会在map的merge操作完成之前,也可能在merge之后执行,这个时机由配置参数min.num.spill.for.combine(该值默认为3),也就是说在map端产生的spill文件最少有min.num.spill.for.combine的时候,Conbiner函数会在merge操作合并最终的本机结果文件之前执行,否则在merge之后执行。通过这种方式,就可以在spill文件很多并且需要做conbine的时候,减少写入本地磁盘的数据量,同样也减少了对磁盘的读写频率,可以起到优化作业的目的。
eg:
MAP:
public void map(LongWritable key,Text
value,Context context) throws IOException, InterruptedException{
String valueStr = value.toString();
String[] fields = valueStr.trim().split(",");
context.write(new Text(fields[4].substring(4)),new Text(fields[37]));
}
COMBINE:
//
public static class combine extends Reducer<Text,Text,Text,Text>{
// public void reduce(Text key,Iterable<Text> values,Context context) throws IOException, InterruptedException{
// int num = 0;
// float moneyCount = 0;
// for(Text value : values){
// num=num+1;
// moneyCount=Float.parseFloat(value.toString())+moneyCount;
// }
// context.write(key, new Text(String.valueOf(num)+","+String.valueOf(moneyCount)));
// }
// }
因为不是所有的map结束才执行combine,而是递归的去执行(有可能某个combine的结果和map在一起执行combine),所以map的结果必须和combine的结果完全一致,所以上述代码有误。
相关文章推荐
- JavaBean转换为Map
- Spark入门实战系列--4.Spark运行架构
- MySQL 获得当前日期时间(以及时间的转换)
- Get open Popups
- ASP Request.ServerVariables 参数集
- 编程粗论
- ceph-dash安装部署
- html锚点定位的使用
- elasticsearch如何安全重启节点
- plSQL 32位oracle客户端配置
- ElasticSearch reindex by JAVA API
- Spark入门实战系列--3.Spark编程模型(下)--IDEA搭建及实战
- 计蒜客 第12题:最后一个单词的长度
- Mac虚拟机安装win7教程之Mac双系统怎么删除一个WINDOWS
- php阅读csv文件类
- 浅谈网页浏览器的发展
- iOS开发:创建真机调试证书
- wordpress学习五: 通过wordpress_xmlrpc的python包远程操作wordpress
- markdown test
- 国内OCR供应商及其演示链接