mapreduce之分区,分组,排序,二次排序的综合应用
2016-11-25 23:24
357 查看
我们还是看下这个图,mapper处理后的中间数据经过shuffle阶段再由reducer处理。在shuffle阶段会进行分区,分组,排序,二次排序。这是个比较复杂的过程,但是我们理解以下这些东西对于工作中常见业务的开发就够用了:
分区 就是mapper数据处理完分成若干个partition交给reducer处理,也是利用多个reducer task并发处理来提高效率,但有些业务比如要求得出数据的全局排序结果,那没办法只能用由mapper产生1个partition交由一个reducer来处理。在代码中分区是怎么控制的呢?就像上文所说重写Partitioner类中的getPartition方法。
分组 我们知道reducer的reduce方法处理的数据结构式(key,Iterable)这个key-value值得value是个列表,就是map中key经过分组(group)后相同的key对应的值形成的列表,默认分组就是根据key的值来分组,hive中的分组操作:
select class,count(student) from tablename group by class最终就是翻译成了这里要说的分组,而tablename表中总的数据就可以理解成这个partition的数据。可以说分区是为了并发处理提高效率,分组则是为了服务业务的开发。
排序 mapreduce排序是对key值进行排序,默认按字典顺序排序,当然可以自定义排序。但有时候需要数据在reducer的reduce的要处理的数据(key,Iterable)这个value的列表Iterable进行排序,这就是二次排序。因为mapreduce只能对key进行排序,我们可以把value的值放到key里面来影响排序,具体下面的到例子来看。
具体应用
我在新浪处理微博数据的时候有个需求很简单:两个数据来源:
全量的imei值,路径:/sinadata/all_imei/20161126
当天活跃的imei值,路径:/sinadata/active_imei/20161126
需要得到在全量imei中存在的当天活跃imei
如果用hive很简单:
select a.imei from all_imei a join active_imei b where a.imei=b,imei就是一个join操作,下面说下用mapreduce(reducer端join)来实现:
两个数据来源需要对每个来源做个标记,所以key值不能单单是imei,需要是imei+标记,定义他们的封装类:
public class TextPair implements WritableComparable<TextPair> { private Text first; private Text second; public TextPair() { set(new Text(), new Text()); } public TextPair(String first, String second) { this.set(new Text(first), new Text(second)); } public void set(Text first, Text second) { this.first = first; this.second = second; } public void readFields(DataInput in) throws IOException { first.readFields(in); second.readFields(in); } public void write(DataOutput out) throws IOException { first.write(out); second.write(out); } @Override public int hashCode() { return first.hashCode() * 163 + second.hashCode(); } @Override public String toString() { return first + "\t" + second; } public Text getFirst() { return first; } public Text getSecond() { return second; } }
first是imei,second是标记
处理全量imei的map方法:
map(key,value,context){ context.write(new TextPair(imei,"0"),new Text("WEIBO")) }
标记是0
处理当天活跃imei的map方法:
map(key,value,context){ context.write(new TextPair(imei,"1"),new Text("")) }
标记是1
分区:就根据imei进行分区,也就是TextPair中的first
public class TextPairKeyPartitioner extends Partitioner<TextPair, Text>{ public int getPartition(TextPair key, Text value, int numPartitions){ return (key.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions; } }
分组:注意分组也要根据imei,而不是根据整个TextPair分组,这样两个数据来源也就是两个map来的两部分数据相同的imei分到一个组
` public static class FirstComparator extends WritableComparator
{
private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator();
protected FirstComparator() { super(TextPair.class); } @Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { try { int firstL1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1); int firstL2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt(b2, s2); return TEXT_COMPARATOR.compareBytes(b1, s1, firstL1, b2, s2, firstL2); } catch (IOException e) { e.printStackTrace(); } return 0; } @Override public int compare(WritableComparable a, WritableComparable b) { if (a instanceof TextPair && b instanceof TextPair) { return ((TextPair) a).getFirst().compareTo(((TextPair) b).getFirst()); } return super.compare(a, b); } }`
重点是两个map的数据到reducer的处理:
public void reduce(TextPair key, Iterable<Text> values, Context context) throws IOException, InterruptedException{ String imeiKey = key.getFirst().toString(); String second = key.getSecond().toString(); if(second.equals("1")){ return; } String lines = ""; String weekInfo = ""; java.util.Iterator it = values 4000 .iterator(); while(it.hasNext()){ lines = it.next().toString(); second = key.getSecond().toString(); if(second.equals("0")){ weekInfo = lines; }else if(second.equals("1")){ //context.write(new Text(imeiKey+"\t"+weekInfo), NullWritable.get()); String [] line = new String[1]; line[0]=imeiKey+"\t"+weekInfo; context.write(NullWritable.get(),Tools.rcfileDeal(line)); return; } } }
驱动函数中的方法:
Job job = new Job(conf);
job.setJarByClass(CalcImeiWeekKeepMainV2.class);
//设置分组
job.setGroupingComparatorClass(TextPair.FirstComparator.class);
//设置分区
job.setPartitionerClass(TextPairKeyPartitioner.class);
//全量imei处理mapper
MultipleInputs.addInputPath(job, new Path(input_imei_info), RCFileInputFormat.class, ImeiWeekKeepMapperV2.class);
//当天活跃imei处理mapper
MultipleInputs.addInputPath(job,new Path(input_bhv),RCFileInputFormat.class,ImeiBhvWeekKeepMapper.class);
job.setMapOutputKeyClass(TextPair.class);
job.setMapOutputValueClass(Text.class);
**job.setReducerClass(CalcImeiWeekKeepDetailReducer.class);
RCFileOutputFormat.setOutputPath(job, new** Path(output_mid_path_day));
job.setOutputFormatClass(RCFileOutputFormat.class);
job.setNumReduceTasks(reduceNum[0]);
code = job.waitForCompletion(true) ? 0 : 1;
比如:
全量的数据有两个字段:
imei wm a weino b weibo c weibo
mapper输出:
(<”a”,”0”>,”weibo”)
(<”b”,”0”>,”weibo”)
(<”c”,”0”>,”weibo”)
当天活跃imei两个个字段:
imei a b
mapper输出:
(<”a”,”1”>,”“)
(<”b”,”1”>,”“)
(<”d”,”1”>,”“)
分区根据imei来分区,加入上面mapper输出的5条数据全部到了一个分区:
(<”a”,”0”>,”weibo”)
(<”b”,”0”>,”weibo”)
(<”c”,”0”>,”weibo”)
(<”a”,”1”>,”“)
(<”b”,”1”>,”“)
因为也是根据imei分组,这5条数据到reduce方法时候分为了4组:
(<”a”,”0”>,”weibo”)
(<”a”,”1”>,”“)
(<”b”,”0”>,”weibo”)
(<”b”,”1”>,”“)
(<”c”,”0”>,”weibo”)
(<”d”,”1”>,”“)
第一组在reduce方法中:key是(<”a”,”0”>,<”a”,”1”>) 值value是{“weibo” ,”“},值是个列表很好理解,其实key也可以理解为一个列表
当value第一次执行value.next()方法获取列表中第一个值时,key的getSecond方法返回的是0,第二次执行value.next()方法时,key的getSecond方法返回的是1。当然我们也可以重写排序方法,1在前面0在后面。
最终利用分组和二次排序实现reducer端的join操作
相关文章推荐
- Hadoop Mapreduce分区、分组、二次排序过程详解
- Hadoop Mapreduce分区、分组、二次排序过程详解
- Hadoop Mapreduce分区、分组、二次排序过程详解
- Hadoop Mapreduce分区、分组、二次排序过程详解[转]
- 十一、理解MapReduce的二次排序功能,包括自定义数据类型、分区、分组、排序
- MapReduce二次排序分区,分组优化
- MapReduce处理二次排序(分区-排序-分组)
- Hadoop Mapreduce分区、分组、二次排序过程详解[转]
- MapReduce的分区与 分组二次排序
- Hadoop Mapreduce分区、分组、连接以及辅助排序(也叫二次排序)过程详解
- mapreduce,自定义分区,分组,排序实现join
- Hadoop MapReduce 二次排序原理及其应用
- mapreduce,自定义排序,分区,分组实现按照年份升序排序,温度降序排序
- Mapreduce中的 自定义类型、分组与二次排序
- 「 Hadoop」mapreduce对温度数据进行自定义排序、分组、分区等
- MapReduce的自定义排序、分区和分组
- HADOOP(2)__Mapreduce分区、排序、分组
- MapReduce 高级应用练习:二次排序及Join
- MapReduce的自制Writable分组输出及组内排序
- MapReduce中的二次排序