您的位置:首页 > 大数据 > Hadoop

Hadoop源码分析:Comparator的获取

2020-07-14 04:30 253 查看

目录

一、MapReduce中的排序

二、问题的引入

三、源码解析

四、总结

排序是MapReduce框架中最重要的操作之一。MapTask和ReduceTask均会对数据Key进行排序,该操作是Hadoop的默认行为。不管逻辑是否需要,MapReduce程序中的Key值都会进行排序。

一、MapReduce中的排序

MapReduce程序中都有哪些排序呢?下图为Mapreduce程序的执行流程图。如图所示,内部排序分为Mapper端的快速排序和归并排序,以及Reducer端的归并排序。

二、问题的引入

如上分析可知,排序在Mapper端和Reducer端都会执行。那么既然要排序,就需要Key类对象之间能够比较,那么MapReduce程序中Key类的比较器是如何获得的呢?

总体来说Key类的比较器的获取可以用下图表示,以阶段1、阶段2、阶段3三个阶段划分并在源码解析中解释。

三、源码解析

MapTask在创建缓冲区时,会获取Key的比较器对象,源代码①为MapperTask内部类OutputBuffer的init方法。

[code]//源码①
comparator = job.getOutputKeyComparator();

上述的方法会调用到Jobconf类,试图从该类中获得比较器。源码②如下:

[code]//源码②
public RawComparator getOutputKeyComparator() {
Class<? extends RawComparator> theClass = getClass(JobContext.KEY_COMPARATOR, null, RawComparator.class);
if (theClass != null)
return ReflectionUtils.newInstance(theClass, this);
return WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class), this);
}

源码②中的KEY_COMPARATOR是一个静态常量,定义在MRJobConf类中,是一个配置项,且默认没有配置,如源码③:

[code]//源码③
public static final String KEY_COMPARATOR =
"mapreduce.job.output.key.comparator.class";

若是上述属性已配置,则在源码④中的getClass方法可以获得设置的比较器,那么就会通过反射的方式创建比较器对象返回,即第二节阶段1,返回定义的比较器。那么若是上述属性未配置,则会进入第二节的阶段2。

[code]//源码④
public <U> Class<? extends U> getClass(String name,Class<? extends U>
defaultValue,Class<U> xface) {
try {
Class<?> theClass = getClass(name, defaultValue);
if (theClass != null && !xface.isAssignableFrom(theClass))
throw new RuntimeException(theClass+" not "+xface.getName());
else if (theClass != null)
return theClass.asSubclass(xface);
else
return null;
} catch (Exception e) {
throw new RuntimeException(e);
}
}

源码③中的属性未配置,那么getClass的返回值为null,如源码⑤所示,那么源码②的theClass即为null,程序就会进入    return WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class), this)。

[code]//源码⑤
// 输入的name为null,default值也为null,所以返回值为null
public Class<?> getClass(String name, Class<?> defaultValue) {
String valueString = getTrimmed(name);
if (valueString == null)
return defaultValue;
try {
return getClassByName(valueString);
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
}

从源码②中的WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class), this)方法可以看出,该方法的形参1,会调用getMapOutputKeyClass()方法,获取输出Key对应的类。然后用获取的类调用  asSubclass           (WritableComparable.class)方法。通过该方法可以判定,Key类是否为WritableComparable类,若是,则会返回该类的类对象;若不是,则会throw new ClassCastException(this.toString())异常。

因此,若Key为自定义类,则需要该类实现WritableComparable接口,否则会抛出异常。

[code]//源码⑥
public <U> Class<? extends U> asSubclass(Class<U> clazz) {
if (clazz.isAssignableFrom(this))
return (Class<? extends U>) this;
else
throw new ClassCastException(this.toString());
}

若Key类是WritableComparable类,则asSubclass方法会返回Key类的类对象。此时,WritableComparator.get()方法执行,该方法源码⑦如下。从源码⑦中可知,该方法是调用comparators.get()方法获得比较器。那么comparators是什么呢?

comparators是一个map集合,内部存储了Hadoop序列化类型对应的比较器,如Text类,IntWritable类等。这些类在加载时,已经将对应的比较器存入comparators,供需求时调用,这也是当Key是hadoop的序列化类型时可排序的原因。

对应第二节的阶段2,若输出key为hadoop序列化类型,那么能从comparators中获得比较器并返回。若是Key为自定义类型则进入阶段3。

源码⑦分析:若第一次从comparators中获取比较器失败,则会forceInit(c),再次从comparators中获取;若仍然获取失败,则会执行comparator = new WritableComparator(),为该类创建一个新的比较器并返回。即完成了第二节中的阶段3。

[code]//源码⑦
public static WritableComparator get(
Class<? extends WritableComparable> c, Configuration conf) {
WritableComparator comparator = comparators.get(c);
if (comparator == null) {               //若上述未获取到comparator
// force the static initializers to run
forceInit(c);                        //再次加载key类
// look to see if it is defined now
comparator = comparators.get(c);     //若仍然未获取到,则comparator为null
// if not, use the generic one
if (comparator == null) {            //comparator为null,则会为key类创建新的比较器。
comparator = new WritableComparator(c, conf, true);
}
}
// Newly passed Configuration objects should be used.
ReflectionUtils.setConf(comparator, conf);
return comparator;
}

四、总结

通过源码分析,可以得出以下结论:

  1. 若Key为hadoop的序列化类型,如,Text、IntWritable、DoubleWritable类等,则会从comparators中获取对应的比较器,实现内部排序。
  2. 若Key为自定义类型,则需要该类实现WritableComparable接口,并重写方法,这样在未提供比较器的情况下,会为该类创建新的比较器。
  3. 若Key为自定义类型,在实现了WritableComparable接口后,还可以设置自定义的比较器。


 

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: