Hadoop源码分析:Comparator的获取
目录
排序是MapReduce框架中最重要的操作之一。MapTask和ReduceTask均会对数据Key进行排序,该操作是Hadoop的默认行为。不管逻辑是否需要,MapReduce程序中的Key值都会进行排序。
一、MapReduce中的排序
MapReduce程序中都有哪些排序呢?下图为Mapreduce程序的执行流程图。如图所示,内部排序分为Mapper端的快速排序和归并排序,以及Reducer端的归并排序。
二、问题的引入
如上分析可知,排序在Mapper端和Reducer端都会执行。那么既然要排序,就需要Key类对象之间能够比较,那么MapReduce程序中Key类的比较器是如何获得的呢?
总体来说Key类的比较器的获取可以用下图表示,以阶段1、阶段2、阶段3三个阶段划分并在源码解析中解释。
三、源码解析
MapTask在创建缓冲区时,会获取Key的比较器对象,源代码①为MapperTask内部类OutputBuffer的init方法。
[code]//源码① comparator = job.getOutputKeyComparator();
上述的方法会调用到Jobconf类,试图从该类中获得比较器。源码②如下:
[code]//源码② public RawComparator getOutputKeyComparator() { Class<? extends RawComparator> theClass = getClass(JobContext.KEY_COMPARATOR, null, RawComparator.class); if (theClass != null) return ReflectionUtils.newInstance(theClass, this); return WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class), this); }
源码②中的KEY_COMPARATOR是一个静态常量,定义在MRJobConf类中,是一个配置项,且默认没有配置,如源码③:
[code]//源码③ public static final String KEY_COMPARATOR = "mapreduce.job.output.key.comparator.class";
若是上述属性已配置,则在源码④中的getClass方法可以获得设置的比较器,那么就会通过反射的方式创建比较器对象返回,即第二节阶段1,返回定义的比较器。那么若是上述属性未配置,则会进入第二节的阶段2。
[code]//源码④ public <U> Class<? extends U> getClass(String name,Class<? extends U> defaultValue,Class<U> xface) { try { Class<?> theClass = getClass(name, defaultValue); if (theClass != null && !xface.isAssignableFrom(theClass)) throw new RuntimeException(theClass+" not "+xface.getName()); else if (theClass != null) return theClass.asSubclass(xface); else return null; } catch (Exception e) { throw new RuntimeException(e); } }
源码③中的属性未配置,那么getClass的返回值为null,如源码⑤所示,那么源码②的theClass即为null,程序就会进入 return WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class), this)。
[code]//源码⑤ // 输入的name为null,default值也为null,所以返回值为null public Class<?> getClass(String name, Class<?> defaultValue) { String valueString = getTrimmed(name); if (valueString == null) return defaultValue; try { return getClassByName(valueString); } catch (ClassNotFoundException e) { throw new RuntimeException(e); } }
从源码②中的WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class), this)方法可以看出,该方法的形参1,会调用getMapOutputKeyClass()方法,获取输出Key对应的类。然后用获取的类调用 asSubclass (WritableComparable.class)方法。通过该方法可以判定,Key类是否为WritableComparable类,若是,则会返回该类的类对象;若不是,则会throw new ClassCastException(this.toString())异常。
因此,若Key为自定义类,则需要该类实现WritableComparable接口,否则会抛出异常。
[code]//源码⑥ public <U> Class<? extends U> asSubclass(Class<U> clazz) { if (clazz.isAssignableFrom(this)) return (Class<? extends U>) this; else throw new ClassCastException(this.toString()); }
若Key类是WritableComparable类,则asSubclass方法会返回Key类的类对象。此时,WritableComparator.get()方法执行,该方法源码⑦如下。从源码⑦中可知,该方法是调用comparators.get()方法获得比较器。那么comparators是什么呢?
comparators是一个map集合,内部存储了Hadoop序列化类型对应的比较器,如Text类,IntWritable类等。这些类在加载时,已经将对应的比较器存入comparators,供需求时调用,这也是当Key是hadoop的序列化类型时可排序的原因。
对应第二节的阶段2,若输出key为hadoop序列化类型,那么能从comparators中获得比较器并返回。若是Key为自定义类型则进入阶段3。
源码⑦分析:若第一次从comparators中获取比较器失败,则会forceInit(c),再次从comparators中获取;若仍然获取失败,则会执行comparator = new WritableComparator(),为该类创建一个新的比较器并返回。即完成了第二节中的阶段3。
[code]//源码⑦ public static WritableComparator get( Class<? extends WritableComparable> c, Configuration conf) { WritableComparator comparator = comparators.get(c); if (comparator == null) { //若上述未获取到comparator // force the static initializers to run forceInit(c); //再次加载key类 // look to see if it is defined now comparator = comparators.get(c); //若仍然未获取到,则comparator为null // if not, use the generic one if (comparator == null) { //comparator为null,则会为key类创建新的比较器。 comparator = new WritableComparator(c, conf, true); } } // Newly passed Configuration objects should be used. ReflectionUtils.setConf(comparator, conf); return comparator; }
四、总结
通过源码分析,可以得出以下结论:
- 若Key为hadoop的序列化类型,如,Text、IntWritable、DoubleWritable类等,则会从comparators中获取对应的比较器,实现内部排序。
- 若Key为自定义类型,则需要该类实现WritableComparable接口,并重写方法,这样在未提供比较器的情况下,会为该类创建新的比较器。
- 若Key为自定义类型,在实现了WritableComparable接口后,还可以设置自定义的比较器。
- Hadoop源码分析:WritableComparator及排序实现方式
- hadoop源码分析-WritableComparator
- hadoop源码分析(2):Map-Reduce的过程解析
- hadoop的job提交的源码分析
- Hadoop源码分析11: IPC流程(6)volatile
- hadoop源码剖析--$HADOOP_HOME/bin/hadoop脚本文件分析
- Hadoop HDFS概念学习系列之Apache Hadoop hdfs源码分析(二十四)
- 源码级强力分析hadoop的RPC机制
- java使用websocket,并且获取HttpSession 源码分析(推荐)
- 第二人生的源码分析(三十六)获取消息包里每一个字段
- Hadoop源码分析的思路
- Hadoop源码分析- RPC client端篇
- Hadoop源码分析之Id.java
- Hadoop MapReduce Splits 切片源码分析及切片原理
- Hadoop-2.7.3源码分析:MapReduce作业提交源码跟踪
- 研磨Hadoop源码(六)ResourceManager启动分析2
- httpclient源码分析之 PoolingHttpClientConnectionManager 获取连接
- Hadoop源码分析HDFS ClientProtocol——getBlockLocations
- 源码级强力分析hadoop的RPC机制
- Hadoop源码分析之二(RPC机制之Call处理)