您的位置:首页 > 大数据 > Hadoop

Hadoop源码分析——计算模型MapReduce

2019-01-21 22:49 399 查看

MapReduce 是一个计算模型,也是一个处理和生成超大数据集的算法模型的相关实现。用户首先创建一个Map函数处理一个基于key/value pair的数据集合,输出中间的基于 key/value pair 的数据集合;然后在创建一个Reduce函数用来合并所有的具有相同中间 Key 值得中间Value值。

1. Map 处理过程

1.1 Mapper 概述

Mapper函数最核心的作用就是对输入的key/value进行处理,然后输出一系列的 key/value 集合。处理流程可以表示为:

(key,value)——> Mapper——>list<key,value>

在Mapper阶段处理之前,Hadoop的MapReduce框架会为由作业的InputFormat生成的每一个输入切片InputSplit创建一个对应的Mapper处理函数。在Mapper内部,我们可以通过调用JobContext的getConfiguration方法来获得与该作业相关的所有配置信息。

Hadoop会将Mapper的输出key/value按照key进行分组处理,使具有相同输出key的key/value键值对放在一起,然后将它们分给相同的Reducer来处理。用户可以通过指定特定的RawComparator实现类来控制分组过程的执行。此外用户也可以通过指定Partitioner实现类来控制Mapper的输出被分给哪个具体的Reducer进行处理。由于Reducer和Mapper一般会运行在不同的主机上,所以Reducer必须通过网络来获得Mapper的输出结果来作为输入。

为了减少网络上的数据传输量,我们可以为作业指定相应的Combiner。该类一般会采用已经实现好的Reducer来代替。Combiner会在Mapper端对Mapper的输出结果进行本地的聚集处理,从而减少发送给Reducer的数据量。另一种方法是采用相应的压缩机制对Mapper的输出进行压缩处理。另外并不是所有的Hadoop左右都哟Reducer处理函数,当没有Reducer时,mapper的处理结果会直接通过指定的OutputFormat写入到输出目录中。

1.2 Mapper 源代码分析

Mapper类所在的包为 org.apache.hadoop.mapreduce。

1.2.1 成员方法

在Mapper类中,除了run方法是public的之外,其他的map、cleanup和setup方法都是protected类型的访问权限,保证只有自己的子类以及该类相同报下的类可见。我们一般只会重写map方法来完成对输入key/value的处理。

//只会在mapper任务开始的时候调用一次,完成如MultipleOutputs必须在该方法中进行相应的实例化。
protected void setup(Context context ) throws IOException, InterruptedException {
// NOTHING
}
//只会在任务结束的时候被调用一次,完成一些作业完毕之后的清理工作
protected void cleanup(Context context ) throws IOException, InterruptedException {
// NOTHING
}

//map方法是该类的核心方法,针对每一个输入key/value键值对执行一次。
protected void map(KEYIN key, VALUEIN value,Context context) throws IOException, InterruptedException {
context.write((KEYOUT) key, (VALUEOUT) value);
}

//该方法在hadoop运行mapper任务的时候被调用
public void run(Context context) throws IOException, InterruptedException {
setup(context);
try {
while (context.nextKeyValue()) {
map(context.getCurrentKey(), context.getCurrentValue(), context);
}
} finally {
cleanup(context);
}
}

1.2.2 内部类

在Mapper中包含一个 Context 内部类,该类只实现了MapContext类,并没有引入任何新方法:

public abstract class Context implements MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
}

1.2.3 Mapper 的实现子类

Hadoop 为我们提供了若干个针对不同处理情况的Mapper实现子类。

1.2.3.1 InverseMapper 反转Mapper

该类的实现比较简单,他只是重写了Mapper中的map方法来实现输入的key/value键值对中的key和value进行互换,在进行输出:

@Override
public void map(K key, V value, Context context
) throws IOException, InterruptedException {
context.write(value, key);
}
1.2.3.2 TokenCounterMapper 标记计数 Mapper

该类在map方法中根据默认的标记来对输入的value进行分解,并进行计数处理:

//对字符串分店进行计数
private final static IntWritable one = new IntWritable(1);
//将取到的字符串分段转换为Text类型
private Text word = new Text();
@Override
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
1.2.3.3 multithreadMapper 多线程 Mapper

MultithreadMapper 的主要工作原理就是启动多个线程来执行另一个Mapper中的map方法,这种方式可以有效的提供系统处理作业的能力。其中启动的线程个数由mapred.map.multithreadrunner.threads 配置项决定,默认为10个线程。要执行的Mapper通过mapred.map.multithreadrunner.class配置项决定。MultithreadMapper分别提供了用于设置上面两个配置项的set方法。

1)、成员变量

//MultithreadMapper要执行的Mapper类
private Class<? extends Mapper<K1,V1,K2,V2>> mapClass;
//用于将MultithreadMapper的处理结果进行输出的MapContext
private Context outer;
//运行map方法的MapRunner列表,其中MapRunner为MultithreadMapper中的一个内部类
private List<MapRunner> runners;

2)、成员方法

/**
* 获取MultithreadMapper启动的线程数
*/
public static int getNumberOfThreads(JobContext job) {
return job.getConfiguration().getInt(NUM_THREADS, 10);
}
/**
* 设置MultithreadMapper启动的线程数
*/
public static void setNumberOfThreads(Job job, int threads) {
job.getConfiguration().setInt(NUM_THREADS, threads);
}
/**
* 获取MultithreadMapper所处理的Mapper类
*/
public static <K1,V1,K2,V2>
Class<Mapper<K1,V1,K2,V2>> getMapperClass(JobContext job) {
return (Class<Mapper<K1,V1,K2,V2>>)
job.getConfiguration().getClass(MAP_CLASS, Mapper.class);
}
/**
* 设置MultithreadMapper所处理的Mapper类
*/
public static <K1,V1,K2,V2>
void setMapperClass(Job job,
Class<? extends Mapper<K1,V1,K2,V2>> cls) {
if (MultithreadedMapper.class.isAssignableFrom(cls)) {
throw new IllegalArgumentException("Can't have recursive " +
"MultithreadedMapper instances.");
}
job.getConfiguration().setClass(MAP_CLASS, cls, Mapper.class);
/**
* run方法按指定的线程个数运行指定的Mapper类中的run方法
*/
public void run(Context context) throws IOException, InterruptedException {
outer = context;
//首先取得需要创建的线程数
int numberOfThreads = getNumberOfThreads(context);
//然后取得被执行的Mapper类
mapClass = getMapperClass(context);
if (LOG.isDebugEnabled()) {
LOG.debug("Configuring multithread runner to use " + numberOfThreads +
" threads");
}
//初始化numberofThread是数量的线程,然后放入到线程池中
runners =  new ArrayList<MapRunner>(numberOfThreads);
for(int i=0; i < numberOfThreads; ++i) {
MapRunner thread = new MapRunner(context);
thread.start();
runners.add(i, thread);
}
//对所有的线程执行join操作
for(int i=0; i < numberOfThreads; ++i) {
MapRunner thread = runners.get(i);
thread.join();
......
}
}
}

3)、内部类

MultithreadMapper 中包含有四个内部类,由于被处理的Mapper共享一份InputSplit,所以InputSpilt数据的读取必须是线程安全的,这些子类通过互斥的访问MultithreadMapper中的Context来实现线程安全机制。

/**
*SubMapRecordReader类主要作用就是从指定的Context中复制键值对并保证这个操作是同步互斥进行的
*/
private class SubMapRecordReader extends RecordReader<K1,V1> {
......
}
/**
*SubMapRecordWriter类主要作用就是将Mapper出来的结果写入到指定的输出,这个操作也是同步互斥的
*/
private class SubMapRecordWriter extends RecordWriter<K2,V2> {
......
}
/**
*SubMapStatusReporter类的主要作用就是汇报MultithreadMapper的处理进度信息
*/
private class SubMapStatusReporter extends StatusReporter {
......
}
/**
*MapRunner类是真正的线程执行类,它会调用指定Mapper中的run方法
*/
private class MapRunner extends Thread {
......
//首先通过反射机制创建被处理的Mapper的实例,然后初始化与该Mapper相对应得执行上下文
MapRunner(Context context) throws IOException, InterruptedException {
mapper = ReflectionUtils.newInstance(mapClass,
context.getConfiguration());
MapContext<K1, V1, K2, V2> mapContext =
new MapContextImpl<K1, V1, K2, V2>(outer.getConfiguration(),
outer.getTaskAttemptID(),
reader,
new SubMapRecordWriter(),
context.getOutputCommitter(),
new SubMapStatusReporter(),
outer.getInputSplit());
subcontext = new WrappedMapper<K1, V1, K2, V2>().getMapContext(mapContext);
reader.initialize(context.getInputSplit(), context);
}
@Override
public void run() {
try {
mapper.run(subcontext);
reader.close();
} catch (Throwable ie) {
throwable = ie;
}
}
}
1.2.3.4 FieldSelectionMapper 字段选择Mapper

该类所在的包为org.apache.hadoop.mapreduce.lib.fieldsel,可以用于高效灵活的处理文本数据。该类将输入数据看做由用户指定的分隔符分割不同字段组成,默认的分隔符为Tab。可以选择输入字段列表中的若干个字段作为输出的key和value。

1.2.3.5 DelegatingMapper 授权 Mapper

DelegatingMapper的实现机制和DelegatingRecordReader的实现机制类似,他们都作为包装类来存在,正正功能的实现是靠内部封装的类来完成的。

2. Reducer 处理过程

2.1 Reducer 概述

Reducer的主要作用就是讲Mapper的输出结果中具有相同key的键值对进行进一步的reduce(规约)处理,从而产生更少的输出键值对。与Mapper不同的是,Reducer的数量是可以通过Job的setNumReduceTasks方法进行设置的。Reducer包括如下三个主要阶段:

  • Shuffle阶段:由于Reducer的输入是来自Mappe已经排好序的输出,而Reducer和Mapper一般在不同主机上,所以Reducer所在的第一步操作就是利用HTTP网络协议将所有Mapper的输出中与该Reducer相关的数据复制到Reducer的主机上。
  • Sort阶段:在该阶段,MapReduce框架会将来自不同Mapper的具有相同key的输出key/value键值对按照key进行排序。
  • Reduce阶段:该阶段,MapReduce框架会为已经分好组的每一个<key,(list of value)>调用一次reduce方法。Reducer的输出键值对通过RecordWriter写入到真正的文件系统。上面的Shuffle和sort两个阶段是同时进行的,Mapper的输出也是一边被取回一边被合并的。MapReduce还为我们提供了一个SecondarySort(二次排序)阶段。为了实现SecondarySort,我们需要定义一个group comparator(分组比较器)。sort阶段使用sort comparator针对key来进行操作,该阶段会将具有相同key的value放置在一个列表里:而group阶段是依赖group comparator进行的,该阶段会针对某个key所对应的value列表进行分组操作,使整个value列表分割成不同的组。

2.2 Reducer 源代码

与Mapper类中的一样,Reducer类中除了run方法是public,其它方法都是protected修饰的。

2.2.1 成员方法

//只会在Reducer任务开始的时候调用一次
protected void setup(Context context
) throws IOException, InterruptedException {
// NOTHING
}
//只会在任务结束的时候被调用一次
protected void cleanup(Context context
) throws IOException, InterruptedException {
// NOTHING
}
//reduce方法是该类的核心方法,在map方法中输入的是<key,alue>,而reduce输入的是<key,list(value)>
protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context
) throws IOException, InterruptedException {
for(VALUEIN value: values) {
context.write((KEYOUT) key, (VALUEOUT) value);
}
}

2.2.2 内部类

在Reducer中也包含一个Context内部类,该类只是继承了ReduceContex类,并没有引入任何新方法。

2.2.3 IntSunReducer 和 LongSumReducer

这两个类都在lib.reduce包中,它们实现的目的都是一样的,即计算相同key所对应的所有value之和,如下列出IntSumReducer的源代码如下:

//定义用于保存计算结果之和IntWritable变量
private IntWritable result = new IntWritable();
public void reduce(Key key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
int sum = 0;
//计算所有的value之和
for (IntWritable val : values) {
sum += val.get();
}
//将int类型的结果化为IntWritable类型
result.set(sum);
context.write(key, result);
}

2.2.4 FieldSelectionReducer 字段选择 Reducer

该类将输入数据分割成若干个字段,然后针对这些字段进行相应的处理。

至此,Mapper和Reducer就都分析完毕了。在Mapper和Reducer之间还有一些特殊的处理步骤,比如Combiner以及Partitioner。

3. Partitioner 分区处理过程

3.1 Partitioner 概述

Partitioner 分区处理过程在Mapper之后,Reducer之前进行执行。它的主要作用就是把Mapper输出的中间结果按照key分给不同的Reducer任务进行处理。要保证Hadoop的负载均衡,Partitioner需要满足以下的两个条件:

  • 平均分布。即每个Reducer处理的reduce数量应该尽可能相等。
  • 高效。由于Mapper输出的每个key/value键值对在分发给Reduce处理之前都需要Partitioner的相应处理,所以它的效率至关重要,需要使用高效的算法实现。

3.1.2 Partitioner 源代码

//根据给的的键值对和分区的总数量(一般为Reducer任务的数量),返回该键值对所对应的分区号
public abstract class Partitioner<KEY, VALUE> {
public abstract int getPartition(KEY key, VALUE value, int numPartitions);
}

3.1.3 HashPartitioner hash 分区

Had和Partitioner是Partitioner的默认实现类,该类会使用hash函数来对Mapper的输出进行分区处理。

//根据key的hash码和Reducer任务的数量来产生对应得分区号。
public int getPartition(K key, V value,
int numReduceTasks) {
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}

该方法会将Mapper输出的键值对均匀的分发给不同的Reducer。比如Key为Text的话,Text的hashcode方法跟String的基本机制,都采用Horner公式计算,得到一个int。string太大的话,这个int值可能会溢出成负数,所以与上Integer.MAX_VALUE(即0111111111111111),然后再对reduce个数取余,这样就可以让key均匀分布子啊reduce上。

3.1.4 BinaryPartitioner 二进制分区

BinaryPartitioner 处理的键值对值中的key必须是BinaryComparable字节可以比较类型的,例如Text就是一个BinaryComparable的实例。因为Text可以按字节进行比较,所以当两个Text的实例对于的字节不相等时就可以立即判断出他们的大小关系。BinaryPartitioner会利用BinaryComparable类型的key的getBytes()方法返回对应得字节数组中的一部分求出该键值对所对应得分区号。

3.1.5 KeyFieldBasedPartitioner 基于键字段的分区

KeyFieldBasedPartitioner的处理逻辑就是首先将key分割成由不同的字段的字段列表,然后取得列表中的若干个字段进行分区处理。其中key中不同字段之间的分隔符通过mapreduce.fields.for.partition配置项来确定取出key的前几个字段用作分区处理。

3.1.6 TotalOrderPartitioner 全排序分区

虽然每个Mapper的输出是排好序的,但是不同的Mapper的输出之间是没有顺序的。为了实现最终的Reducer的输出是排好序的,此时可以使用TotalOrderPartitioner。由于TotalOrderPartitioner本身无法确定数据的分布情况,所以它所作的第一件事情就是利用InputSampler数据采样器来确定数据的分布情况。获得数据的分布情况之后,接下来就是根据数据的分布情况来采取不同的分区策略。

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: