
MapReduce source code analysis: running the Reduce task

7. Running the Reduce task
Let's look at the run method in ReduceTask. The first few lines are much the same as on the map side; the main addition is setting up the task's phases so its status can be reported:
----------------------------------------------------------------------------

public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
    throws IOException, InterruptedException, ClassNotFoundException {
  this.umbilical = umbilical;
  job.setBoolean("mapred.skip.on", isSkipping());
  // Register the phases the reduce task will go through, so the TaskTracker
  // can be kept informed of the current state: copy, sort and reduce
  if (isMapOrReduce()) {
    copyPhase = getProgress().addPhase("copy");
    sortPhase = getProgress().addPhase("sort");
    reducePhase = getProgress().addPhase("reduce");
  }
  // start thread that will handle communication with parent
  // (much the same as in the map task)
  TaskReporter reporter = new TaskReporter(getProgress(), umbilical);
  reporter.startCommunicationThread();
  boolean useNewApi = job.getUseNewReducer();
  initialize(job, getJobID(), reporter, useNewApi);

  // check if it is a cleanup JobTask
  if (jobCleanup) {
    runJobCleanupTask(umbilical, reporter);
    return;
  }
  if (jobSetup) {
    runJobSetupTask(umbilical, reporter);
    return;
  }
  if (taskCleanup) {
    runTaskCleanupTask(umbilical, reporter);
    return;
  }
  ...
----------------------------------------------------------------------------
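One practical consequence of the TaskReporter communication thread started above: a task that stops reporting progress for long enough is eventually considered hung and killed. So user code doing slow per-value work should report progress itself. Below is a minimal sketch, not from the Hadoop source (expensiveWork is a hypothetical placeholder), of a reducer that keeps the reporter fed:
----------------------------------------------------------------------------
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SlowReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
  @Override
  protected void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    for (IntWritable v : values) {
      expensiveWork(v);     // hypothetical long-running step
      context.progress();   // feeds the reporter thread, so the task stays alive
    }
    context.setStatus("finished key " + key);
  }

  private void expensiveWork(IntWritable v) { /* placeholder */ }
}
----------------------------------------------------------------------------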
The rest of the code:
----------------------------------------------------------------------------

codec = initCodec();

boolean isLocal = "local".equals(job.get("mapred.job.tracker", "local"));
if (!isLocal) {
  // The ReduceCopier is responsible for copying the map outputs over
  // to the machine the reduce task is running on
  reduceCopier = new ReduceCopier(umbilical, job, reporter);
  // fetchOutputs pulls in the output of each map task
  if (!reduceCopier.fetchOutputs()) {
    if (reduceCopier.mergeThrowable instanceof FSError) {
      throw (FSError) reduceCopier.mergeThrowable;
    }
    throw new IOException("Task: " + getTaskID() +
        " - The reduce copier failed", reduceCopier.mergeThrowable);
  }
}
// the copy phase is finished
copyPhase.complete(); // copy is already complete
setPhase(TaskStatus.Phase.SORT);
statusUpdate(umbilical);

final FileSystem rfs = FileSystem.getLocal(job).getRaw();
// Choose the merge/sort path depending on whether the JobTracker is local
RawKeyValueIterator rIter = isLocal
    ? Merger.merge(job, rfs, job.getMapOutputKeyClass(),
        job.getMapOutputValueClass(), codec, getMapFiles(rfs, true),
        !conf.getKeepFailedTaskFiles(), job.getInt("io.sort.factor", 100),
        new Path(getTaskID().toString()), job.getOutputKeyComparator(),
        reporter, spilledRecordsCounter, null)
    : reduceCopier.createKVIterator(job, rfs, reporter);

// free up the data structures
mapOutputFilesOnDisk.clear();

sortPhase.complete(); // sort is complete
setPhase(TaskStatus.Phase.REDUCE);
statusUpdate(umbilical);
Class keyClass = job.getMapOutputKeyClass();
Class valueClass = job.getMapOutputValueClass();
RawComparator comparator = job.getOutputValueGroupingComparator();

if (useNewApi) {
  runNewReducer(job, umbilical, reporter, rIter, comparator,
      keyClass, valueClass);
} else {
  runOldReducer(job, umbilical, reporter, rIter, comparator,
      keyClass, valueClass);
}
done(umbilical, reporter);
}
----------------------------------------------------------------------------
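Note the call to job.getOutputValueGroupingComparator() above: it returns the grouping comparator a job may configure, which decides which keys end up in the same reduce() call (the mechanism behind secondary sort). A driver-side sketch of where that value comes from in the new API (FirstFieldComparator and MyReducer are placeholder names, not Hadoop classes):
----------------------------------------------------------------------------
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class GroupingDemo {
  public static void main(String[] args) throws Exception {
    Job job = new Job(new Configuration(), "grouping-demo");
    job.setReducerClass(MyReducer.class);                        // placeholder
    // Keys that compare equal under this comparator are grouped into a
    // single reduce() call; it is what getOutputValueGroupingComparator()
    // hands to createReduceContext in the task-side code above.
    job.setGroupingComparatorClass(FirstFieldComparator.class);  // placeholder
  }
}
----------------------------------------------------------------------------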
Finally, let's look at the runNewReducer method:
----------------------------------------------------------------------------
private <INKEY, INVALUE, OUTKEY, OUTVALUE>
void runNewReducer(JobConf job,
                   final TaskUmbilicalProtocol umbilical,
                   final TaskReporter reporter,
                   RawKeyValueIterator rIter,
                   RawComparator<INKEY> comparator,
                   Class<INKEY> keyClass,
                   Class<INVALUE> valueClass
                   ) throws IOException, InterruptedException,
                            ClassNotFoundException {
  ...
  // make a task context so we can get the classes
  org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
    new org.apache.hadoop.mapreduce.TaskAttemptContext(job, getTaskID());
  // make a reducer
  org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE> reducer =
    (org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>)
      ReflectionUtils.newInstance(taskContext.getReducerClass(), job);
  org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> output =
    (org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE>)
      outputFormat.getRecordWriter(taskContext);
  org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> trackedRW =
    new NewTrackingRecordWriter<OUTKEY,OUTVALUE>(output, reduceOutputCounter);
  job.setBoolean("mapred.skip.on", isSkipping());
  org.apache.hadoop.mapreduce.Reducer.Context
    reducerContext = createReduceContext(reducer, job, getTaskID(),
                                         rIter, reduceInputKeyCounter,
                                         reduceInputValueCounter,
                                         trackedRW, committer,
                                         reporter, comparator, keyClass,
                                         valueClass);
  reducer.run(reducerContext);
  output.close(reducerContext);
}

----------------------------------------------------------------------------
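A small note on ReflectionUtils.newInstance, used above to build the user's Reducer: besides calling the no-argument constructor, it injects the job configuration via setConf() when the instantiated class implements Configurable, which is how per-job settings reach user classes. A self-contained sketch of the same pattern (Text is used here only as a convenient stand-in class):
----------------------------------------------------------------------------
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.util.ReflectionUtils;

public class ReflectionDemo {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Same pattern runNewReducer applies to taskContext.getReducerClass()
    Text t = ReflectionUtils.newInstance(Text.class, conf);
    System.out.println("created: " + t.getClass().getName());
  }
}
----------------------------------------------------------------------------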
The Reducer's run method, which lives in the mapreduce package, looks like this:

----------------------------------------------------------------------------

public void run(Context context) throws IOException, InterruptedException {
  setup(context);
  while (context.nextKey()) {
    reduce(context.getCurrentKey(), context.getValues(), context);
  }
  cleanup(context);
}
----------------------------------------------------------------------------
Same structure as on the map side... run() calls setup() once, then reduce() once per key group, then cleanup(); a concrete example follows.
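To make that contract concrete, here is a standard word-count-style reducer that plugs into this loop (values for one key arrive as an Iterable):
----------------------------------------------------------------------------
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
  private final IntWritable result = new IntWritable();

  @Override
  protected void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable v : values) {
      sum += v.get();            // one call per key group, courtesy of run()
    }
    result.set(sum);
    context.write(key, result);  // goes through the tracked RecordWriter
  }
}
----------------------------------------------------------------------------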
I had read the MapReduce source before, and by the time I reached this point it felt like having one's two legendary meridians unblocked in a kung-fu novel ^_^! I encourage you to read the mapreduce package source yourself; I find it laid out quite clearly. Start from the Job class and you will probably come away with the same feeling!