您的位置:首页 > 运维架构

从自定义排序深入理解单机hadoop执行mapreduce过程

2016-06-02 21:03 387 查看
我们对数据进行处理的过程中,最常见的一种操作是排序和统计,特别是在数据量大的场景,实现高效的排序是业务系统开发过程中非常重要的一块。如何从hadoop中高效地提取有用的数据是工作中重要的一环。在自定义排序类的过程中,就遇到了一个小问题,而hadoop执行过程中对异常的处理往往是打印一个log,然后抛出封装过的异常,而且异常的信息非常通用而不具体,所以如果不看日志,往往比较难定位问题。面对这种情况,深入到源码,也正好能熟悉一下整个mapReduce的执行过程。下面就以一个简单的例子讲讲自定义排序要注意的一个小细节。

以hadoop权威指南中的辅助排序讲讲如何自定义排序,hadoop权威指南里面有个例子是这样的,从气象站数据中,找出每年的最高气温。如果仅仅根据年份排序,mapper输出后,在reducer遍历每年所有气温能在O(n)复杂度得出结果,但是有更好的办法(当然mahout早有功能更强大的实现类),如果根据年份分区,并且气温降序排序,那么在reducer中,就只要取第一条数据,就是最高气温了,也就是说,在reducer中可以以O(1)的时间复杂度得出结果。

假设有如下气温数据:

文件a:

1990 22

1990 33

1991 24

文件b:

1992 23

1992 26

1991 27

这里每一行的第一个字段是 年份,后面数字代表一年十二个月中,某个月的最高气温,月份我们不关心,就不写出来了。根据这样的情况,我们很容易写出如下初始化job的代码(不懂的请先补习一下MapReduce,另外只讲解重要部分代码,其它代码请见附件):

Configuration conf = getConf();
Job job = new Job(conf);
job.setJobName("SecondarySort");
job.setJarByClass(SecondarySort.class);
job.setMapperClass(SecondaryMapper.class);
job.setReducerClass(SecodaryRecuder.class);
job.setOutputKeyClass(MyPairComparable.class);
job.setOutputValueClass(NullWritable.class);
job.setPartitionerClass(SecondaryPartitioner.class);
job.setSortComparatorClass(SecondaryComparator.class);
job.setGroupingComparatorClass(SecondaryGroupCompator.class);
String input = args[0];
String output = args[1];
FileInputFormat.addInputPath(job, new Path(input));
FileOutputFormat.setOutputPath(job, new Path(output));
return job.waitForCompletion(true) ? 0 : 1;


以上代码的输入输出很简单,就不细说了,这里说说自定义的排序类:MyPairComparable,官方文档的api有一段如下实现例子:
public class MyWritable implements Writable {
// Some data
private int counter;
private long timestamp;

public void write(DataOutput out) throws IOException {
out.writeInt(counter);
out.writeLong(timestamp);
}

public void readFields(DataInput in) throws IOException {
counter = in.readInt();
timestamp = in.readLong();
}

public static MyWritable read(DataInput in) throws IOException {
MyWritable w = new MyWritable();
w.readFields(in);
return w;
}
}
根据上面这个官方文档,遇到第一个坑,很容易写成这样的:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.mahout.common.IntPairWritable;

public class MyPairComparable implements WritableComparable<MyPairComparable>,
Cloneable {

private int first;

private int second;

@Override
public boolean equals(Object arg0) {
// TODO Auto-generated method stub
return super.equals(arg0);
}

@Override
public int hashCode() {
// TODO Auto-generated method stub
return super.hashCode();
}

@Override
public String toString() {
return first + "\t" + second;
}

public MyPairComparable(int first, int second) {
super();
this.first = first;
this.second = second;
}

public int getFirst() {
return first;
}

public void setFirst(int first) {
this.first = first;
}

public int getSecond() {
return second;
}

public void setSecond(int second) {
this.second = second;
}

@Override
public void write(DataOutput out) throws IOException {
out.writeInt(first);
out.writeInt(second);
}

@Override
public void readFields(DataInput in) throws IOException {
first = in.readInt();
second = in.readInt();
}

@Override
public int compareTo(MyPairComparable arg0) {
return this.compareTo(arg0);
}

static {
WritableComparator.define(IntPairWritable.class,
new SecondaryComparator());
}

}
同样,根据官方文档,我们很容易把comparator写成这样:
public class SecondaryComparator extends WritableComparator {

@Override
public int compare(WritableComparable a, WritableComparable b) {
MyPairComparable a1 = (MyPairComparable) a;
MyPairComparable a2 = (MyPairComparable) b;
if (a1.getFirst() != a2.getFirst()) {
return a1.getFirst() - a2.getFirst();
} else {
return -(a1.getSecond() - a2.getSecond());
}
}
}
然后执行代码,报错了,空指针,遇到这样的问题,正好阅读下源码,了解执行过程,下面从MapReduce执行的过程来看看上面的代码出了什么问题。MapReduce的整个流程大致如下: 通过FillenputFormt调用recorder读取数据——》mapper处理——》在分区中排序(shuffle)——》reducer处理——》输出。

先看启动,job调用waitForCompletion,代码如下:

public boolean waitForCompletion(boolean verbose
) throws IOException, InterruptedException,
ClassNotFoundException {
if (state == JobState.DEFINE) {
//提交作业到对列,生成jobId,校验文件路径,拷贝文件到文件系统,产生分片信息等等
submit();}
if (verbose) {
monitorAndPrintJob();//监听作业情况,包括整个job运行的模式,执行进度,task执行情况,成功,失败还是被kill等等一些相关信息,这里我们不关心这个
} else {
// get the completion poll interval from the client.
int completionPollIntervalMillis =
Job.getCompletionPollInterval(cluster.getConf());
while (!isComplete()) {
try {
Thread.sleep(completionPollIntervalMillis);
} catch (InterruptedException ie) {
}
}
}
return isSuccessful();
}
public void submit()
throws IOException, InterruptedException, ClassNotFoundException {
ensureState(JobState.DEFINE);
setUseNewAPI();
connect();
final JobSubmitter submitter = //这里我们使用了本地文件系统
getJobSubmitter(cluster.getFileSystem
4000
(), cluster.getClient());
status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
public JobStatus run() throws IOException, InterruptedException,
ClassNotFoundException {
return submitter.submitJobInternal(Job.this, cluster);
}
});
state = JobState.RUNNING;
LOG.info("The url to track the job: " + getTrackingURL());
}


里面主要做了两件事情,一件是监听作业情况,一件是提交作业,作业的提交是通过上面的submit函数实现的,逻辑就是产生一个作业,也就是LocalJobRunner.job,这个job会构建job的各种信息,包括读取job的配置,得到本地job的工作地址,初始化分布式缓存等等。然后就是读取分片信息,创建MapTaskRunnable执行mapper任务,这个MapTaskRunnable是需要关心的,所有的mapper程序,都是从这个taskRunnable开始的。这个taskRunable的逻辑其实也很简单,里面最主要的方法是runNewMapper,也就是这个方法会真正跑我们重写的Mapper方法,下面的类的名称就更加常见和熟悉了,其代码如下:

// 根据job和taskid获取任务上下文
org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job,
getTaskID(),
reporter);
// 创建一个mapper实例。taskContext获取到的mapper的类其实就是在配置job的时候配置进去的。
org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
(org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
// 创建一个inputFormat,按照格式读入数据,这里我们没有设置,会创建默认的TextInputFormat
org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =
(org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)
ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
// inputFormat用来读取数据的recorder也是这里创建
org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
new NewTrackingRecordReader<INKEY,INVALUE>
(split, inputFormat, reporter, taskContext);
// 无论设置的reducer数量是多少,后面都会去创建分区partition实例和排序用的comparator,这里就是发现问题最重要的地方
if (job.getNumReduceTasks() == 0) {
output =
new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
} else {
output = new NewOutputCollector(taskContext, job, umbilical, reporter);
}
.......
// 这个run方法就最接近我们的代码了,里面执行的就是map方法
mapper.run(mapperContext);


在上面的代码中,可能会调用我们排序对象的是new NewOutputCollector这个构造器,进去里面继续看,第一句代码如下:

collector = createSortingCollector(job, reporter);


顾名思义,创建排序的收集器,里面初始化有一句如下:

collector.init(context);


进入在collector.init里面又看到:

comparator = job.getOutputKeyComparator();


进入里面能看到:

ReflectionUtils.newInstance(theClass, this);


继续进入:

try {
Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass);
if (meth == null) {
meth = theClass.getDeclaredConstructor(EMPTY_ARRAY);
meth.setAccessible(true);
CONSTRUCTOR_CACHE.put(theClass, meth);
}
result = meth.newInstance();
} catch (Exception e) {
throw new RuntimeException(e);
}


获取比较器的构造器,并创建实例,并且一定是个默认构造函数创建的实例,这里其实就隐含了一个官方文档例子没有特意提出来的问题,比较器SecondaryComparator必须有一个无参的构造函数。所以来到这里发现了上面自定义的类SecondaryComparator其实少了无参构造函数,这样程序就无法正常执行了,必须加上去。接着看代码,发现排序比较的时候获取key也是通过上面这段反射代码获取compareable的,所以MyPairComparable也必须加上无参默认构造函数。加上后程序能正常运行了。下面接着看代码。把执行环境和后续需要用到的对象都创建好后,mapper会执行run方法,mapper的run方法如下:

public void run(Context context) throws IOException, InterruptedException {
setup(context);
try {
while (context.nextKeyValue()) {
map(context.getCurrentKey(), context.getCurrentValue(), context);
}
} finally {
cleanup(context);
}
}


这里context.getCurrentKey和context.getCurrentValue就是通过recorder读取输入分片的数据的,然后调用我们重写的map方法。接着把map输出的数据放到一个环形buffer里面,当达到了buffer的阈值,就会把数据根据partition分片输出,接着mapper任务执行完,就会执行下一阶段的任务,也就是上面提到的在分区中排序。等所有mapper任务执行完后,会进入下一parse,排序并输出到分区,会调用上面构造好的比较器来完成这些操作,等这些操作完成好了。进入下一阶段,reduce,根据reducer设置的数量,产生reducer个数的runableTask,并加入到线程池中执行reducer任务:
public void run(Context context) throws IOException, InterruptedException {
setup(context);
try {
while (context.nextKey()) {
reduce(context.getCurrentKey(), context.getValues(), context);
// If a back up store is used, reset it
Iterator<VALUEIN> iter = context.getValues().iterator();
if(iter instanceof ReduceContext.ValueIterator) {
((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();
}
}
} finally {
cleanup(context);
}
}
这里的nextKey就会调用groupingComparator比较获取到的map输出的值,通过我们自定义的groupingComparator,就很容易获取到第一条记录,也就是某年的最高温度的记录了。
到这里reducer的reduce方法结束,后面的就是一些输出文件,关闭流,更新job的状态等等一些工作了,自此虽然没有太深入,也大概浏览了一遍mapreduce的过程了。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  mapreduce hadoop