从自定义排序深入理解单机hadoop执行mapreduce过程
2016-06-02 21:03
387 查看
我们对数据进行处理的过程中,最常见的一种操作是排序和统计,特别是在数据量大的场景,实现高效的排序是业务系统开发过程中非常重要的一块。如何从hadoop中高效地提取有用的数据是工作中重要的一环。在自定义排序类的过程中,就遇到了一个小问题,而hadoop执行过程中对异常的处理往往是打印一个log,然后抛出封装过的异常,而且异常的信息非常通用而不具体,所以如果不看日志,往往比较难定位问题。面对这种情况,深入到源码,也正好能熟悉一下整个mapReduce的执行过程。下面就以一个简单的例子讲讲自定义排序要注意的一个小细节。
以hadoop权威指南中的辅助排序讲讲如何自定义排序,hadoop权威指南里面有个例子是这样的,从气象站数据中,找出每年的最高气温。如果仅仅根据年份排序,mapper输出后,在reducer遍历每年所有气温能在O(n)复杂度得出结果,但是有更好的办法(当然mahout早有功能更强大的实现类),如果根据年份分区,并且气温降序排序,那么在reducer中,就只要取第一条数据,就是最高气温了,也就是说,在reducer中可以以O(1)的时间复杂度得出结果。
假设有如下气温数据:
文件a:
1990 22
1990 33
1991 24
文件b:
1992 23
1992 26
1991 27
这里每一行的第一个字段是 年份,后面数字代表一年十二个月中,某个月的最高气温,月份我们不关心,就不写出来了。根据这样的情况,我们很容易写出如下初始化job的代码(不懂的请先补习一下MapReduce,另外只讲解重要部分代码,其它代码请见附件):
以上代码的输入输出很简单,就不细说了,这里说说自定义的排序类:MyPairComparable,官方文档的api有一段如下实现例子:
先看启动,job调用waitForCompletion,代码如下:
里面主要做了两件事情,一件是监听作业情况,一件是提交作业,作业的提交是通过上面的submit函数实现的,逻辑就是产生一个作业,也就是LocalJobRunner.job,这个job会构建job的各种信息,包括读取job的配置,得到本地job的工作地址,初始化分布式缓存等等。然后就是读取分片信息,创建MapTaskRunnable执行mapper任务,这个MapTaskRunnable是需要关心的,所有的mapper程序,都是从这个taskRunnable开始的。这个taskRunable的逻辑其实也很简单,里面最主要的方法是runNewMapper,也就是这个方法会真正跑我们重写的Mapper方法,下面的类的名称就更加常见和熟悉了,其代码如下:
在上面的代码中,可能会调用我们排序对象的是new NewOutputCollector这个构造器,进去里面继续看,第一句代码如下:
顾名思义,创建排序的收集器,里面初始化有一句如下:
进入在collector.init里面又看到:
进入里面能看到:
继续进入:
获取比较器的构造器,并创建实例,并且一定是个默认构造函数创建的实例,这里其实就隐含了一个官方文档例子没有特意提出来的问题,比较器SecondaryComparator必须有一个无参的构造函数。所以来到这里发现了上面自定义的类SecondaryComparator其实少了无参构造函数,这样程序就无法正常执行了,必须加上去。接着看代码,发现排序比较的时候获取key也是通过上面这段反射代码获取compareable的,所以MyPairComparable也必须加上无参默认构造函数。加上后程序能正常运行了。下面接着看代码。把执行环境和后续需要用到的对象都创建好后,mapper会执行run方法,mapper的run方法如下:
这里context.getCurrentKey和context.getCurrentValue就是通过recorder读取输入分片的数据的,然后调用我们重写的map方法。接着把map输出的数据放到一个环形buffer里面,当达到了buffer的阈值,就会把数据根据partition分片输出,接着mapper任务执行完,就会执行下一阶段的任务,也就是上面提到的在分区中排序。等所有mapper任务执行完后,会进入下一parse,排序并输出到分区,会调用上面构造好的比较器来完成这些操作,等这些操作完成好了。进入下一阶段,reduce,根据reducer设置的数量,产生reducer个数的runableTask,并加入到线程池中执行reducer任务:
到这里reducer的reduce方法结束,后面的就是一些输出文件,关闭流,更新job的状态等等一些工作了,自此虽然没有太深入,也大概浏览了一遍mapreduce的过程了。
以hadoop权威指南中的辅助排序讲讲如何自定义排序,hadoop权威指南里面有个例子是这样的,从气象站数据中,找出每年的最高气温。如果仅仅根据年份排序,mapper输出后,在reducer遍历每年所有气温能在O(n)复杂度得出结果,但是有更好的办法(当然mahout早有功能更强大的实现类),如果根据年份分区,并且气温降序排序,那么在reducer中,就只要取第一条数据,就是最高气温了,也就是说,在reducer中可以以O(1)的时间复杂度得出结果。
假设有如下气温数据:
文件a:
1990 22
1990 33
1991 24
文件b:
1992 23
1992 26
1991 27
这里每一行的第一个字段是 年份,后面数字代表一年十二个月中,某个月的最高气温,月份我们不关心,就不写出来了。根据这样的情况,我们很容易写出如下初始化job的代码(不懂的请先补习一下MapReduce,另外只讲解重要部分代码,其它代码请见附件):
Configuration conf = getConf(); Job job = new Job(conf); job.setJobName("SecondarySort"); job.setJarByClass(SecondarySort.class); job.setMapperClass(SecondaryMapper.class); job.setReducerClass(SecodaryRecuder.class); job.setOutputKeyClass(MyPairComparable.class); job.setOutputValueClass(NullWritable.class); job.setPartitionerClass(SecondaryPartitioner.class); job.setSortComparatorClass(SecondaryComparator.class); job.setGroupingComparatorClass(SecondaryGroupCompator.class); String input = args[0]; String output = args[1]; FileInputFormat.addInputPath(job, new Path(input)); FileOutputFormat.setOutputPath(job, new Path(output)); return job.waitForCompletion(true) ? 0 : 1;
以上代码的输入输出很简单,就不细说了,这里说说自定义的排序类:MyPairComparable,官方文档的api有一段如下实现例子:
public class MyWritable implements Writable { // Some data private int counter; private long timestamp; public void write(DataOutput out) throws IOException { out.writeInt(counter); out.writeLong(timestamp); } public void readFields(DataInput in) throws IOException { counter = in.readInt(); timestamp = in.readLong(); } public static MyWritable read(DataInput in) throws IOException { MyWritable w = new MyWritable(); w.readFields(in); return w; } }根据上面这个官方文档,遇到第一个坑,很容易写成这样的:
import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; import org.apache.mahout.common.IntPairWritable; public class MyPairComparable implements WritableComparable<MyPairComparable>, Cloneable { private int first; private int second; @Override public boolean equals(Object arg0) { // TODO Auto-generated method stub return super.equals(arg0); } @Override public int hashCode() { // TODO Auto-generated method stub return super.hashCode(); } @Override public String toString() { return first + "\t" + second; } public MyPairComparable(int first, int second) { super(); this.first = first; this.second = second; } public int getFirst() { return first; } public void setFirst(int first) { this.first = first; } public int getSecond() { return second; } public void setSecond(int second) { this.second = second; } @Override public void write(DataOutput out) throws IOException { out.writeInt(first); out.writeInt(second); } @Override public void readFields(DataInput in) throws IOException { first = in.readInt(); second = in.readInt(); } @Override public int compareTo(MyPairComparable arg0) { return this.compareTo(arg0); } static { WritableComparator.define(IntPairWritable.class, new SecondaryComparator()); } }同样,根据官方文档,我们很容易把comparator写成这样:
public class SecondaryComparator extends WritableComparator { @Override public int compare(WritableComparable a, WritableComparable b) { MyPairComparable a1 = (MyPairComparable) a; MyPairComparable a2 = (MyPairComparable) b; if (a1.getFirst() != a2.getFirst()) { return a1.getFirst() - a2.getFirst(); } else { return -(a1.getSecond() - a2.getSecond()); } } }然后执行代码,报错了,空指针,遇到这样的问题,正好阅读下源码,了解执行过程,下面从MapReduce执行的过程来看看上面的代码出了什么问题。MapReduce的整个流程大致如下: 通过FillenputFormt调用recorder读取数据——》mapper处理——》在分区中排序(shuffle)——》reducer处理——》输出。
先看启动,job调用waitForCompletion,代码如下:
public boolean waitForCompletion(boolean verbose ) throws IOException, InterruptedException, ClassNotFoundException { if (state == JobState.DEFINE) { //提交作业到对列,生成jobId,校验文件路径,拷贝文件到文件系统,产生分片信息等等 submit();} if (verbose) { monitorAndPrintJob();//监听作业情况,包括整个job运行的模式,执行进度,task执行情况,成功,失败还是被kill等等一些相关信息,这里我们不关心这个 } else { // get the completion poll interval from the client. int completionPollIntervalMillis = Job.getCompletionPollInterval(cluster.getConf()); while (!isComplete()) { try { Thread.sleep(completionPollIntervalMillis); } catch (InterruptedException ie) { } } } return isSuccessful(); } public void submit() throws IOException, InterruptedException, ClassNotFoundException { ensureState(JobState.DEFINE); setUseNewAPI(); connect(); final JobSubmitter submitter = //这里我们使用了本地文件系统 getJobSubmitter(cluster.getFileSystem 4000 (), cluster.getClient()); status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() { public JobStatus run() throws IOException, InterruptedException, ClassNotFoundException { return submitter.submitJobInternal(Job.this, cluster); } }); state = JobState.RUNNING; LOG.info("The url to track the job: " + getTrackingURL()); }
里面主要做了两件事情,一件是监听作业情况,一件是提交作业,作业的提交是通过上面的submit函数实现的,逻辑就是产生一个作业,也就是LocalJobRunner.job,这个job会构建job的各种信息,包括读取job的配置,得到本地job的工作地址,初始化分布式缓存等等。然后就是读取分片信息,创建MapTaskRunnable执行mapper任务,这个MapTaskRunnable是需要关心的,所有的mapper程序,都是从这个taskRunnable开始的。这个taskRunable的逻辑其实也很简单,里面最主要的方法是runNewMapper,也就是这个方法会真正跑我们重写的Mapper方法,下面的类的名称就更加常见和熟悉了,其代码如下:
// 根据job和taskid获取任务上下文 org.apache.hadoop.mapreduce.TaskAttemptContext taskContext = new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, getTaskID(), reporter); // 创建一个mapper实例。taskContext获取到的mapper的类其实就是在配置job的时候配置进去的。 org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper = (org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>) ReflectionUtils.newInstance(taskContext.getMapperClass(), job); // 创建一个inputFormat,按照格式读入数据,这里我们没有设置,会创建默认的TextInputFormat org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat = (org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>) ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job); // inputFormat用来读取数据的recorder也是这里创建 org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input = new NewTrackingRecordReader<INKEY,INVALUE> (split, inputFormat, reporter, taskContext); // 无论设置的reducer数量是多少,后面都会去创建分区partition实例和排序用的comparator,这里就是发现问题最重要的地方 if (job.getNumReduceTasks() == 0) { output = new NewDirectOutputCollector(taskContext, job, umbilical, reporter); } else { output = new NewOutputCollector(taskContext, job, umbilical, reporter); } ....... // 这个run方法就最接近我们的代码了,里面执行的就是map方法 mapper.run(mapperContext);
在上面的代码中,可能会调用我们排序对象的是new NewOutputCollector这个构造器,进去里面继续看,第一句代码如下:
collector = createSortingCollector(job, reporter);
顾名思义,创建排序的收集器,里面初始化有一句如下:
collector.init(context);
进入在collector.init里面又看到:
comparator = job.getOutputKeyComparator();
进入里面能看到:
ReflectionUtils.newInstance(theClass, this);
继续进入:
try { Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass); if (meth == null) { meth = theClass.getDeclaredConstructor(EMPTY_ARRAY); meth.setAccessible(true); CONSTRUCTOR_CACHE.put(theClass, meth); } result = meth.newInstance(); } catch (Exception e) { throw new RuntimeException(e); }
获取比较器的构造器,并创建实例,并且一定是个默认构造函数创建的实例,这里其实就隐含了一个官方文档例子没有特意提出来的问题,比较器SecondaryComparator必须有一个无参的构造函数。所以来到这里发现了上面自定义的类SecondaryComparator其实少了无参构造函数,这样程序就无法正常执行了,必须加上去。接着看代码,发现排序比较的时候获取key也是通过上面这段反射代码获取compareable的,所以MyPairComparable也必须加上无参默认构造函数。加上后程序能正常运行了。下面接着看代码。把执行环境和后续需要用到的对象都创建好后,mapper会执行run方法,mapper的run方法如下:
public void run(Context context) throws IOException, InterruptedException { setup(context); try { while (context.nextKeyValue()) { map(context.getCurrentKey(), context.getCurrentValue(), context); } } finally { cleanup(context); } }
这里context.getCurrentKey和context.getCurrentValue就是通过recorder读取输入分片的数据的,然后调用我们重写的map方法。接着把map输出的数据放到一个环形buffer里面,当达到了buffer的阈值,就会把数据根据partition分片输出,接着mapper任务执行完,就会执行下一阶段的任务,也就是上面提到的在分区中排序。等所有mapper任务执行完后,会进入下一parse,排序并输出到分区,会调用上面构造好的比较器来完成这些操作,等这些操作完成好了。进入下一阶段,reduce,根据reducer设置的数量,产生reducer个数的runableTask,并加入到线程池中执行reducer任务:
public void run(Context context) throws IOException, InterruptedException { setup(context); try { while (context.nextKey()) { reduce(context.getCurrentKey(), context.getValues(), context); // If a back up store is used, reset it Iterator<VALUEIN> iter = context.getValues().iterator(); if(iter instanceof ReduceContext.ValueIterator) { ((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore(); } } } finally { cleanup(context); } }这里的nextKey就会调用groupingComparator比较获取到的map输出的值,通过我们自定义的groupingComparator,就很容易获取到第一条记录,也就是某年的最高温度的记录了。
到这里reducer的reduce方法结束,后面的就是一些输出文件,关闭流,更新job的状态等等一些工作了,自此虽然没有太深入,也大概浏览了一遍mapreduce的过程了。
相关文章推荐
- 详解HDFS Short Circuit Local Reads
- Hadoop_2.1.0 MapReduce序列图
- 使用Hadoop搭建现代电信企业架构
- 单机版搭建Hadoop环境图文教程详解
- hadoop常见错误以及处理方法详解
- hadoop 单机安装配置教程
- hadoop的hdfs文件操作实现上传文件到hdfs
- hadoop实现grep示例分享
- MongoDB中的MapReduce简介
- Mongodb中MapReduce实现数据聚合方法详解
- MongoDB学习笔记之MapReduce使用示例
- MongoDB中MapReduce编程模型使用实例
- Apache Hadoop版本详解
- MapReduce中ArrayWritable 使用指南
- Java函数式编程(七):MapReduce
- linux下搭建hadoop环境步骤分享
- java连接hdfs ha和调用mapreduce jar示例
- hadoop client与datanode的通信协议分析
- hadoop中一些常用的命令介绍