您的位置:首页 > 运维架构

Hadoop学习总结:Map-Reduce的过程解析

2015-04-07 13:17 302 查看

一、客户端

Map-Reduce的过程首先是由客户端提交一个任务开始的。
提交任务主要是通过JobClient.runJob(JobConf)静态函数实现的:
public static RunningJob runJob(JobConf job) throws IOException {
//首先生成一个JobClient对象
JobClient jc = new JobClient(job);
……
//调用submitJob来提交一个任务
running = jc.submitJob(job);
JobID jobId = running.getID();
……
while (true) {
//while循环中不断得到此任务的状态,并打印到客户端console中
}
return running;
}
其中JobClient的submitJob函数实现如下:
public RunningJob submitJob(JobConf job) throws FileNotFoundException,
InvalidJobConfException, IOException {
//从JobTracker得到当前任务的id
JobID jobId = jobSubmitClient.getNewJobId();
//准备将任务运行所需要的要素写入HDFS:
//任务运行程序所在的jar封装成job.jar
//任务所要处理的input split信息写入job.split
//任务运行的配置项汇总写入job.xml
Path submitJobDir = new Path(getSystemDir(), jobId.toString());
Path submitJarFile = new Path(submitJobDir, "job.jar");
Path submitSplitFile = new Path(submitJobDir, "job.split");
//此处将-libjars命令行指定的jar上传至HDFS
configureCommandLineOptions(job, submitJobDir, submitJarFile);
Path submitJobFile = new Path(submitJobDir, "job.xml");
……
//通过input format的格式获得相应的input split,默认类型为FileSplit
InputSplit[] splits =
job.getInputFormat().getSplits(job, job.getNumMapTasks());

// 生成一个写入流,将input split得信息写入job.split文件
FSDataOutputStream out = FileSystem.create(fs,
submitSplitFile, new FsPermission(JOB_FILE_PERMISSION));
try {
//写入job.split文件的信息包括:split文件头,split文件版本号,split的个数,接着依次写入每一个input split的信息。
//对于每一个input split写入:split类型名(默认FileSplit),split的大小,split的内容(对于FileSplit,写入文件名,此split 在文件中的起始位置),split的location信息(即在那个DataNode上)。
writeSplitsFile(splits, out);
} finally {
out.close();
}
job.set("mapred.job.split.file", submitSplitFile.toString());
//根据split的个数设定map task的个数
job.setNumMapTasks(splits.length);
// 写入job的配置信息入job.xml文件
out = FileSystem.create(fs, submitJobFile,
new FsPermission(JOB_FILE_PERMISSION));
try {
job.writeXml(out);
} finally {
out.close();
}
//真正的调用JobTracker来提交任务
JobStatus status = jobSubmitClient.submitJob(jobId);
……
}

二、JobTracker

JobTracker作为一个单独的JVM运行,其运行的main函数主要调用有下面两部分:

调用静态函数startTracker(new JobConf())创建一个JobTracker对象
调用JobTracker.offerService()函数提供服务

在JobTracker的构造函数中,会生成一个taskScheduler成员变量,来进行Job的调度,默认为JobQueueTaskScheduler,也即按照FIFO的方式调度任务。
在offerService函数中,则调用taskScheduler.start(),在这个函数中,为JobTracker(也即taskScheduler的taskTrackerManager)注册了两个Listener:

JobQueueJobInProgressListener jobQueueJobInProgressListener用于监控job的运行状态
EagerTaskInitializationListener eagerTaskInitializationListener用于对Job进行初始化

EagerTaskInitializationListener中有一个线程JobInitThread,不断得到jobInitQueue中的JobInProgress对象,调用JobInProgress对象的initTasks函数对任务进行初始化操作。
在上一节中,客户端调用了JobTracker.submitJob函数,此函数首先生成一个JobInProgress对象,然后调用addJob函数,其中有如下的逻辑:
synchronized (jobs) {
synchronized (taskScheduler) {
jobs.put(job.getProfile().getJobID(), job);
//对JobTracker的每一个listener都调用jobAdded函数
for (JobInProgressListener listener : jobInProgressListeners) {
listener.jobAdded(job);
}
}
}
EagerTaskInitializationListener的jobAdded函数就是向jobInitQueue中添加一个JobInProgress对象,于是自然触发了此Job的初始化操作,由JobInProgress得initTasks函数完成:
public synchronized void initTasks() throws IOException {
……
//从HDFS中读取job.split文件从而生成input splits
String jobFile = profile.getJobFile();
Path sysDir = new Path(this.jobtracker.getSystemDir());
FileSystem fs = sysDir.getFileSystem(conf);
DataInputStream splitFile =
fs.open(new Path(conf.get("mapred.job.split.file")));
JobClient.RawSplit[] splits;
try {
splits = JobClient.readSplitFile(splitFile);
} finally {
splitFile.close();
}
//map task的个数就是input split的个数
numMapTasks = splits.length;
//为每个map tasks生成一个TaskInProgress来处理一个input split
maps = new TaskInProgress[numMapTasks];
for(int i=0; i < numMapTasks; ++i) {
inputLength += splits[i].getDataLength();
maps[i] = new TaskInProgress(jobId, jobFile,
splits[i],
jobtracker, conf, this, i);
}
//对于map task,将其放入nonRunningMapCache,是一个Map<Node, List<TaskInProgress>>,也即对于map task来讲,其将会被分配到其input split所在的Node上。nonRunningMapCache将在JobTracker向TaskTracker分配map task的时候使用。
if (numMapTasks > 0) {

nonRunningMapCache = createCache(splits, maxLevel);

}

//创建reduce task
this.reduces = new TaskInProgress[numReduceTasks];
for (int i = 0; i < numReduceTasks; i++) {
reduces[i] = new TaskInProgress(jobId, jobFile,
numMapTasks, i,
jobtracker, conf, this);
//reduce task放入nonRunningReduces,其将在JobTracker向TaskTracker分配reduce task的时候使用。
nonRunningReduces.add(reduces[i]);
}

//创建两个cleanup task,一个用来清理map,一个用来清理reduce.
cleanup = new TaskInProgress[2];
cleanup[0] = new TaskInProgress(jobId, jobFile, splits[0],
jobtracker, conf, this, numMapTasks);
cleanup[0].setJobCleanupTask();
cleanup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
numReduceTasks, jobtracker, conf, this);
cleanup[1].setJobCleanupTask();
//创建两个初始化 task,一个初始化map,一个初始化reduce.
setup = new TaskInProgress[2];
setup[0] = new TaskInProgress(jobId, jobFile, splits[0],
jobtracker, conf, this, numMapTasks + 1 );
setup[0].setJobSetupTask();
setup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
numReduceTasks + 1, jobtracker, conf, this);
setup[1].setJobSetupTask();
tasksInited.set(true);//初始化完毕
……
}

三、TaskTracker

TaskTracker也是作为一个单独的JVM来运行的,在其main函数中,主要是调用了new TaskTracker(conf).run(),其中run函数主要调用了:
State offerService() throws Exception {
long lastHeartbeat = 0;
//TaskTracker进行是一直存在的
while (running && !shuttingDown) {
……
long now = System.currentTimeMillis();
//每隔一段时间就向JobTracker发送heartbeat
long waitTime = heartbeatInterval - (now - lastHeartbeat);
if (waitTime > 0) {
synchronized(finishedCount) {
if (finishedCount[0] == 0) {
finishedCount.wait(waitTime);
}
finishedCount[0] = 0;
}
}
……
//发送Heartbeat到JobTracker,得到response
HeartbeatResponse heartbeatResponse = transmitHeartBeat(now);
……
//从Response中得到此TaskTracker需要做的事情
TaskTrackerAction[] actions = heartbeatResponse.getActions();
……
if (actions != null){
for(TaskTrackerAction action: actions) {
if (action instanceof LaunchTaskAction) {
//如果是运行一个新的Task,则将Action添加到任务队列中
addToTaskQueue((LaunchTaskAction)action);
} else if (action instanceof CommitTaskAction) {
CommitTaskAction commitAction = (CommitTaskAction)action;
if (!commitResponses.contains(commitAction.getTaskID())) {
commitResponses.add(commitAction.getTaskID());
}
} else {
tasksToCleanup.put(action);
}
}
}
}
return State.NORMAL;
}
其中transmitHeartBeat主要逻辑如下:
private HeartbeatResponse transmitHeartBeat(long now) throws IOException {
//每隔一段时间,在heartbeat中要返回给JobTracker一些统计信息
boolean sendCounters;
if (now > (previousUpdate + COUNTER_UPDATE_INTERVAL)) {
sendCounters = true;
previousUpdate = now;
}
else {
sendCounters = false;
}
……
//报告给JobTracker,此TaskTracker的当前状态
if (status == null) {
synchronized (this) {
status = new TaskTrackerStatus(taskTrackerName, localHostname,
httpPort,
cloneAndResetRunningTaskStatuses(
sendCounters),
failures,
maxCurrentMapTasks,
maxCurrentReduceTasks);
}
}
……
//当满足下面的条件的时候,此TaskTracker请求JobTracker为其分配一个新的Task来运行:
//当前TaskTracker正在运行的map task的个数小于可以运行的map task的最大个数
//当前TaskTracker正在运行的reduce task的个数小于可以运行的reduce task的最大个数
boolean askForNewTask;
long localMinSpaceStart;
synchronized (this) {
askForNewTask = (status.countMapTasks() < maxCurrentMapTasks ||
status.countReduceTasks() < maxCurrentReduceTasks) &&
acceptNewTasks;
localMinSpaceStart = minSpaceStart;
}
……
//向JobTracker发送heartbeat,这是一个RPC调用
HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status,
justStarted, askForNewTask,
heartbeatResponseId);
……
return heartbeatResponse;
}

四、JobTracker

当 JobTracker被RPC调用来发送heartbeat的时候,JobTracker的heartbeat(TaskTrackerStatus status,boolean initialContact, boolean acceptNewTasks, short responseId)函数被调用:
public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status,
boolean initialContact, boolean acceptNewTasks, short responseId)
throws IOException {
……
String trackerName = status.getTrackerName();
……
short newResponseId = (short)(responseId + 1);
……
HeartbeatResponse response = new HeartbeatResponse(newResponseId, null);
List<TaskTrackerAction> actions = new ArrayList<TaskTrackerAction>();
//如果TaskTracker向JobTracker请求一个task运行
if (acceptNewTasks) {
TaskTrackerStatus taskTrackerStatus = getTaskTracker(trackerName);
if (taskTrackerStatus == null) {
LOG.warn("Unknown task tracker polling; ignoring: " + trackerName);
} else {
//setup和cleanup的task优先级最高
List<Task> tasks = getSetupAndCleanupTasks(taskTrackerStatus);
if (tasks == null ) {
//任务调度器分配任务
tasks = taskScheduler.assignTasks(taskTrackerStatus);
}
if (tasks != null) {
for (Task task : tasks) {
//将任务放入actions列表,返回给TaskTracker
expireLaunchingTasks.addNewTask(task.getTaskID());
actions.add(new LaunchTaskAction(task));
}
}
}
}
……
int nextInterval = getNextHeartbeatInterval();
response.setHeartbeatInterval(nextInterval);
response.setActions(
actions.toArray(new TaskTrackerAction[actions.size()]));
……
return response;
}
默认的任务调度器为JobQueueTaskScheduler,其assignTasks如下:
public synchronized List<Task> assignTasks(TaskTrackerStatus taskTracker)
throws IOException {
ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();
int numTaskTrackers = clusterStatus.getTaskTrackers();
Collection<JobInProgress> jobQueue = jobQueueJobInProgressListener.getJobQueue();
int maxCurrentMapTasks = taskTracker.getMaxMapTasks();
int maxCurrentReduceTasks = taskTracker.getMaxReduceTasks();
int numMaps = taskTracker.countMapTasks();
int numReduces = taskTracker.countReduceTasks();
//计算剩余的map和reduce的工作量:remaining
int remainingReduceLoad = 0;
int remainingMapLoad = 0;
synchronized (jobQueue) {
for (JobInProgress job : jobQueue) {
if (job.getStatus().getRunState() == JobStatus.RUNNING) {
int totalMapTasks = job.desiredMaps();
int totalReduceTasks = job.desiredReduces();
remainingMapLoad += (totalMapTasks - job.finishedMaps());
remainingReduceLoad += (totalReduceTasks - job.finishedReduces());
}
}
}
//计算平均每个TaskTracker应有的工作量,remaining/numTaskTrackers是剩余的工作量除以TaskTracker的个数。
int maxMapLoad = 0;
int maxReduceLoad = 0;
if (numTaskTrackers > 0) {
maxMapLoad = Math.min(maxCurrentMapTasks,
(int) Math.ceil((double) remainingMapLoad /
numTaskTrackers));
maxReduceLoad = Math.min(maxCurrentReduceTasks,
(int) Math.ceil((double) remainingReduceLoad
/ numTaskTrackers));
}
……

//map优先于reduce,当TaskTracker上运行的map task数目小于平均的工作量,则向其分配map task
if (numMaps < maxMapLoad) {
int totalNeededMaps = 0;
synchronized (jobQueue) {
for (JobInProgress job : jobQueue) {
if (job.getStatus().getRunState() != JobStatus.RUNNING) {
continue;
}
Task t = job.obtainNewMapTask(taskTracker, numTaskTrackers,
taskTrackerManager.getNumberOfUniqueHosts());
if (t != null) {
return Collections.singletonList(t);
}
……
}
}
}
//分配完map task,再分配reduce task
if (numReduces < maxReduceLoad) {
int totalNeededReduces = 0;
synchronized (jobQueue) {
for (JobInProgress job : jobQueue) {
if (job.getStatus().getRunState() != JobStatus.RUNNING ||
job.numReduceTasks == 0) {
continue;
}
Task t = job.obtainNewReduceTask(taskTracker, numTaskTrackers,
taskTrackerManager.getNumberOfUniqueHosts());
if (t != null) {
return Collections.singletonList(t);
}
……
}
}
}
return null;
}
从 上面的代码中我们可以知道,JobInProgress的obtainNewMapTask是用来分配map task的,其主要调用findNewMapTask,根据TaskTracker所在的Node从nonRunningMapCache中查找 TaskInProgress。JobInProgress的obtainNewReduceTask是用来分配reduce task的,其主要调用findNewReduceTask,从nonRunningReduces查找TaskInProgress。

五、TaskTracker

在 向JobTracker发送heartbeat后,返回的reponse中有分配好的任务LaunchTaskAction,将其加入队列,调用 addToTaskQueue,如果是map task则放入mapLancher(类型为TaskLauncher),如果是reduce task则放入reduceLancher(类型为TaskLauncher):
private void addToTaskQueue(LaunchTaskAction action) {
if (action.getTask().isMapTask()) {
mapLauncher.addToTaskQueue(action);
} else {
reduceLauncher.addToTaskQueue(action);
}
}
TaskLauncher 是一个线程,其run函数从上面放入的queue中取出一个TaskInProgress,然后调用 startNewTask(TaskInProgress tip)来启动一个task,其又主要调用了localizeJob(TaskInProgress tip):
private void localizeJob(TaskInProgress tip) throws IOException {
//首先要做的一件事情是有关Task的文件从HDFS拷贝的TaskTracker的本地文件系统中:job.split,job.xml以及job.jar
Path localJarFile = null;
Task t = tip.getTask();
JobID jobId = t.getJobID();
Path jobFile = new Path(t.getJobFile());
……
Path localJobFile = lDirAlloc.getLocalPathForWrite(
getLocalJobDir(jobId.toString())
+ Path.SEPARATOR + "job.xml",
jobFileSize, fConf);
RunningJob rjob = addTaskToJob(jobId, tip);
synchronized (rjob) {
if (!rjob.localized) {
FileSystem localFs = FileSystem.getLocal(fConf);
Path jobDir = localJobFile.getParent();
……
//将job.split拷贝到本地
systemFS.copyToLocalFile(jobFile, localJobFile);
JobConf localJobConf = new JobConf(localJobFile);
Path workDir = lDirAlloc.getLocalPathForWrite(
(getLocalJobDir(jobId.toString())
+ Path.SEPARATOR + "work"), fConf);
if (!localFs.mkdirs(workDir)) {
throw new IOException("Mkdirs failed to create "
+ workDir.toString());
}
System.setProperty("job.local.dir", workDir.toString());
localJobConf.set("job.local.dir", workDir.toString());
// copy Jar file to the local FS and unjar it.
String jarFile = localJobConf.getJar();
long jarFileSize = -1;
if (jarFile != null) {
Path jarFilePath = new Path(jarFile);
localJarFile = new Path(lDirAlloc.getLocalPathForWrite(
getLocalJobDir(jobId.toString())
+ Path.SEPARATOR + "jars",
5 * jarFileSize, fConf), "job.jar");
if (!localFs.mkdirs(localJarFile.getParent())) {
throw new IOException("Mkdirs failed to create jars directory ");
}
//将job.jar拷贝到本地
systemFS.copyToLocalFile(jarFilePath, localJarFile);
localJobConf.setJar(localJarFile.toString());
//将job得configuration写成job.xml
OutputStream out = localFs.create(localJobFile);
try {
localJobConf.writeXml(out);
} finally {
out.close();
}
// 解压缩job.jar
RunJar.unJar(new File(localJarFile.toString()),
new File(localJarFile.getParent().toString()));
}
rjob.localized = true;
rjob.jobConf = localJobConf;
}
}
//真正的启动此Task
launchTaskForJob(tip, new JobConf(rjob.jobConf));
}
当所有的task运行所需要的资源都拷贝到本地后,则调用launchTaskForJob,其又调用TaskInProgress的launchTask函数:
public synchronized void launchTask() throws IOException {
……
//创建task运行目录
localizeTask(task);
if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) {
this.taskStatus.setRunState(TaskStatus.State.RUNNING);
}
//创建并启动TaskRunner,对于MapTask,创建的是MapTaskRunner,对于ReduceTask,创建的是ReduceTaskRunner
this.runner = task.createRunner(TaskTracker.this, this);
this.runner.start();
this.taskStatus.setStartTime(System.currentTimeMillis());
}
TaskRunner是一个线程,其run函数如下:
public final void run() {
……
TaskAttemptID taskid = t.getTaskID();
LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
File jobCacheDir = null;
if (conf.getJar() != null) {
jobCacheDir = new File(
new Path(conf.getJar()).getParent().toString());
}
File workDir = new File(lDirAlloc.getLocalPathToRead(
TaskTracker.getLocalTaskDir(
t.getJobID().toString(),
t.getTaskID().toString(),
t.isTaskCleanupTask())
+ Path.SEPARATOR + MRConstants.WORKDIR,
conf). toString());
FileSystem fileSystem;
Path localPath;
……
//拼写classpath
String baseDir;
String sep = System.getProperty("path.separator");
StringBuffer classPath = new StringBuffer();
// start with same classpath as parent process
classPath.append(System.getProperty("java.class.path"));
classPath.append(sep);
if (!workDir.mkdirs()) {
if (!workDir.isDirectory()) {
LOG.fatal("Mkdirs failed to create " + workDir.toString());
}
}
String jar = conf.getJar();
if (jar != null) {
// if jar exists, it into workDir
File[] libs = new File(jobCacheDir, "lib").listFiles();
if (libs != null) {
for (int i = 0; i < libs.length; i++) {
classPath.append(sep); // add libs from jar to classpath
classPath.append(libs[i]);
}
}
classPath.append(sep);
classPath.append(new File(jobCacheDir, "classes"));
classPath.append(sep);
classPath.append(jobCacheDir);
}
……
classPath.append(sep);
classPath.append(workDir);
//拼写命令行java及其参数
Vector<String> vargs = new Vector<String>(8);
File jvm =
new File(new File(System.getProperty("java.home"), "bin"), "java");
vargs.add(jvm.toString());
String javaOpts = conf.get("mapred.child.java.opts", "-Xmx200m");
javaOpts = javaOpts.replace("@taskid@", taskid.toString());
String [] javaOptsSplit = javaOpts.split(" ");
String libraryPath = System.getProperty("java.library.path");
if (libraryPath == null) {
libraryPath = workDir.getAbsolutePath();
} else {
libraryPath += sep + workDir;
}
boolean hasUserLDPath = false;
for(int i=0; i<javaOptsSplit.length ;i++) {
if(javaOptsSplit[i].startsWith("-Djava.library.path=")) {
javaOptsSplit[i] += sep + libraryPath;
hasUserLDPath = true;
break;
}
}
if(!hasUserLDPath) {
vargs.add("-Djava.library.path=" + libraryPath);
}
for (int i = 0; i < javaOptsSplit.length; i++) {
vargs.add(javaOptsSplit[i]);
}
//添加Child进程的临时文件夹
String tmp = conf.get("mapred.child.tmp", "./tmp");
Path tmpDir = new Path(tmp);
if (!tmpDir.isAbsolute()) {
tmpDir = new Path(workDir.toString(), tmp);
}
FileSystem localFs = FileSystem.getLocal(conf);
if (!localFs.mkdirs(tmpDir) && !localFs.getFileStatus(tmpDir).isDir()) {
throw new IOException("Mkdirs failed to create " + tmpDir.toString());
}
vargs.add("-Djava.io.tmpdir=" + tmpDir.toString());
// Add classpath.
vargs.add("-classpath");
vargs.add(classPath.toString());
//log文件夹
long logSize = TaskLog.getTaskLogLength(conf);
vargs.add("-Dhadoop.log.dir=" +
new File(System.getProperty("hadoop.log.dir")
).getAbsolutePath());
vargs.add("-Dhadoop.root.logger=INFO,TLA");
vargs.add("-Dhadoop.tasklog.taskid=" + taskid);
vargs.add("-Dhadoop.tasklog.totalLogFileSize=" + logSize);
// 运行map task和reduce task的子进程的main class是Child
vargs.add(Child.class.getName()); // main of Child
……
//运行子进程
jvmManager.launchJvm(this,
jvmManager.constructJvmEnv(setup,vargs,stdout,stderr,logSize,
workDir, env, pidFile, conf));
}

六、Child

真正的map task和reduce task都是在Child进程中运行的,Child的main函数的主要逻辑如下:
while (true) {
//从TaskTracker通过网络通信得到JvmTask对象
JvmTask myTask = umbilical.getTask(jvmId);
……
idleLoopCount = 0;
task = myTask.getTask();
taskid = task.getTaskID();
isCleanup = task.isTaskCleanupTask();
JobConf job = new JobConf(task.getJobFile());
TaskRunner.setupWorkDir(job);
numTasksToExecute = job.getNumTasksToExecutePerJvm();
task.setConf(job);
defaultConf.addResource(new Path(task.getJobFile()));
……
//运行task
task.run(job, umbilical); // run the task
if (numTasksToExecute > 0 && ++numTasksExecuted == numTasksToExecute) {
break;
}
}

6.1、MapTask

如果task是MapTask,则其run函数如下:
public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
throws IOException {
//用于同TaskTracker进行通信,汇报运行状况
final Reporter reporter = getReporter(umbilical);
startCommunicationThread(umbilical);
initialize(job, reporter);
……
//map task的输出
int numReduceTasks = conf.getNumReduceTasks();
MapOutputCollector collector = null;
if (numReduceTasks > 0) {
collector = new MapOutputBuffer(umbilical, job, reporter);
} else {
collector = new DirectMapOutputCollector(umbilical, job, reporter);
}
//读取input split,按照其中的信息,生成RecordReader来读取数据
instantiatedSplit = (InputSplit)
ReflectionUtils.newInstance(job.getClassByName(splitClass), job);
DataInputBuffer splitBuffer = new DataInputBuffer();
splitBuffer.reset(split.getBytes(), 0, split.getLength());
instantiatedSplit.readFields(splitBuffer);
if (instantiatedSplit instanceof FileSplit) {
FileSplit fileSplit = (FileSplit) instantiatedSplit;
job.set("map.input.file", fileSplit.getPath().toString());
job.setLong("map.input.start", fileSplit.getStart());
job.setLong("map.input.length", fileSplit.getLength());
}
RecordReader rawIn = // open input
job.getInputFormat().getRecordReader(instantiatedSplit, job, reporter);
RecordReader in = isSkipping() ?
new SkippingRecordReader(rawIn, getCounters(), umbilical) :
new TrackedRecordReader(rawIn, getCounters());
job.setBoolean("mapred.skip.on", isSkipping());
//对于map task,生成一个MapRunnable,默认是MapRunner
MapRunnable runner =
ReflectionUtils.newInstance(job.getMapRunnerClass(), job);
try {
//MapRunner的run函数就是依次读取RecordReader中的数据,然后调用Mapper的map函数进行处理。
runner.run(in, collector, reporter);
collector.flush();
} finally {
in.close(); // close input
collector.close();
}
done(umbilical);
}
MapRunner的run函数就是依次读取RecordReader中的数据,然后调用Mapper的map函数进行处理:
public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output,
Reporter reporter)
throws IOException {
try {
K1 key = input.createKey();
V1 value = input.createValue();
while (input.next(key, value)) {
mapper.map(key, value, output, reporter);
if(incrProcCount) {
reporter.incrCounter(SkipBadRecords.COUNTER_GROUP,
SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS, 1);
}
}
} finally {
mapper.close();
}
}
结果集全部收集到MapOutputBuffer中,其collect函数如下:
public synchronized void collect(K key, V value)
throws IOException {
reporter.progress();
……
//从此处看,此buffer是一个ring的数据结构
final int kvnext = (kvindex + 1) % kvoffsets.length;
spillLock.lock();
try {
boolean kvfull;
do {
//在ring中,如果下一个空闲位置接上起始位置的话,则表示满了
kvfull = kvnext == kvstart;
//在ring中计算是否需要将buffer写入硬盘的阈值
final boolean kvsoftlimit = ((kvnext > kvend)
? kvnext - kvend > softRecordLimit
: kvend - kvnext <= kvoffsets.length - softRecordLimit);
//如果到达阈值,则开始将buffer写入硬盘,写成spill文件。
//startSpill主要是notify一个背后线程SpillThread的run()函数,开始调用sortAndSpill()开始排序,合并,写入硬盘
if (kvstart == kvend && kvsoftlimit) {
startSpill();
}
//如果buffer满了,则只能等待写入完毕
if (kvfull) {
while (kvstart != kvend) {
reporter.progress();
spillDone.await();
}
}
} while (kvfull);
} finally {
spillLock.unlock();
}
try {
//如果buffer不满,则将key, value写入buffer
int keystart = bufindex;
keySerializer.serialize(key);
final int valstart = bufindex;
valSerializer.serialize(value);
int valend = bb.markRecord();
//调用设定的partitioner,根据key, value取得partition id
final int partition = partitioner.getPartition(key, value, partitions);
mapOutputRecordCounter.increment(1);
mapOutputByteCounter.increment(valend >= keystart
? valend - keystart
: (bufvoid - keystart) + valend);
//将parition id以及key, value在buffer中的偏移量写入索引数组
int ind = kvindex * ACCTSIZE;
kvoffsets[kvindex] = ind;
kvindices[ind + PARTITION] = partition;
kvindices[ind + KEYSTART] = keystart;
kvindices[ind + VALSTART] = valstart;
kvindex = kvnext;
} catch (MapBufferTooSmallException e) {
LOG.info("Record too large for in-memory buffer: " + e.getMessage());
spillSingleRecord(key, value);
mapOutputRecordCounter.increment(1);
return;
}
}
内存buffer的格式如下:
(见几位hadoop大侠的分析http://blog.csdn.net/HEYUTAO007/archive/2010/07/10/5725379.aspx 以及http://caibinbupt.javaeye.com/)



kvoffsets是为了写入内存前排序使用的。
从上面可知,内存buffer写入硬盘spill文件的函数为sortAndSpill:
private void sortAndSpill() throws IOException {
……
FSDataOutputStream out = null;
FSDataOutputStream indexOut = null;
IFileOutputStream indexChecksumOut = null;
//创建硬盘上的spill文件
Path filename = mapOutputFile.getSpillFileForWrite(getTaskID(),
numSpills, size);
out = rfs.create(filename);
……
final int endPosition = (kvend > kvstart)
? kvend
: kvoffsets.length + kvend;
//按照partition的顺序对buffer中的数据进行排序
sorter.sort(MapOutputBuffer.this, kvstart, endPosition, reporter);
int spindex = kvstart;
InMemValBytes value = new InMemValBytes();
//依次一个一个parition的写入文件
for (int i = 0; i < partitions; ++i) {
IFile.Writer<K, V> writer = null;
long segmentStart = out.getPos();
writer = new Writer<K, V>(job, out, keyClass, valClass, codec);
//如果combiner为空,则直接写入文件
if (null == combinerClass) {
……
writer.append(key, value);
++spindex;
}
else {
……
//如果combiner不为空,则先combine,调用combiner.reduce(…)函数后再写入文件
combineAndSpill(kvIter, combineInputCounter);
}
}
……
}
当map阶段结束的时候,MapOutputBuffer的flush函数会被调用,其也会调用sortAndSpill将buffer中的写入文件,然后再调用mergeParts来合并写入在硬盘上的多个spill:
private void mergeParts() throws IOException {
……
//对于每一个partition
for (int parts = 0; parts < partitions; parts++){
//create the segments to be merged
List<Segment<K, V>> segmentList =
new ArrayList<Segment<K, V>>(numSpills);
TaskAttemptID mapId = getTaskID();
//依次从各个spill文件中收集属于当前partition的段
for(int i = 0; i < numSpills; i++) {
final IndexRecord indexRecord =
getIndexInformation(mapId, i, parts);
long segmentOffset = indexRecord.startOffset;
long segmentLength = indexRecord.partLength;
Segment<K, V> s =
new Segment<K, V>(job, rfs, filename[i], segmentOffset,
segmentLength, codec, true);
segmentList.add(i, s);
}
//将属于同一个partition的段merge到一起
RawKeyValueIterator kvIter =
Merger.merge(job, rfs,
keyClass, valClass,
segmentList, job.getInt("io.sort.factor", 100),
new Path(getTaskID().toString()),
job.getOutputKeyComparator(), reporter);
//写入合并后的段到文件
long segmentStart = finalOut.getPos();
Writer<K, V> writer =
new Writer<K, V>(job, finalOut, keyClass, valClass, codec);
if (null == combinerClass || numSpills < minSpillsForCombine) {
Merger.writeFile(kvIter, writer, reporter, job);
} else {
combineCollector.setWriter(writer);
combineAndSpill(kvIter, combineInputCounter);
}
……
}
}

6.2、ReduceTask

ReduceTask的run函数如下:
public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
throws IOException {
job.setBoolean("mapred.skip.on", isSkipping());
//对于reduce,则包含三个步骤:拷贝,排序,Reduce
if (isMapOrReduce()) {
copyPhase = getProgress().addPhase("copy");
sortPhase = getProgress().addPhase("sort");
reducePhase = getProgress().addPhase("reduce");
}
startCommunicationThread(umbilical);
final Reporter reporter = getReporter(umbilical);
initialize(job, reporter);
//copy阶段,主要使用ReduceCopier的fetchOutputs函数获得map的输出。创建多个线程MapOutputCopier,其中copyOutput进行拷贝。
boolean isLocal = "local".equals(job.get("mapred.job.tracker", "local"));
if (!isLocal) {
reduceCopier = new ReduceCopier(umbilical, job);
if (!reduceCopier.fetchOutputs()) {
……
}
}
copyPhase.complete();
//sort阶段,将得到的map输出合并,直到文件数小于io.sort.factor时停止,返回一个Iterator用于访问key-value
setPhase(TaskStatus.Phase.SORT);
statusUpdate(umbilical);
final FileSystem rfs = FileSystem.getLocal(job).getRaw();
RawKeyValueIterator rIter = isLocal
? Merger.merge(job, rfs, job.getMapOutputKeyClass(),
job.getMapOutputValueClass(), codec, getMapFiles(rfs, true),
!conf.getKeepFailedTaskFiles(), job.getInt("io.sort.factor", 100),
new Path(getTaskID().toString()), job.getOutputKeyComparator(),
reporter)
: reduceCopier.createKVIterator(job, rfs, reporter);
mapOutputFilesOnDisk.clear();
sortPhase.complete();
//reduce阶段
setPhase(TaskStatus.Phase.REDUCE);
……
Reducer reducer = ReflectionUtils.newInstance(job.getReducerClass(), job);
Class keyClass = job.getMapOutputKeyClass();
Class valClass = job.getMapOutputValueClass();
ReduceValuesIterator values = isSkipping() ?
new SkippingReduceValuesIterator(rIter,
job.getOutputValueGroupingComparator(), keyClass, valClass,
job, reporter, umbilical) :
new ReduceValuesIterator(rIter,
job.getOutputValueGroupingComparator(), keyClass, valClass,
job, reporter);
//逐个读出key-value list,然后调用Reducer的reduce函数
while (values.more()) {
reduceInputKeyCounter.increment(1);
reducer.reduce(values.getKey(), values, collector, reporter);
values.nextKey();
values.informReduceProgress();
}
reducer.close();
out.close(reporter);
done(umbilical);
}

七、总结

Map-Reduce的过程总结如下图:




我们知道,Mapper是通过OutputCollector将Map的结果输出,输出的量很大,Hadoop的机制是通过一个circle buffer 收集Mapper的输出, 到了io.sort.mb * percent量的时候,就spill到disk,如下图。图中出现了两个数组和一个缓冲区,kvindices保持了记录所属的(Reduce)分区,key在缓冲区开始的位置和value在缓冲区开始的位置,通过kvindices,我们可以在缓冲区中找到对应的记录。kvoffets用于在缓冲区满的时候对kvindices的partition进行排序,排完序的结果将输出到输出到本地磁盘上,其中索引(kvindices)保持在spill{spill号}.out.index中,数据保存在spill{spill号}.out中。



当Mapper任务结束后,有可能会出现多个spill文件,这些文件会做一个归并排序,形成Mapper的一个输出(spill.out和spill.out.index),如下图:



这个输出是按partition排序的,这样的话,Mapper的输出被分段,Reducer要获取的就是spill.out中的一段。(注意,内存和硬盘上的索引结构不一样)

---------------------------------------------------------------------------------------------------------------------------------------------------

辅助类2:有了上面Mapper输出的内存存储结构和硬盘存储结构讨论,我们来仔细分析MapOutputBuffer的流程。

首先是成员变量。最先初始化的是作业配置job和统计功能reporter。通过配置,MapOutputBuffer可以获取本地文件系统(localFs和rfs),Reducer的数目和Partitioner。

SpillRecord是文件spill.out{spill号}.index在内存中的对应抽象(内存数据和文件数据就差最后的校验和),该文件保持了一系列的IndexRecord,如下图:



IndexRecord有3个字段,分别是startOffset:记录偏移量,rawLength:初始长度,partLength:实际长度(可能有压缩)。SpillRecord保持了一系列的IndexRecord,并提供方法用于添加记录(没有删除记录的操作,因为不需要),获取记录,写文件,读文件(通过构造函数)。

接下来是一些和输出缓存区kvbuffer,缓存区记录索引kvindices和缓存区记录索引排序工作数组kvoffsets相关的处理,下面的图有助于说明这段代码。



这部分依赖于3个配置参数,io.sort.spill.percent是kvbuffer,kvindices和kvoffsets的总大小(以M为单位,缺省是100,就是100M,这一部分是MapOutputBuffer中占用存储最多的)。io.sort.record.percent是kvindices和kvoffsets占用的空间比例(缺省是0.05)。前面的分析我们已经知道kvindices和kvoffsets,如果记录数是N的话,它占用的空间是4N*4bytes,根据这个关系和io.sort.record.percent的值,我们可以计算出kvindices和kvoffsets最多能有多少个记录,并分配相应的空间。参数io.sort.spill.percent指示当输出缓冲区或kvindices和kvoffsets记录数量到达对应的占用率的时候,会启动spill,将内存缓冲区的记录存放到硬盘上,softBufferLimit和softRecordLimit为对应的字节数。

值对<key, value>输出到缓冲区是通过Serializer串行化的,这部分的初始化跟在上面输出缓存后面。接下来是一些计数器和可能的数据压缩处理器的初始化,可能的Combiner和combiner工作的一些配置。

最后是启动spillThread,该Thread会检查内存中的输出缓存区,在满足一定条件的时候将缓冲区中的内容spill到硬盘上。这是一个标准的生产者-消费者模型,MapTask的collect方法是生产者,spillThread是消费者,它们之间同步是通过spillLock(ReentrantLock)和spillLock上的两个条件变量(spillDone和spillReady)完成的。

先看生产者,MapOutputBuffer.collect的主要流程是:

报告进度和参数检测(<K, V>符合Mapper的输出约定);

spillLock.lock(),进入临界区;

如果达到spill条件,设置变量并通过spillReady.signal(),通知spillThread;并等待spill结束(通过spillDone.await()等待);

spillLock.unlock();

输出key,value并更新kvindices和kvoffsets(注意,方法collect是synchronized,key和value各自输出,它们也会占用连续的输出缓冲区);

kvstart,kvend和kvindex三个变量在判断是否需要spill和spill是否结束的过程中很重要,kvstart是有效记录开始的下标,kvindex是下一个可做记录的位置,kvend的作用比较特殊,它在一般情况下kvstart==kvend,但开始spill的时候它会被赋值为kvindex的值,spill结束时,它的值会被赋给kvstart,这时候kvstart==kvend。这就是说,如果kvstart不等于kvend,系统正在spill,否则,kvstart==kvend,系统处于普通工作状态。其实在代码中,我们可以看到很多kvstart==kvend的判断。

下面我们分情况,讨论kvstart,kvend和kvindex的配合。初始化的时候,它们都被赋值0。



下图给出了一个没有spill的记录添加过程:



注意kvindex和kvnext的关系,取模实现了循环缓冲区

如果在添加记录的过程中,出现spill(多种条件),那么,主要的过程如下:



首先还是计算kvnext,主要,这个时候kvend==kvstart(图中没有画出来)。如果spill条件满足,那么,kvindex的值会赋给kvend(这是kvend不等于kvstart),从kvstart和kvend的大小关系,我们可以知道记录位于数组的那一部分(左边是kvstart<kvend的情况,右边是另外的情况)。Spill结束的时候,kvend值会被赋给kvstart,
kvend==kvstart又重新满足,同时,我们可以发现kvindex在这个过程中没有变化,新的记录还是写在kvindex指向的位置,然后,kvindex=kvnect,kvindex移到下一个可用位置。

大家体会一下上面的过程,特别是kvstart,kvend和kvindex的配合,其实,<key,value>对输出使用的缓冲区,也有类似的过程。

Collect在处理<key,value>输出时,会处理一个MapBufferTooSmallException,这是value的串行化结果太大,不能一次放入缓冲区的指示,这种情况下我们需要调用spillSingleRecord,特殊处理。

---------------------------------------------------------------------------------------------------------------------------------------------------

辅助类3:接下来讨论的是key,value的输出,这部分比较复杂,不过有了前面kvstart,kvend和kvindex配合的分析,有利于我们理解这部分的代码。输出缓冲区中,和kvstart,kvend和kvindex对应的是bufstart,bufend和bufmark。这部分还涉及到变量bufvoid,用于表明实际使用的缓冲区结尾(见后面BlockingBuffer.reset分析),和变量bufmark,用于标记记录的结尾。这部分代码需要bufmark,是因为key或value的输出是变长的,(前面元信息记录大小是常量,就不需要这样的变量)。最好的情况是缓冲区没有翻转和value串行化结果很小,如下图:



先对key串行化,然后对value做串行化,临时变量keystart,valstart和valend分别记录了key结果的开始位置,value结果的开始位置和value结果的结束位置。

串行化过程中,往缓冲区写是最终调用了Buffer.write方法,我们后面再分析。

如果key串行化后出现bufindex < keystart,那么会调用BlockingBuffer的reset方法。原因是在spill的过程中需要对<key,value>排序,这种情况下,传递给RawComparator的必须是连续的二进制缓冲区,通过BlockingBuffer.reset方法,解决这个问题。下图解释了如何解决这个问题:



当发现key的串行化结果出现不连续的情况时,我们会把bufvoid设置为bufmark,见缓冲区开始部分往后挪,然后将原来位于bufmark到bufvoid出的结果,拷到缓冲区开始处,这样的话,key串行化的结果就连续存放在缓冲区的最开始处。

上面的调整有一个条件,就是bufstart前面的缓冲区能够放下整个key串行化的结果,如果不能,处理的方式是将bufindex置0,然后调用BlockingBuffer内部的out的write方法直接输出,这实际调用了Buffer.write方法,会启动spill过程,最终我们会成功写入key串行化的结果。

下面我们看write方法。key,value串行化过程中,往缓冲区写数据是最终调用了Buffer.write方法,又是一个复杂的方法。

do-while循环,直到我们有足够的空间可以写数据(包括缓冲区和kvindices和kvoffsets)

首先我们计算缓冲区连续写是否写满标志buffull和缓冲区非连续情况下有足够写空间标志wrap(这个实在拗口),见下面的讨论;条件(buffull && !wrap)用于判断目前有没有足够的写空间;

在spill没启动的情况下(kvstart == kvend),分两种情况,如果数组中有记录(kvend != kvindex),那么,根据需要(目前输出空间不足或记录数达到spill条件)启动spill过程;否则,如果空间还是不够(buffull && !wrap),表明这个记录非常大,以至于我们的内存缓冲区不能容下这么大的数据量,抛MapBufferTooSmallException异常;

如果空间不足同时spill在运行,等待spillDone;

写数据,注意,如果buffull,则写数据会不连续,则写满剩余缓冲区,然后设置bufindex=0,并从bufindex处接着写。否则,就是从bufindex处开始写。

下图给出了缓冲区连续写是否写满标志buffull和缓冲区非连续情况下有足够写空间标志wrap计算的几种可能:



情况1和情况2中,buffull判断为从bufindex到bufvoid是否有足够的空间容纳写的内容,wrap是图中白颜色部分的空间是否比输入大,如果是,wrap为true;情况3和情况4中,buffull判断bufindex到bufstart的空间是否满足条件,而wrap肯定是false。明显,条件(buffull
&& !wrap)满足时,目前的空间不够一次写。

接下来我们来看spillSingleRecord,只是用于写放不进内存缓冲区的<key,value>对。过程很流水,首先是创建SpillRecord记录,输出文件和IndexRecord记录,然后循环,构造SpillRecord并在恰当的时候输出记录(如下图),最后输出spill{n}.index文件。



前面我们提过spillThread,在这个系统中它是消费者,这个消费者相当简单,需要spill时调用函数sortAndSpill,进行spill。sortAndSpill和spillSingleRecord类似,函数的开始也是创建SpillRecord记录,输出文件和IndexRecord记录,然后,需要在kvoffsets上做排序,排完序后顺序访问kvoffsets,也就是按partition顺序访问记录。

按partition循环处理排完序的数组,如果没有combiner,则直接输出记录,否则,调用combineAndSpill,先做combin然后输出。循环的最后记录IndexRecord到SpillRecord。

sortAndSpill最后是输出spill{n}.index文件。

combineAndSpill比价简单,我们就不分析了。

BlockingBuffer中最后要分析的方法是flush方法。调用flush方法,意味着Mapper的结果都已经collect了,需要对缓冲区做一些最后的清理,并合并spill{n}文件产生最后的输出。

缓冲区处理部分很简单,先等待可能的spill过程完成,然后判断缓冲区是否为空,如果不是,则调用sortAndSpill,做最后的spill,然后结束spill线程。

flush合并spill{n}文件是通过mergeParts方法。如果Mapper最后只有一个spill{n}文件,简单修改该文件的文件名就可以。如果Mapper没有任何输出,那么我们需要创建哑输出(dummy files)。如果spill{n}文件多于1个,那么按partition循环处理所有文件,将处于处理partition的记录输出。处理partition的过程中可能还会再次调用combineAndSpill,最记录再做一次combination,其中还涉及到工具类Merger,我们就不再深入研究了。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: