
[Hadoop Source Code Analysis] (6) MapReduce: the MapTask class

2014-04-26 12:42
MapTask extends Task, and its most important method is run(), which drives the execution of a map task.

run() first creates and starts a TaskReporter, then calls JobConf.getUseNewMapper() to check whether the new API is in use (how that flag gets set was covered in [Hadoop Source Code Analysis] (3) MapReduce: the Job class). It then calls the initialize() method inherited from Task to initialize the task, and finally, depending on the task type, executes runJobCleanupTask(), runJobSetupTask(), runTaskCleanupTask(), or the appropriate mapper. When running the mapper it picks the new or the old MapReduce API, as determined by that configuration flag.


@Override
public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
    throws IOException, ClassNotFoundException, InterruptedException {
  this.umbilical = umbilical;

  // start thread that will handle communication with parent
  TaskReporter reporter = new TaskReporter(getProgress(), umbilical, jvmContext);
  reporter.startCommunicationThread();

  // comes from JobConf; the new-API JobContext wraps a JobConf. Job has a setUseNewAPI()
  // method that Job.submit() invokes, so waitForCompletion() -> submit() is what switches
  // on the new API that is checked here.
  boolean useNewApi = job.getUseNewMapper();
  // initialize the task (jobContext, taskContext, output path, ...) via the inherited Task.initialize()
  initialize(job, getJobID(), reporter, useNewApi);

  // check if it is a cleanupJobTask
  if (jobCleanup) {
    runJobCleanupTask(umbilical, reporter);
    return;
  }
  if (jobSetup) {
    runJobSetupTask(umbilical, reporter);
    return;
  }
  if (taskCleanup) {
    runTaskCleanupTask(umbilical, reporter);
    return;
  }

  if (useNewApi) { // run the mapper with whichever MapReduce API version applies
    runNewMapper(job, splitMetaInfo, umbilical, reporter);
  } else {
    runOldMapper(job, splitMetaInfo, umbilical, reporter);
  }
  done(umbilical, reporter);
}

runNewMapper handles the new-API MapReduce path, while runOldMapper handles the old API.

runNewMapper first creates the TaskAttemptContext, the Mapper, the InputFormat, the InputSplit, and the RecordReader. It then creates an output collector depending on whether the job has reduce tasks: NewDirectOutputCollector (no reducers) or NewOutputCollector (with reducers). Next it calls input.initialize() to initialize the RecordReader, which mainly prepares the input: the record reader, the input path, and so on. Then comes the heart of it: mapper.run(). That is the Mapper.run() method discussed in [Hadoop Source Code Analysis] (2) MapReduce: the Mapper class, which reads the K/V pairs one by one, and that is where the two sides connect.


@SuppressWarnings("unchecked")
private <INKEY,INVALUE,OUTKEY,OUTVALUE>
void runNewMapper(final JobConf job,
                  final TaskSplitIndex splitIndex,
                  final TaskUmbilicalProtocol umbilical,
                  TaskReporter reporter
                  ) throws IOException, ClassNotFoundException,
                           InterruptedException {
  // make a task context so we can get the classes
  org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
    new org.apache.hadoop.mapreduce.TaskAttemptContext(job, getTaskID());
  // make a mapper
  org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
    (org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
      ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
  // make the input format
  org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =
    (org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)
      ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
  // rebuild the input split
  org.apache.hadoop.mapreduce.InputSplit split = null;
  split = getSplitDetails(new Path(splitIndex.getSplitLocation()),
                          splitIndex.getStartOffset());

  org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
    new NewTrackingRecordReader<INKEY,INVALUE>
      (split, inputFormat, reporter, job, taskContext);

  job.setBoolean("mapred.skip.on", isSkipping());
  org.apache.hadoop.mapreduce.RecordWriter output = null;
  org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context
       mapperContext = null;
  try {
    Constructor<org.apache.hadoop.mapreduce.Mapper.Context> contextConstructor =
      org.apache.hadoop.mapreduce.Mapper.Context.class.getConstructor
      (new Class[]{org.apache.hadoop.mapreduce.Mapper.class,
                   Configuration.class,
                   org.apache.hadoop.mapreduce.TaskAttemptID.class,
                   org.apache.hadoop.mapreduce.RecordReader.class,
                   org.apache.hadoop.mapreduce.RecordWriter.class,
                   org.apache.hadoop.mapreduce.OutputCommitter.class,
                   org.apache.hadoop.mapreduce.StatusReporter.class,
                   org.apache.hadoop.mapreduce.InputSplit.class});

    // get an output object
    if (job.getNumReduceTasks() == 0) {
      output =
        new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
    } else {
      output = new NewOutputCollector(taskContext, job, umbilical, reporter);
    }

    mapperContext = contextConstructor.newInstance(mapper, job, getTaskID(),
                                                   input, output, committer,
                                                   reporter, split);

    input.initialize(split, mapperContext);
    mapper.run(mapperContext);
    input.close();
    output.close(mapperContext);
  } catch (NoSuchMethodException e) {
    throw new IOException("Can't find Context constructor", e);
  } catch (InstantiationException e) {
    throw new IOException("Can't create Context", e);
  } catch (InvocationTargetException e) {
    throw new IOException("Can't invoke Context constructor", e);
  } catch (IllegalAccessException e) {
    throw new IOException("Can't invoke Context constructor", e);
  }
}

As for which Mapper class gets run: it is normally the one we set with job.setMapperClass(SelectGradeMapper.class). How is that setting retrieved later, and what is the default? The trace below answers both.

MapTask.runNewMapper()
  => (TaskAttemptContext) taskContext.getMapperClass();   // used by runNewMapper to instantiate the mapper
  => JobContext.getMapperClass()
  => JobConf.getClass(MAP_CLASS_ATTR, Mapper.class)
  => Configuration.getClass(name, defaultValue)

From this call chain the default value, Mapper.class, falls out immediately, and the retrieval path is clear at a glance.
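For reference, JobContext.getMapperClass() in the 1.x code base read here is roughly the following (a reconstruction based on the call chain above, not a verbatim quote; verify against your own version):

@SuppressWarnings("unchecked")
public Class<? extends Mapper<?,?,?,?>> getMapperClass()
    throws ClassNotFoundException {
  // MAP_CLASS_ATTR is the property written by Job.setMapperClass(); Mapper.class is the fallback
  return (Class<? extends Mapper<?,?,?,?>>)
    conf.getClass(MAP_CLASS_ATTR, Mapper.class);
}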

Now take a closer look at Configuration.getClass():


public Class<?> getClass(String name, Class<?> defaultValue) {
  String valueString = get(name);
  if (valueString == null)
    return defaultValue;
  try {
    return getClassByName(valueString);
  } catch (ClassNotFoundException e) {
    throw new RuntimeException(e);
  }
}

It first checks whether the property has been set. If so, it calls getClassByName() to load and return the corresponding class; otherwise it returns the default value.
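To make the lookup concrete, here is a small stand-alone sketch with a few assumptions: the key "mapreduce.map.class" matches MAP_CLASS_ATTR in the 1.x source read here (other versions use a different key), and SelectGradeMapper is only a stand-in for the article's example mapper:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Mapper;

public class MapperClassLookupDemo {
  // Trivial stand-in for the article's SelectGradeMapper
  public static class SelectGradeMapper extends Mapper<Object, Object, Object, Object> {}

  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Property not set yet: getClass() falls back to the default, Mapper.class
    System.out.println(conf.getClass("mapreduce.map.class", Mapper.class));
    // Job.setMapperClass() eventually stores the mapper's class name under this key
    conf.setClass("mapreduce.map.class", SelectGradeMapper.class, Mapper.class);
    // Now getClass() loads and returns SelectGradeMapper
    System.out.println(conf.getClass("mapreduce.map.class", Mapper.class));
  }
}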

Once the mapper has finished, the RecordReader, the output collector, and the other resources are closed, and the task is done.

Let's also focus on mapper.run(mapperContext) in runNewMapper() above. As mentioned in the discussion of Mapper.class, this mapperContext is used both to read K/V pairs from the input split and to write out the resulting K/V pairs. And from


mapperContext = contextConstructor.newInstance(mapper, job, getTaskID(),
                                               input, output, committer,
                                               reporter, split);

we can see that this Context is wired up with the mapper, the RecordReader, and the other objects we configured.
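Before following write() down the stack, here is a minimal, purely illustrative mapper (not from the article; LineLengthMapper and its key/value types are assumptions) showing where context.write() is called on the user side:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical user mapper: emits each input line together with its length in bytes
public class LineLengthMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    // Every context.write() call goes through TaskInputOutputContext.write(),
    // which forwards to the RecordWriter that runNewMapper() created
    context.write(value, new IntWritable(value.getLength()));
  }
}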

The map() method of a Mapper keeps emitting output through context.write(K, V). Let's see how that call proceeds, starting with the Context class hierarchy (the original post shows a class diagram here; in the 1.x code base the chain is Mapper.Context -> MapContext -> TaskInputOutputContext -> TaskAttemptContext -> JobContext).

The write() method comes from TaskInputOutputContext:


public void write(KEYOUT key, VALUEOUT value
                  ) throws IOException, InterruptedException {
  output.write(key, value);
}

It calls RecordWriter.write(). RecordWriter is an abstract class whose main job is to define the write() method:


public abstract class RecordWriter<K, V> {
  public abstract void write(K key, V value
                             ) throws IOException, InterruptedException;

  public abstract void close(TaskAttemptContext context
                             ) throws IOException, InterruptedException;
}

Now look at one concrete RecordWriter, NewOutputCollector, an inner class of MapTask:


private class NewOutputCollector<K,V>
  extends org.apache.hadoop.mapreduce.RecordWriter<K,V> {
  private final MapOutputCollector<K,V> collector;
  private final org.apache.hadoop.mapreduce.Partitioner<K,V> partitioner;
  private final int partitions;

  @SuppressWarnings("unchecked")
  NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,
                     JobConf job,
                     TaskUmbilicalProtocol umbilical,
                     TaskReporter reporter
                     ) throws IOException, ClassNotFoundException {
    collector = new MapOutputBuffer<K,V>(umbilical, job, reporter);
    partitions = jobContext.getNumReduceTasks();
    if (partitions > 0) {
      partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)
        ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);
    } else {
      partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() {
        @Override
        public int getPartition(K key, V value, int numPartitions) {
          return -1;
        }
      };
    }
  }

  @Override
  public void write(K key, V value) throws IOException, InterruptedException {
    collector.collect(key, value,
                      partitioner.getPartition(key, value, partitions));
  }

  @Override
  public void close(TaskAttemptContext context
                    ) throws IOException, InterruptedException {
    try {
      collector.flush();
    } catch (ClassNotFoundException cnf) {
      throw new IOException("can't find class ", cnf);
    }
    collector.close();
  }
}

From its write() method we trace context.write(K, V) down to collector.collect(K, V, partition). Note that the output needs a Partitioner's getPartition() to supply the partition the current K/V pair belongs to: K/V pairs are partitioned so that different partitions go to different reducers. The Partitioner defaults to HashPartitioner and is configurable, and the number of reduce tasks determines the number of partitions.
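For reference, the default HashPartitioner's getPartition() amounts to the following (a reconstruction of the 1.x class, not a verbatim quote; verify against your version):

import org.apache.hadoop.mapreduce.Partitioner;

public class HashPartitioner<K, V> extends Partitioner<K, V> {
  @Override
  public int getPartition(K key, V value, int numReduceTasks) {
    // Mask off the sign bit so the result is non-negative, then take it modulo the reducer count
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }
}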

As we can see, NewOutputCollector is essentially a wrapper around MapOutputBuffer. MapOutputBuffer already existed in the old API; it is complex but crucial, so we will set it aside for now; in short, it collects the output K/V pairs. It implements the MapOutputCollector interface:


interface MapOutputCollector<K, V> {
  public void collect(K key, V value, int partition
                      ) throws IOException, InterruptedException;
  public void close() throws IOException, InterruptedException;
  public void flush() throws IOException, InterruptedException,
                             ClassNotFoundException;
}

This interface tells us that a collector must implement collect(), close(), and flush().

Now a simpler one: NewDirectOutputCollector. It is used when there are no reduce tasks. Its main job is to obtain a RecordWriter from the job's OutputFormat and then use that RecordWriter's write() method to emit the output directly, which is how the output format we configured comes into play.


private class NewDirectOutputCollector<K,V>
  extends org.apache.hadoop.mapreduce.RecordWriter<K,V> {
  private final org.apache.hadoop.mapreduce.RecordWriter out;
  private final TaskReporter reporter;
  private final Counters.Counter mapOutputRecordCounter;
  private final Counters.Counter fileOutputByteCounter;
  private final Statistics fsStats;

  @SuppressWarnings("unchecked")
  NewDirectOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,
      JobConf job, TaskUmbilicalProtocol umbilical, TaskReporter reporter)
      throws IOException, ClassNotFoundException, InterruptedException {
    this.reporter = reporter;
    Statistics matchedStats = null;
    if (outputFormat instanceof org.apache.hadoop.mapreduce.lib.output.FileOutputFormat) {
      // outputFormat comes from the enclosing Task (an inner class reading an outer-class field)
      matchedStats = getFsStatistics(org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
          .getOutputPath(jobContext), job);
    }
    fsStats = matchedStats;
    mapOutputRecordCounter =
      reporter.getCounter(MAP_OUTPUT_RECORDS);
    fileOutputByteCounter = reporter
      .getCounter(org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.Counter.BYTES_WRITTEN);

    long bytesOutPrev = getOutputBytes(fsStats);
    out = outputFormat.getRecordWriter(taskContext); // the key line: obtain the RecordWriter from the configured OutputFormat
    long bytesOutCurr = getOutputBytes(fsStats);
    fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
  }

  @Override
  @SuppressWarnings("unchecked")
  public void write(K key, V value)
      throws IOException, InterruptedException {
    reporter.progress();                     // report progress
    long bytesOutPrev = getOutputBytes(fsStats);
    out.write(key, value);                   // write one record through out, which came from the configured OutputFormat
    long bytesOutCurr = getOutputBytes(fsStats);
    fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev); // update the output byte counter
    mapOutputRecordCounter.increment(1);     // update the output record (K/V pair) counter
  }

  @Override
  public void close(TaskAttemptContext context)
      throws IOException, InterruptedException {
    reporter.progress();
    if (out != null) {
      long bytesOutPrev = getOutputBytes(fsStats);
      out.close(context);
      long bytesOutCurr = getOutputBytes(fsStats);
      fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
    }
  }

  private long getOutputBytes(Statistics stats) {
    return stats == null ? 0 : stats.getBytesWritten();
  }
}

There is also the old MapReduce API path, driven by runOldMapper(), which we will not go into here.