您的位置:首页 > 其它

map-combine过程解

2015-09-09 14:42 549 查看
●read阶段:通过RecordReader从InputSplit分片中将数据解析成一个个key/value。

●map阶段:将由RecordReader解析出的key/value交给map()方法处理,并生成一个个新的key/value。

●collect阶段:将map()中新生成key/value由OutpCollector.collect()写入内存中的环形数据缓冲区。

●spill阶段:当环形缓冲区达到一定阀值后,会将数据写到本地磁盘上,生成一个spill文件。在写文件之前,会先将数据进行一次本地排序,必要的时候(按配置要求)还会对数据进行压缩。

●combine阶段:当所有数据处理完后,将所有的临时的spill文件进行一次合并,最终之生成一个数据文件。
-------任务少、数据量小时如此,其它情况不是;

最后,我们再来看一下Combiner的执行时机。我们之前已对map端的shuffle做过比较升入的了解,详情请看MapTask详解。那么,Combiner会在map端的那个时期执行呢?实际上,Conbiner函数的执行时机可能会在map的merge操作完成之前,也可能在merge之后执行,这个时机由配置参数min.num.spill.for.combine(该值默认为3),也就是说在map端产生的spill文件最少有min.num.spill.for.combine的时候,Conbiner函数会在merge操作合并最终的本机结果文件之前执行,否则在merge之后执行。通过这种方式,就可以在spill文件很多并且需要做conbine的时候,减少写入本地磁盘的数据量,同样也减少了对磁盘的读写频率,可以起到优化作业的目的。

eg:

MAP:

public void map(LongWritable key,Text
value,Context context) throws IOException, InterruptedException{

String valueStr = value.toString();

String[] fields = valueStr.trim().split(",");

context.write(new Text(fields[4].substring(4)),new Text(fields[37]));

}

COMBINE:

//
public static class combine extends Reducer<Text,Text,Text,Text>{

// public void reduce(Text key,Iterable<Text> values,Context context) throws IOException, InterruptedException{

// int num = 0;

// float moneyCount = 0;

// for(Text value : values){

// num=num+1;

// moneyCount=Float.parseFloat(value.toString())+moneyCount;

// }

// context.write(key, new Text(String.valueOf(num)+","+String.valueOf(moneyCount)));

// }

// }

因为不是所有的map结束才执行combine,而是递归的去执行(有可能某个combine的结果和map在一起执行combine),所以map的结果必须和combine的结果完全一致,所以上述代码有误。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: