您的位置:首页 > 运维架构

Hadoop MapReduce之ReduceTask任务执行(一):远程拷贝map输出

2014-06-16 22:25 585 查看
reduce执行流程经历三个阶段:copy、sort、reduce,在第一阶段reduce任务会把map的输出拷贝至本地,通过线程MapOutputCopier,该线程通过http协议将map输出拷贝至本地,该copy操作可以并行进行,默认情况下有5个线程执行此操作,如果map数量较大时可以适当调大此值,拷贝时使用http协议,此时reducetask为client,map端以jetty作为web服务器。reduce任务的执行与map一样在Child类启动,但在TaskFinal.run(job,umbilical)进入ReduceTask类执行。reduce的过程比较复杂,本节只分析copy部分,最后会分析整个reduce流程,需要注意的是每个reduce只拷贝自己需要处理那个partition数据。

拷贝map输出结果代码:ReduceTask.java 1309行

/** Loop forever and fetch map outputs as they become available.
* The thread exits when it is interrupted by {@link ReduceTaskRunner}
*/
@Override
public void run() {
while (true) {
try {
MapOutputLocation loc = null;
long size = -1;
//不停查询调度拷贝的集合,如果有数据到来,则说明需要做copy操作
synchronized (scheduledCopies) {
while (scheduledCopies.isEmpty()) {
scheduledCopies.wait();
}
//取出第一个输出位置
loc = scheduledCopies.remove(0);
}
CopyOutputErrorType error = CopyOutputErrorType.OTHER_ERROR;
readError = false;
try {
shuffleClientMetrics.threadBusy();
start(loc);
//开始读取数据并返回长度,这也是下面代码中要分析的函数
size = copyOutput(loc);
//更新统计信息
shuffleClientMetrics.successFetch();
error = CopyOutputErrorType.NO_ERROR;
} catch (IOException e) {
LOG.warn(reduceTask.getTaskID() + " copy failed: " +
loc.getTaskAttemptId() + " from " + loc.getHost());
LOG.warn(StringUtils.stringifyException(e));
shuffleClientMetrics.failedFetch();
if (readError) {
error = CopyOutputErrorType.READ_ERROR;
}
// Reset
size = -1;
} finally {
shuffleClientMetrics.threadFree();
//当前连接加入copyResults集合
finish(size, error);
}
} catch (InterruptedException e) {
break; // 执行到此步则证明拷贝完成
} catch (FSError e) {//文件系统错误
LOG.error("Task: " + reduceTask.getTaskID() + " - FSError: " +
StringUtils.stringifyException(e));
try {
umbilical.fsError(reduceTask.getTaskID(), e.getMessage(), jvmContext);
} catch (IOException io) {
LOG.error("Could not notify TT of FSError: " +
StringUtils.stringifyException(io));
}
} catch (Throwable th) {
String msg = getTaskID() + " : Map output copy failure : "
+ StringUtils.stringifyException(th);
reportFatalError(getTaskID(), th, msg);
}
}
if (decompressor != null) {
CodecPool.returnDecompressor(decompressor);
}
}


下面分析copyOutput函数:ReduceTask.java 1373行

/** Copies a a map output from a remote host, via HTTP.
* @param currentLocation the map output location to be copied
* @return the path (fully qualified) of the copied file
* @throws IOException if there is an error copying the file
* @throws InterruptedException if the copier should give up
*/
private long copyOutput(MapOutputLocation loc
) throws IOException, InterruptedException {
// 检查该位置是否还需拷贝
if (copiedMapOutputs.contains(loc.getTaskId()) ||
obsoleteMapIds.contains(loc.getTaskAttemptId())) {
return CopyResult.OBSOLETE;
}

// 内存写满时需要用到的临时文件
TaskAttemptID reduceId = reduceTask.getTaskID();
Path filename =
new Path(String.format(
MapOutputFile.REDUCE_INPUT_FILE_FORMAT_STRING,
TaskTracker.OUTPUT, loc.getTaskId().getId()));

// Copy the map output to a temp file whose name is unique to this attempt
Path tmpMapOutput = new Path(filename+"-"+id);

// 开始拷贝map的输出
MapOutput mapOutput = getMapOutput(loc, tmpMapOutput,
reduceId.getTaskID().getId());
if (mapOutput == null) {
throw new IOException("Failed to fetch map-output for " +
loc.getTaskAttemptId() + " from " +
loc.getHost());
}

// 获得输出尺寸
long bytes = mapOutput.compressedSize;

// lock the ReduceTask while we do the rename
synchronized (ReduceTask.this) {
if (copiedMapOutputs.contains(loc.getTaskId())) {
mapOutput.discard();
return CopyResult.OBSOLETE;
}

// Special case: discard empty map-outputs
if (bytes == 0) {
try {
mapOutput.discard();
} catch (IOException ioe) {
LOG.info("Couldn't discard output of " + loc.getTaskId());
}

// Note that we successfully copied the map-output
noteCopiedMapOutput(loc.getTaskId());

return bytes;
}

// 判断是否完全在内存中,根据具体情况执行不同分支
if (mapOutput.inMemory) {
// 如果完全在内存中则放入内存文件的集合中
mapOutputsFilesInMemory.add(mapOutput);
} else {
// Rename the temporary file to the final file;
// ensure it is on the same partition
tmpMapOutput = mapOutput.file;
filename = new Path(tmpMapOutput.getParent(), filename.getName());
if (!localFileSys.rename(tmpMapOutput, filename)) {
localFileSys.delete(tmpMapOutput, true);
bytes = -1;
throw new IOException("Failed to rename map output " +
tmpMapOutput + " to " + filename);
}

synchronized (mapOutputFilesOnDisk) {
addToMapOutputFilesOnDisk(localFileSys.getFileStatus(filename));
}
}

// Note that we successfully copied the map-output
noteCopiedMapOutput(loc.getTaskId());
}

return bytes;
}


getMapOutput函数负责拷贝输出的工作,利用URLConnection建立连接,url格式类似:http://PC-20130917RGUY:50060/mapOutput?job=job_201311261309_0003&map=attempt_201311261309_0003_m_000000_0&reduce=1
,包含协议类型:http,主机及端口:PC-20130917RGUY:50060,路径名称:mapOutput,查询参数包含作业名、map任务名、reduce编号:ob=job_201311261309_0003&map=attempt_201311261309_0003_m_000000_0&reduce=1
url会根据这个地址建立连接,并打开一个输入流读取数据。在开始读取前会判断本次的读取是否能全部放入缓存中,这部分缓存使用是有限制的:jvm_heap_size × mapred.job.shuffle.input.buffer.percent × MAX_SINGLE_SHUFFLE_SEGMENT_FRACTION,其中jvm_heap_size可以通过mapred.job.reduce.total.mem.bytes来设置,如果没设置则通过Runtime.getRuntime().maxMemory()来获取,可以通过mapred.child.opts来影响jvm堆的大小,mapred.job.shuffle.input.buffer.percent可以在参数文件中设置,默认为0.7,MAX_SINGLE_SHUFFLE_SEGMENT_FRACTION在当前版本中为一常量值0.25,也就是说加入我们指定jvm堆大小为1024M,那么一个ReduceTask在拷贝时用到的缓存为1024×0.7×0.25=179M,当我们的map输出大于179M时,则直接写入文件.

ReduceTask.java 1373行

private MapOutput getMapOutput(MapOutputLocation mapOutputLoc,
Path filename, int reduce)
throws IOException, InterruptedException {
// 建立url连接
URL url = mapOutputLoc.getOutputLocation();
URLConnection connection = url.openConnection();

InputStream input = setupSecureConnection(mapOutputLoc, connection);

// 校验任务ID
TaskAttemptID mapId = null;
try {
mapId =
TaskAttemptID.forName(connection.getHeaderField(FROM_MAP_TASK));
} catch (IllegalArgumentException ia) {
LOG.warn("Invalid map id ", ia);
return null;
}
TaskAttemptID expectedMapId = mapOutputLoc.getTaskAttemptId();
if (!mapId.equals(expectedMapId)) {
LOG.warn("data from wrong map:" + mapId +
" arrived to reduce task " + reduce +
", where as expected map output should be from " + expectedMapId);
return null;
}
//判断返回数据长度是否异常
//取得压缩和未压缩长度,后面判断在内存还是硬盘做shuffle
long decompressedLength =
Long.parseLong(connection.getHeaderField(RAW_MAP_OUTPUT_LENGTH));
long compressedLength =
Long.parseLong(connection.getHeaderField(MAP_OUTPUT_LENGTH));

if (compressedLength < 0 || decompressedLength < 0) {
LOG.warn(getName() + " invalid lengths in map output header: id: " +
mapId + " compressed len: " + compressedLength +
", decompressed len: " + decompressedLength);
return null;
}
//判断reduce编号是否相同
int forReduce =
(int)Integer.parseInt(connection.getHeaderField(FOR_REDUCE_TASK));

if (forReduce != reduce) {
LOG.warn("data for the wrong reduce: " + forReduce +
" with compressed len: " + compressedLength +
", decompressed len: " + decompressedLength +
" arrived to reduce task " + reduce);
return null;
}
if (LOG.isDebugEnabled()) {
LOG.debug("header: " + mapId + ", compressed len: " + compressedLength +
", decompressed len: " + decompressedLength);
}

//We will put a file in memory if it meets certain criteria:
//1. The size of the (decompressed) file should be less than 25% of
//    the total inmem fs
//2. There is space available in the inmem fs

// 判断拷贝的数据能否完全放入内存中,内存计算公式为: JVM堆尺寸×mapred.job.shuffle.input.buffer.percent(0.7)× 1/4
boolean shuffleInMemory = ramManager.canFitInMemory(decompressedLength);

// Shuffle
MapOutput mapOutput = null;
if (shuffleInMemory) {
if (LOG.isDebugEnabled()) {
LOG.debug("Shuffling " + decompressedLength + " bytes (" +
compressedLength + " raw bytes) " +
"into RAM from " + mapOutputLoc.getTaskAttemptId());
}
//如果可以放入内存,则放入新建立的byte buffer中
mapOutput = shuffleInMemory(mapOutputLoc, connection, input,
(int)decompressedLength,
(int)compressedLength);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Shuffling " + decompressedLength + " bytes (" +
compressedLength + " raw bytes) " +
"into Local-FS from " + mapOutputLoc.getTaskAttemptId());
}
//内存中放不下则放入磁盘中
mapOutput = shuffleToDisk(mapOutputLoc, input, filename,
compressedLength);
}

return mapOutput;
}


如果内存足够大,则copy过来的数据直接放入内存中,首先会分配一个byte数组,然后从上面建立的输入流冲取得所需数据。

ReduceTask.java 1646行

private MapOutput shuffleInMemory(MapOutputLocation mapOutputLoc,
URLConnection connection,
InputStream input,
int mapOutputLength,
int compressedLength)
throws IOException, InterruptedException {
// 判断是否有足够内存存放数据,如果没有则等待内存刷新,
//刷新内存前会讲输入流置空,所以在这个函数返回为false时需要重新连接,刷新数据时会唤醒内存合并线程
boolean createdNow = ramManager.reserve(mapOutputLength, input);

//是否需要重新连接
if (!createdNow) {
// Reconnect
try {
connection = mapOutputLoc.getOutputLocation().openConnection();
input = setupSecureConnection(mapOutputLoc, connection);
} catch (IOException ioe) {
LOG.info("Failed reopen connection to fetch map-output from " +
mapOutputLoc.getHost());

// Inform the ram-manager
ramManager.closeInMemoryFile(mapOutputLength);
ramManager.unreserve(mapOutputLength);

throw ioe;
}
}
//包装输入流
IFileInputStream checksumIn =
new IFileInputStream(input,compressedLength);

input = checksumIn;

// Are map-outputs compressed?
if (codec != null) {
decompressor.reset();
input = codec.createInputStream(input, decompressor);
}

// 创建buffer,从输入流读取并填充
byte[] shuffleData = new byte[mapOutputLength];
MapOutput mapOutput =
new MapOutput(mapOutputLoc.getTaskId(),
mapOutputLoc.getTaskAttemptId(), shuffleData, compressedLength);

int bytesRead = 0;
try {
//循环读取流冲数据至缓存中
int n = input.read(shuffleData, 0, shuffleData.length);
while (n > 0) {
bytesRead += n;
shuffleClientMetrics.inputBytes(n);

// indicate we're making progress
reporter.progress();
n = input.read(shuffleData, bytesRead,
(shuffleData.length-bytesRead));
}

if (LOG.isDebugEnabled()) {
LOG.debug("Read " + bytesRead + " bytes from map-output for " +
mapOutputLoc.getTaskAttemptId());
}
//数据读取完毕则关闭输入流
input.close();
} catch (IOException ioe) {
LOG.info("Failed to shuffle from " + mapOutputLoc.getTaskAttemptId(),
ioe);

.....
}

// 关闭内存文件,并唤醒内存合并线程
ramManager.closeInMemoryFile(mapOutputLength);

// 校验数据读取长度
if (bytesRead != mapOutputLength) {
// Inform the ram-manager
ramManager.unreserve(mapOutputLength);

// Discard the map-output
try {
mapOutput.discard();
} catch (IOException ignored) {
// IGNORED because we are cleaning up
LOG.info("Failed to discard map-output from " +
mapOutputLoc.getTaskAttemptId(), ignored);
}
mapOutput = null;

throw new IOException("Incomplete map output received for " +
mapOutputLoc.getTaskAttemptId() + " from " +
mapOutputLoc.getOutputLocation() + " (" +
bytesRead + " instead of " +
mapOutputLength + ")"
);
}
.....
return mapOutput;
}


如果内存过小不能存放本次读取的数据则直接写入磁盘文件中,我们会在相关目录中看到这个文件如:C:/hadoop/tasklog/taskTracker/hadoop/jobcache/job_201311281345_0001/attempt_201311281345_0001_r_000001_1/output/map_0.out-0

private MapOutput shuffleToDisk(MapOutputLocation mapOutputLoc,
InputStream input,
Path filename,
long mapOutputLength)
throws IOException {
// 构建本地文件系统文件名
Path localFilename =
lDirAlloc.getLocalPathForWrite(filename.toUri().getPath(),
mapOutputLength, conf);
//创建基于磁盘文件的MapOutput
MapOutput mapOutput =
new MapOutput(mapOutputLoc.getTaskId(), mapOutputLoc.getTaskAttemptId(),
conf, localFileSys.makeQualified(localFilename),
mapOutputLength);

// 开始数据拷贝
OutputStream output = null;
long bytesRead = 0;
try {
output = rfs.create(localFilename);
//讲map输出直接写入磁盘时分配的缓存,固定64K
byte[] buf = new byte[64 * 1024];
int n = -1;
try {
n = input.read(buf, 0, buf.length);
} catch (IOException ioe) {
readError = true;
throw ioe;
}
while (n > 0) {
bytesRead += n;
shuffleClientMetrics.inputBytes(n);
output.write(buf, 0, n);

// indicate we're making progress
reporter.progress();
try {
n = input.read(buf, 0, buf.length);
} catch (IOException ioe) {
readError = true;
throw ioe;
}
}

LOG.info("Read " + bytesRead + " bytes from map-output for " +
mapOutputLoc.getTaskAttemptId());

output.close();
input.close();
} catch (IOException ioe) {
LOG.info("Failed to shuffle from " + mapOutputLoc.getTaskAttemptId(),
ioe);

// Discard the map-output
try {
mapOutput.discard();
} catch (IOException ignored) {
LOG.info("Failed to discard map-output from " +
mapOutputLoc.getTaskAttemptId(), ignored);
}
mapOutput = null;

// Close the streams
IOUtils.cleanup(LOG, input, output);

// Re-throw
throw ioe;
}

// 读取数据后的检测
if (bytesRead != mapOutputLength) {
try {
mapOutput.discard();
} catch (Exception ioe) {
// IGNORED because we are cleaning up
LOG.info("Failed to discard map-output from " +
mapOutputLoc.getTaskAttemptId(), ioe);
} catch (Throwable t) {
String msg = getTaskID() + " : Failed in shuffle to disk :"
+ StringUtils.stringifyException(t);
reportFatalError(getTaskID(), t, msg);
}
mapOutput = null;

throw new IOException("Incomplete map output received for " +
mapOutputLoc.getTaskAttemptId() + " from " +
mapOutputLoc.getOutputLocation() + " (" +
bytesRead + " instead of " +
mapOutputLength + ")"
);
}

return mapOutput;
}


阅读笔记:

1. MapOutput类 本质上是一个指向一个数据块的指针,该数据块可以在硬盘上,也可以在内存上。(1)final boolean inMemory表示该数据块是否在内存中 (2)final Path file表示数据在硬盘上的路径 (3)byte[] data表示数据在内存中的数据块

2. ReduceTask.run() 是ReduceTask的起始点。其中分为三部分:(1).Copy阶段(由reduceCopier.fetchOutputs()完成)
(2).Sort阶段(由Merger.merge()完成) (3).Reduce阶段(由runOldReducer()或者runNewReducer()完成)

3. fetchOutputs()函数中启动多个(由mapred.reduce.parallel.copies属性设置,默认为5个)MapOutputCopier线程进行远程数据拷贝到本地。远程拷贝运行过程中,存在 InMemFSMergeThread线程 和 LocalFSMerger线程 进行文件合并。

4. MapOutput类中的 discard()函数即抛弃拷贝的map输出结果。若该MapOutput数据块在内存上,则将数据指针data置null,若数据块在硬盘上,则调用 fs.delete(file,true) 删除该文件。

5. 远程拷贝过程中,每次拷贝一个数据块时,若该数据块可以放入内存则放入内存,否则放入硬盘。有两个标准决定该数据块是否应该放入硬盘:(1) 数据块小于 java_heaps _size * mapred.job.shuffle.input.buffer.percent * MAX_SINGLE_SHUFFLE_SEGMENT_FRACTION(0.25) (2) 内存中有足够空间放入该数据块。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: