Hadoop 学习研究(六): hadoop中的排序操作(二次排序和全排序)
2017-06-03 11:05
435 查看
Hadoop中的排序操作:
MapReduce框架对处理结果的输出会根据key值进行默认的排序,这个默认排序可以满足一部分需求,但是也是十分有限的。在我们实际的需求当中,往往有要对reduce输出结果进行各种类型的排序需求。
在Hadoop中常用的排序操作分为以下几类:
1. 自定义key值类型的键(实现WritableComparable接口)
2. 实现框架中的比较器(job.setSortComparatorClass(Class<? extends RawComparator))
3. 二次排序(实现自定义分区、自定义排序、自定义分组)
4. 全排序(TotalOrderPartitioner)
对于1、2的实现,实现WritableComparable接口,比较器需要继承RawComparator类或者WritableCompartor类。
主要讲解关于二次排序和全排序:
二次排序:
对于二次排序的实现,需要了解MapReduce中的数据结构和数据流,如下图:显示了影响数据结构和数据流的三个元素(分区、排序和分组)。
分区是在Map输出收集过程中被调用的,且用于确定哪些reduce端应该接收map输出。RawComparator用于对各自分区中的map输出进行排序,且都被map和reduce端使用。最后RawComparator负责通过被排序的记录确定分组边界。
具体实现例子:
1、输入数据:
sort1 1
sort2 3
sort2 77
sort2 54
sort1 2
sort6 22
sort6 221
sort6 20
2、目标输出(每个key值对应一个文件part-r-0000X)
sort1 1,2
sort2 3,54,77
sort6 20,22,221
(1)Map端处理:
根据上面的需求,我们有一个非常明确的目标就是要对第一列相同的记录合并,并且对合并后的数字进行排序。我们都知道MapReduce框架不管是默认排序或者是自定义排序都只是对Key值进行排序,现在的情况是这些数据不是key值,怎么办?其实我们可以将原始数据的Key值和其对应的数据组合成一个新的Key值,然后新的Key值对应的还是之前的数字。那么我们就可以将原始数据的map输出变成类似下面的数据结构:
{[sort1,1],1}
{[sort2,3],3}
{[sort2,77],77}
{[sort2,54],54}
{[sort1,2],2}
{[sort6,22],22}
{[sort6,221],221}
{[sort6,20],20}
那么我们只需要对[]里面的新key值进行排序就ok了。然后我们需要自定义一个分区处理器,因为我的目标不是想将新key相同的传到同一个reduce中,而是想将新key中的第一个字段相同的才放到同一个reduce中进行分组合并,所以我们需要根据新key值中的第一个字段来自定义一个分区处理器。通过分区操作后,得到的数据流如下:
Partition1:{[sort1,1],1}、{[sort1,2],2}
Partition2:{[sort2,3],3}、{[sort2,77],77}、{[sort2,54],54}
Partition3:{[sort6,22],22}、{[sort6,221],221}、{[sort6,20],20}
分区操作完成之后,我调用自己的自定义排序器对新的Key值进行排序。
{[sort1,1],1}
{[sort1,2],2}
{[sort2,3],3}
{[sort2,54],54}
{[sort2,77],77}
{[sort6,20],20}
{[sort6,22],22}
{[sort6,221],221}
(2)Reduce端处理:
经过Shuffle处理之后,数据传输到Reducer端了。在Reducer端对按照组合键的第一个字段来进行分组,并且没处理完一次分组之后就会调用一次reduce函数来对这个分组进行处理输出。最终的各个分组的数据结构变成类似下面的数据结构:
{sort1,[1,2]}
{sort2,[3,54,77]}
{sort6,[20,22,221]}
代码示例:
1、自定义组合键
说明:在自定义组合键的时候,需要特别注意,一定要实现WritableComparable接口,并且实现compareTo方法的比较策略。这个用于mapreduce的第一次默认排序,也就是发生在map阶段的sort小阶段,发生地点为环形缓冲区(可以通过io.sort.mb进行大小调整),但是其对我们最终的二次排序结果是没有影响的。我们二次排序的最终结果是由我们的自定义比较器决定的。
2、自定义分区器
说明:具体说明看代码注释。
3、自定义比较器
说明:自定义比较器决定了我们二次排序的结果。自定义比较器需要继承WritableComparator类或者RawComparator类,并且重写compare方法实现自己的比较策略。
全排序(TotalOrderPartitioner):
我们知道MapReduce计算框架在feed数据给reducer之前会对map output key排序,这种排序机制保证了每一个reducer局部有序,Hadoop 默认的partitioner是HashPartitioner,它依赖于output
key的hashcode,使得相同key会去相同reducer,但是不保证全局有序(不同partition之间无序),如果想要获得全局排序结果(比如获取全局统计量,日志重复、缺失、总数等),就需要用到TotalOrderPartitioner了,它保证了相同key去相同reducer的同时也保证了全局有序。
在TotalOrderPartitioner中,我们可以看到 TotalOrderPartitioner依赖于一个partition
file来进行distribute keys, partition file是一个实现计算好的sequence file, 如果我们设置的reduce num=N, 那么这个文件包含(N-1)个key分割点,并且是基于key comparator排好序的。TotalOrderPartitioner会检查每一个key属于哪一个reducer的范围内,然后决定分发给哪一个reducer。
在实现全排序中需要用到采样器:InputSampler, InputSampler类的writePartitionFile方法会对input files取样并创建partition file。有三种取样方法:
1. RandomSampler 随机取样
2. IntervalSampler 从s个split里面按照一定间隔取样,通常适用于有序数据
3. SplitSampler 从s个split中选取前n条记录取样
在采样器 .writePartitionFile() 源码中我们可以看到
可以看到采样器在进行采样的时候,会调用输入InputFormat类,所以在进行全排序的时候需要自定义输入InputFormat,使得MapReduce计算框架中的Map输出的KV的逻辑写在自定义的RecordReader的nextkv函数中,以保证采样器用来采样并排序的数据是Map端的输出数据格式。
在Hadoop中利用InputSampler实现生成 全排序分区边界 文件的步骤:
1. 对待排序的数据进行抽样:(抽样时必须保证 抽样的输入==Map的输出)
2. 对抽样的数据进行排序,产生区间边界。(例如:pivot:3, 9 ,11)
3. Map端数据的输出,通过Partitioner来计算该数据KV处于哪个pivot之间,之后将数据相应的分成对应的分区。
(例如区间的划分: <3, [3,9) , >=9 ,分别对应Reduce0,Reduce1,Reduce2)
使用TotalOrderPartitioner的驱动配置如下:
使用TotalOrderPartitioner全排序的最大好处就是可以有效地避免出现数据倾斜的任务。
在工作中遇到需要对30G+的日志文件进行统计(日志总数、遗漏数、重复数、服务器重启次数)。在不使用TotalOrderPartitioner进行分区时,使用默认Hash分区,发现数据出现了倾斜,(即在某一个分区中存在大量的数据,使得这个分区的运行时间大大超过其他分区),使这个分区成为了拖延整个job任务的执行。(运行完成时间30min)。
在使用了TotalOrderPartitioner进行全排序过后,整个job任务的分区负载均衡,完成时间为:数据抽样时间(3min)+运算时间(10min)。
大大提高了负载均衡和运算的效率,减小了数据倾斜的情况。
MapReduce框架对处理结果的输出会根据key值进行默认的排序,这个默认排序可以满足一部分需求,但是也是十分有限的。在我们实际的需求当中,往往有要对reduce输出结果进行各种类型的排序需求。
在Hadoop中常用的排序操作分为以下几类:
1. 自定义key值类型的键(实现WritableComparable接口)
2. 实现框架中的比较器(job.setSortComparatorClass(Class<? extends RawComparator))
3. 二次排序(实现自定义分区、自定义排序、自定义分组)
4. 全排序(TotalOrderPartitioner)
对于1、2的实现,实现WritableComparable接口,比较器需要继承RawComparator类或者WritableCompartor类。
主要讲解关于二次排序和全排序:
二次排序:
对于二次排序的实现,需要了解MapReduce中的数据结构和数据流,如下图:显示了影响数据结构和数据流的三个元素(分区、排序和分组)。
分区是在Map输出收集过程中被调用的,且用于确定哪些reduce端应该接收map输出。RawComparator用于对各自分区中的map输出进行排序,且都被map和reduce端使用。最后RawComparator负责通过被排序的记录确定分组边界。
具体实现例子:
1、输入数据:
sort1 1
sort2 3
sort2 77
sort2 54
sort1 2
sort6 22
sort6 221
sort6 20
2、目标输出(每个key值对应一个文件part-r-0000X)
sort1 1,2
sort2 3,54,77
sort6 20,22,221
(1)Map端处理:
根据上面的需求,我们有一个非常明确的目标就是要对第一列相同的记录合并,并且对合并后的数字进行排序。我们都知道MapReduce框架不管是默认排序或者是自定义排序都只是对Key值进行排序,现在的情况是这些数据不是key值,怎么办?其实我们可以将原始数据的Key值和其对应的数据组合成一个新的Key值,然后新的Key值对应的还是之前的数字。那么我们就可以将原始数据的map输出变成类似下面的数据结构:
{[sort1,1],1}
{[sort2,3],3}
{[sort2,77],77}
{[sort2,54],54}
{[sort1,2],2}
{[sort6,22],22}
{[sort6,221],221}
{[sort6,20],20}
那么我们只需要对[]里面的新key值进行排序就ok了。然后我们需要自定义一个分区处理器,因为我的目标不是想将新key相同的传到同一个reduce中,而是想将新key中的第一个字段相同的才放到同一个reduce中进行分组合并,所以我们需要根据新key值中的第一个字段来自定义一个分区处理器。通过分区操作后,得到的数据流如下:
Partition1:{[sort1,1],1}、{[sort1,2],2}
Partition2:{[sort2,3],3}、{[sort2,77],77}、{[sort2,54],54}
Partition3:{[sort6,22],22}、{[sort6,221],221}、{[sort6,20],20}
分区操作完成之后,我调用自己的自定义排序器对新的Key值进行排序。
{[sort1,1],1}
{[sort1,2],2}
{[sort2,3],3}
{[sort2,54],54}
{[sort2,77],77}
{[sort6,20],20}
{[sort6,22],22}
{[sort6,221],221}
(2)Reduce端处理:
经过Shuffle处理之后,数据传输到Reducer端了。在Reducer端对按照组合键的第一个字段来进行分组,并且没处理完一次分组之后就会调用一次reduce函数来对这个分组进行处理输出。最终的各个分组的数据结构变成类似下面的数据结构:
{sort1,[1,2]}
{sort2,[3,54,77]}
{sort6,[20,22,221]}
代码示例:
1、自定义组合键
package com.mr; import java.io. dff7 DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * 自定义组合键 */ public class CombinationKey implements WritableComparable<CombinationKey>{ private static final Logger logger = LoggerFactory.getLogger(CombinationKey.class); private Text firstKey; private IntWritable secondKey; public CombinationKey() { this.firstKey = new Text(); this.secondKey = new IntWritable(); } public Text getFirstKey() { return this.firstKey; } public void setFirstKey(Text firstKey) { this.firstKey = firstKey; } public IntWritable getSecondKey() { return this.secondKey; } public void setSecondKey(IntWritable secondKey) { this.secondKey = secondKey; } @Override public void readFields(DataInput dateInput) throws IOException { // TODO Auto-generated method stub this.firstKey.readFields(dateInput); this.secondKey.readFields(dateInput); } @Override public void write(DataOutput outPut) throws IOException { this.firstKey.write(outPut); this.secondKey.write(outPut); } /** * 自定义比较策略 * 注意:该比较策略用于mapreduce的第一次默认排序,也就是发生在map阶段的sort小阶段, * 发生地点为环形缓冲区(可以通过io.sort.mb进行大小调整) */ @Override public int compareTo(CombinationKey combinationKey) { logger.info("-------CombinationKey flag-------"); return this.firstKey.compareTo(combinationKey.getFirstKey()); } }
说明:在自定义组合键的时候,需要特别注意,一定要实现WritableComparable接口,并且实现compareTo方法的比较策略。这个用于mapreduce的第一次默认排序,也就是发生在map阶段的sort小阶段,发生地点为环形缓冲区(可以通过io.sort.mb进行大小调整),但是其对我们最终的二次排序结果是没有影响的。我们二次排序的最终结果是由我们的自定义比较器决定的。
2、自定义分区器
package com.mr; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.mapreduce.Partitioner; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * 自定义分区 */ public class DefinedPartition extends Partitioner<CombinationKey,IntWritable>{ private static final Logger logger = LoggerFactory.getLogger(DefinedPartition.class); /** * 数据输入来源:map输出 * @param key map输出键值 * @param value map输出value值 * @param numPartitions 分区总数,即reduce task个数 */ @Override public int getPartition(CombinationKey key, IntWritable value,int numPartitions) { logger.info("--------enter DefinedPartition flag--------"); /** * 注意:这里采用默认的hash分区实现方法 * 根据组合键的第一个值作为分区 * 这里需要说明一下,如果不自定义分区的话,mapreduce框架会根据默认的hash分区方法, * 将整个组合将相等的分到一个分区中,这样的话显然不是我们要的效果 */ logger.info("--------out DefinedPartition flag--------"); return (key.getFirstKey().hashCode()&Integer.MAX_VALUE)%numPartitions; } }
说明:具体说明看代码注释。
3、自定义比较器
package com.mr; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * 自定义二次排序策略 */ public class DefinedComparator extends WritableComparator { private static final Logger logger = LoggerFactory.getLogger(DefinedComparator.class); public DefinedComparator() { super(CombinationKey.class,true); } @Override public int compare(WritableComparable combinationKeyOne, WritableComparable CombinationKeyOther) { logger.info("---------enter DefinedComparator flag---------"); CombinationKey c1 = (CombinationKey) combinationKeyOne; CombinationKey c2 = (CombinationKey) CombinationKeyOther; /** * 确保进行排序的数据在同一个区内,如果不在同一个区则按照组合键中第一个键排序 * 另外,这个判断是可以调整最终输出的组合键第一个值的排序 * 下面这种比较对第一个字段的排序是升序的,如果想降序这将c1和c2倒过来(假设1) */ if(!c1.getFirstKey().equals(c2.getFirstKey())){ logger.info("---------out DefinedComparator flag---------"); return c1.getFirstKey().compareTo(c2.getFirstKey()); } else{//按照组合键的第二个键的升序排序,将c1和c2倒过来则是按照数字的降序排序(假设2) logger.info("---------out DefinedComparator flag---------"); return c1.getSecondKey().get()-c2.getSecondKey().get();//0,负数,正数 } /** * (1)按照上面的这种实现最终的二次排序结果为: * sort1 1,2 * sort2 3,54,77 * sort6 20,22,221 * (2)如果实现假设1,则最终的二次排序结果为: * sort6 20,22,221 * sort2 3,54,77 * sort1 1,2 * (3)如果实现假设2,则最终的二次排序结果为: * sort1 2,1 * sort2 77,54,3 * sort6 221,22,20 */ } }
说明:自定义比较器决定了我们二次排序的结果。自定义比较器需要继承WritableComparator类或者RawComparator类,并且重写compare方法实现自己的比较策略。
全排序(TotalOrderPartitioner):
我们知道MapReduce计算框架在feed数据给reducer之前会对map output key排序,这种排序机制保证了每一个reducer局部有序,Hadoop 默认的partitioner是HashPartitioner,它依赖于output
key的hashcode,使得相同key会去相同reducer,但是不保证全局有序(不同partition之间无序),如果想要获得全局排序结果(比如获取全局统计量,日志重复、缺失、总数等),就需要用到TotalOrderPartitioner了,它保证了相同key去相同reducer的同时也保证了全局有序。
在TotalOrderPartitioner中,我们可以看到 TotalOrderPartitioner依赖于一个partition
file来进行distribute keys, partition file是一个实现计算好的sequence file, 如果我们设置的reduce num=N, 那么这个文件包含(N-1)个key分割点,并且是基于key comparator排好序的。TotalOrderPartitioner会检查每一个key属于哪一个reducer的范围内,然后决定分发给哪一个reducer。
在实现全排序中需要用到采样器:InputSampler, InputSampler类的writePartitionFile方法会对input files取样并创建partition file。有三种取样方法:
1. RandomSampler 随机取样
2. IntervalSampler 从s个split里面按照一定间隔取样,通常适用于有序数据
3. SplitSampler 从s个split中选取前n条记录取样
在采样器 .writePartitionFile() 源码中我们可以看到
public static <K,V> void writePartitionFile(Job job, Sampler<K,V> sampler) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = job.getConfiguration(); final InputFormat inf = ReflectionUtils.newInstance(job.getInputFormatClass(), conf); //反射获取输入格式类 int numPartitions = job.getNumReduceTasks(); //获取reduce num个数 K[] samples = (K[])sampler.getSample(inf, job); //进行数据抽样 LOG.info("Using " + samples.length + " samples"); RawComparator<K> comparator = (RawComparator<K>) job.getSortComparator(); Arrays.sort(samples, comparator); //对抽样的数据进行排序 Path dst = new Path(TotalOrderPartitioner.getPartitionFile(conf)); FileSystem fs = dst.getFileSystem(conf); if (fs.exists(dst)) { fs.delete(dst, false); } SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, dst, job.getMapOutputKeyClass(), NullWritable.class); NullWritable nullValue = NullWritable.get(); float stepSize = samples.length / (float) numPartitions; //产生分区边界写入SequenceFile中 int last = -1; for(int i = 1; i < numPartitions; ++i) { int k = Math.round(stepSize * i); while (last >= k && comparator.compare(samples[last], samples[k]) == 0) { ++k; } writer.append(samples[k], nullValue); last = k; } writer.close(); }
可以看到采样器在进行采样的时候,会调用输入InputFormat类,所以在进行全排序的时候需要自定义输入InputFormat,使得MapReduce计算框架中的Map输出的KV的逻辑写在自定义的RecordReader的nextkv函数中,以保证采样器用来采样并排序的数据是Map端的输出数据格式。
在Hadoop中利用InputSampler实现生成 全排序分区边界 文件的步骤:
1. 对待排序的数据进行抽样:(抽样时必须保证 抽样的输入==Map的输出)
2. 对抽样的数据进行排序,产生区间边界。(例如:pivot:3, 9 ,11)
3. Map端数据的输出,通过Partitioner来计算该数据KV处于哪个pivot之间,之后将数据相应的分成对应的分区。
(例如区间的划分: <3, [3,9) , >=9 ,分别对应Reduce0,Reduce1,Reduce2)
使用TotalOrderPartitioner的驱动配置如下:
InputSampler.Sampler<key_Own, val_Own> sampler = new InputSampler.RandomSampler<key_Own,val_Own>(0.1,10000,30); TotalOrderPartitioner.setPartitionFile(conf, path); job.setInputFormatClass(OwnInputFormat.class); job.setPartitionerClass(TotalOrderPartitioner.class); job.setNumReduceTasks(3); job.set.........//job任务的一些其他设置:如MapOutputKeyClass等 InputSampler.writePartitionFile(job, sampler); //启动采样并写入分区文件中, 在此伪代码中不需要提交运行job
使用TotalOrderPartitioner全排序的最大好处就是可以有效地避免出现数据倾斜的任务。
在工作中遇到需要对30G+的日志文件进行统计(日志总数、遗漏数、重复数、服务器重启次数)。在不使用TotalOrderPartitioner进行分区时,使用默认Hash分区,发现数据出现了倾斜,(即在某一个分区中存在大量的数据,使得这个分区的运行时间大大超过其他分区),使这个分区成为了拖延整个job任务的执行。(运行完成时间30min)。
在使用了TotalOrderPartitioner进行全排序过后,整个job任务的分区负载均衡,完成时间为:数据抽样时间(3min)+运算时间(10min)。
大大提高了负载均衡和运算的效率,减小了数据倾斜的情况。
相关文章推荐
- Hadoop学习笔记: MapReduce二次排序
- Hadoop 学习研究(五): hadoop中的join操作
- Hadoop学习之自定义二次排序
- (Hadoop学习-2)mapreduce实现二次排序
- Hadoop二次排序
- hadoop二次排序(合集)
- Hadoop二次排序
- 对Hadoop二次排序的理解
- 二次学习(节外生枝篇)一、初探Hadoop(4)
- Hadoop学习笔记(二):MapReduce的特性-计数器、排序
- hadoop之二次排序
- 二次学习(节外生枝篇)一、初探Hadoop(3)
- Hadoop Mapreduce分区、分组、连接以及辅助排序(也叫二次排序)过程详解
- hadoop的二次排序
- 个人hadoop学习总结:Hadoop集群+HBase集群+Zookeeper集群+chukwa监控(包括单机、伪分布、完全分布安装操作)
- hadoop学习笔记-hive安装及操作
- hadoop 二次排序
- 二次学习(节外生枝篇)一、初探Hadoop(2)
- Hadoop二次排序
- hadoop二次排序