MapReduce源码分析之JobSplitWriter
2016-06-03 15:09
323 查看
JobSplitWriter被作业客户端用于写分片相关文件,包括分片数据文件job.split和分片元数据信息文件job.splitmetainfo。它有两个静态成员变量,如下:
[java] view plain copy
// 分片版本,当前默认为1
private static final int splitVersion = JobSplit.META_SPLIT_VERSION;
// 分片文件头部,为UTF-8格式的字符串"SPL"的字节数组"SPL"
private static final byte[] SPLIT_FILE_HEADER;
并且,提供了一个静态方法,完成SPLIT_FILE_HEADER的初始化,代码如下:
[java] view plain copy
// 静态方法,加载SPLIT_FILE_HEADER为UTF-8格式的字符串"SPL"的字节数组byte[]
static {
try {
SPLIT_FILE_HEADER = "SPL".getBytes("UTF-8");
} catch (UnsupportedEncodingException u) {
throw new RuntimeException(u);
}
}
JobSplitWriter实现其功能的为createSplitFiles()方法,它有三种实现,我们先看其中的public static <T extends InputSplit> void createSplitFiles(Path jobSubmitDir,Configuration conf, FileSystem fs, T[] splits),代码如下:
[java] view plain copy
// 创建分片文件
public static <T extends InputSplit> void createSplitFiles(Path jobSubmitDir,
Configuration conf, FileSystem fs, T[] splits)
throws IOException, InterruptedException {
// 调用createFile()方法,创建分片文件,并获取文件系统数据输出流FSDataOutputStream实例out,
// 对应路径为jobSubmitDir/job.split,jobSubmitDir为参数yarn.app.mapreduce.am.staging-dir指定的路径/作业所属用户user/.staging/作业ID
FSDataOutputStream out = createFile(fs,
JobSubmissionFiles.getJobSplitFile(jobSubmitDir), conf);
// 调用writeNewSplits()方法,将分片数据写入分片文件,并得到分片元数据信息SplitMetaInfo数组info
SplitMetaInfo[] info = writeNewSplits(conf, splits, out);
// 关闭输出流
out.close();
// 调用writeJobSplitMetaInfo()方法,将分片元数据信息写入分片元数据文件
writeJobSplitMetaInfo(fs,JobSubmissionFiles.getJobSplitMetaFile(jobSubmitDir),
new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION), splitVersion,
info);
}
createSplitFiles()方法的逻辑很清晰,大体如下:
1、调用createFile()方法,创建分片文件,并获取文件系统数据输出流FSDataOutputStream实例out,对应路径为jobSubmitDir/job.split,jobSubmitDir为参数yarn.app.mapreduce.am.staging-dir指定的路径/作业所属用户user/.staging/作业ID;
2、调用writeNewSplits()方法,将分片数据写入分片文件,并得到分片元数据信息SplitMetaInfo数组info;
3、关闭输出流out;
4、调用writeJobSplitMetaInfo()方法,将分片元数据信息写入分片元数据文件。
我们先来看下createFile()方法,代码如下:
[java] view plain copy
private static FSDataOutputStream createFile(FileSystem fs, Path splitFile,
Configuration job) throws IOException {
// 调用HDFS文件系统FileSystem的create()方法,获取文件系统数据输出流FSDataOutputStream实例out,
// 对应权限为JobSubmissionFiles.JOB_FILE_PERMISSION,即0644,rw-r--r--
FSDataOutputStream out = FileSystem.create(fs, splitFile,
new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));
// 获取副本数replication,取参数mapreduce.client.submit.file.replication,参数未配置默认为10
int replication = job.getInt(Job.SUBMIT_REPLICATION, 10);
// 通过文件系统FileSystem实例fs的setReplication()方法,设置splitFile的副本数位10
fs.setReplication(splitFile, (short)replication);
// 调用writeSplitHeader()方法写入分片头信息
writeSplitHeader(out);
// 返回文件系统数据输出流out
return out;
}
首先,调用HDFS文件系统FileSystem的create()方法,获取文件系统数据输出流FSDataOutputStream实例out,对应权限为JobSubmissionFiles.JOB_FILE_PERMISSION,即0644,rw-r--r--;
其次,获取副本数replication,取参数mapreduce.client.submit.file.replication,参数未配置默认为10;
接着,通过文件系统FileSystem实例fs的setReplication()方法,设置splitFile的副本数位10;
然后,调用writeSplitHeader()方法写入分片头信息;
最后,返回文件系统数据输出流out。
writeSplitHeader()方法专门用于将分片头部信息写入分片文件,代码如下:
[java] view plain copy
private static void writeSplitHeader(FSDataOutputStream out)
throws IOException {
// 文件系统数据输出流out写入byte[],内容为UTF-8格式的"SPL"
out.write(SPLIT_FILE_HEADER);
// 文件系统数据输出流out写入int,分片版本号,目前为1
out.writeInt(splitVersion);
}
很简单,首先文件系统数据输出流out写入byte[],内容为UTF-8格式的"SPL",然后文件系统数据输出流out写入int,分片版本号,目前为1。
接下来,我们再看下writeNewSplits()方法,它将分片数据写入分片文件,并得到分片元数据信息SplitMetaInfo数组info,代码如下:
[java] view plain copy
@SuppressWarnings("unchecked")
private static <T extends InputSplit>
SplitMetaInfo[] writeNewSplits(Configuration conf,
T[] array, FSDataOutputStream out)
throws IOException, InterruptedException {
// 根据array的大小,构造同等大小的分片元数据信息SplitMetaInfo数组info,
// array其实是传入的分片数组
SplitMetaInfo[] info = new SplitMetaInfo[array.length];
if (array.length != 0) {// 如果array中有数据
// 创建序列化工厂SerializationFactory实例factory
SerializationFactory factory = new SerializationFactory(conf);
int i = 0;
// 获取最大的数据块位置maxBlockLocations,取参数mapreduce.job.max.split.locations,参数未配置默认为10
int maxBlockLocations = conf.getInt(MRConfig.MAX_BLOCK_LOCATIONS_KEY,
MRConfig.MAX_BLOCK_LOCATIONS_DEFAULT);
// 通过输出流out的getPos()方法获取输出流out的当前位置offset
long offset = out.getPos();
// 遍历数组array中每个元素split
for(T split: array) {
// 通过输出流out的getPos()方法获取输出流out的当前位置prevCount
long prevCount = out.getPos();
// 往输出流out中写入String,内容为split对应的类名
Text.writeString(out, split.getClass().getName());
// 获取序列化器Serializer实例serializer
Serializer<T> serializer =
factory.getSerializer((Class<T>) split.getClass());
// 打开serializer,接入输出流out
serializer.open(out);
// 将split序列化到输出流out
serializer.serialize(split);
// 通过输出流out的getPos()方法获取输出流out的当前位置currCount
long currCount = out.getPos();
// 通过split的getLocations()方法,获取位置信息locations
String[] locations = split.getLocations();
if (locations.length > maxBlockLocations) {
LOG.warn("Max block location exceeded for split: "
+ split + " splitsize: " + locations.length +
" maxsize: " + maxBlockLocations);
locations = Arrays.copyOf(locations, maxBlockLocations);
}
// 构造split对应的元数据信息,并加入info指定位置,
// offset为当前split在split文件中的起始位置,数据长度为split.getLength(),位置信息为locations
info[i++] =
new JobSplit.SplitMetaInfo(
locations, offset,
split.getLength());
// offset增加当前split已写入数据大小
offset += currCount - prevCount;
}
}
// 返回分片元数据信息SplitMetaInfo数组info
return info;
}
writeNewSplits()方法的逻辑比较清晰,大体如下:
1、根据array的大小,构造同等大小的分片元数据信息SplitMetaInfo数组info,array其实是传入的分片数组;
2、如果array中有数据:
2.1、创建序列化工厂SerializationFactory实例factory;
2.2、获取最大的数据块位置maxBlockLocations,取参数mapreduce.job.max.split.locations,参数未配置默认为10;
2.3、通过输出流out的getPos()方法获取输出流out的当前位置offset;
2.4、遍历数组array中每个元素split:
2.4.1、通过输出流out的getPos()方法获取输出流out的当前位置prevCount;
2.4.2、往输出流out中写入String,内容为split对应的类名;
2.4.3、获取序列化器Serializer实例serializer;
2.4.4、打开serializer,接入输出流out;
2.4.5、将split序列化到输出流out;
2.4.6、通过输出流out的getPos()方法获取输出流out的当前位置currCount;
2.4.7、通过split的getLocations()方法,获取位置信息locations;
2.4.8、确保位置信息locations的长度不能超过maxBlockLocations,超过则截断;
2.4.9、构造split对应的元数据信息,并加入info指定位置,offset为当前split在split文件中的起始位置,数据长度为split.getLength(),位置信息为locations;
2.4.10、offset增加当前split已写入数据大小;
3、返回分片元数据信息SplitMetaInfo数组info。
其中,序列化split对象时,我们以FileSplit为例来分析,其write()方法如下:
[java] view plain copy
@Override
public void write(DataOutput out) throws IOException {
// 写入文件路径全名
Text.writeString(out, file.toString());
// 写入分片在文件中的起始位置
out.writeLong(start);
// 写入分片在文件中的长度
out.writeLong(length);
}
比较简单,分别写入文件路径全名、分片在文件中的起始位置、分片在文件中的长度三个信息。
综上所述,分片文件job.split文件的内容为:
1、文件头:"SPL"+int类型版本号1;
2、分片类信息:String类型split对应类名;
3、分片数据信息:String类型文件路径全名+Long类型分片在文件中的起始位置+Long类型分片在文件中的长度。
而在最后,构造分片元数据信息时,产生的是JobSplit的静态内部类SplitMetaInfo对象,包括分片位置信息locations、split在split文件中的起始位置offset、分片长度split.getLength()。
下面,我们再看下分片的元数据信息文件是如何产生的,让我们来研究下writeJobSplitMetaInfo()方法,代码如下:
[java] view plain copy
// 写入作业分片元数据信息
private static void writeJobSplitMetaInfo(FileSystem fs, Path filename,
FsPermission p, int splitMetaInfoVersion,
JobSplit.SplitMetaInfo[] allSplitMetaInfo)
throws IOException {
// write the splits meta-info to a file for the job tracker
// 调用HDFS文件系统FileSystem的create()方法,生成分片元数据信息文件,并获取文件系统数据输出流FSDataOutputStream实例out,
// 对应文件路径为jobSubmitDir/job.splitmetainfo,jobSubmitDir为参数yarn.app.mapreduce.am.staging-dir指定的路径/作业所属用户user/.staging/作业ID
// 对应权限为JobSubmissionFiles.JOB_FILE_PERMISSION,即0644,rw-r--r--
FSDataOutputStream out =
FileSystem.create(fs, filename, p);
// 写入分片元数据头部信息UTF-8格式的字符串"META-SPL"的字节数组byte[]
out.write(JobSplit.META_SPLIT_FILE_HEADER);
// 写入分片元数据版本号splitMetaInfoVersion,当前为1
WritableUtils.writeVInt(out, splitMetaInfoVersion);
// 写入分片元数据个数,为分片元数据信息SplitMetaInfo数组个数allSplitMetaInfo.length
WritableUtils.writeVInt(out, allSplitMetaInfo.length);
// 遍历分片元数据信息SplitMetaInfo数组allSplitMetaInfo中每个splitMetaInfo,挨个写入输出流
for (JobSplit.SplitMetaInfo splitMetaInfo : allSplitMetaInfo) {
splitMetaInfo.write(out);
}
// 关闭输出流out
out.close();
}
writeJobSplitMetaInfo()方法的主体逻辑也十分清晰,大体如下:
1、调用HDFS文件系统FileSystem的create()方法,生成分片元数据信息文件,并获取文件系统数据输出流FSDataOutputStream实例out,对应文件路径为jobSubmitDir/job.splitmetainfo,jobSubmitDir为参数yarn.app.mapreduce.am.staging-dir指定的路径/作业所属用户user/.staging/作业ID,对应权限为JobSubmissionFiles.JOB_FILE_PERMISSION,即0644,rw-r--r--;
2、写入分片元数据头部信息UTF-8格式的字符串"META-SPL"的字节数组byte[];
3、写入分片元数据版本号splitMetaInfoVersion,当前为1;
4、写入分片元数据个数,为分片元数据信息SplitMetaInfo数组个数allSplitMetaInfo.length;
5、遍历分片元数据信息SplitMetaInfo数组allSplitMetaInfo中每个splitMetaInfo,挨个写入输出流;
6、关闭输出流out。
我们看下如何序列化JobSplit.SplitMetaInfo,将其写入文件,JobSplit.SplitMetaInfo的write()如下:
[java] view plain copy
public void write(DataOutput out) throws IOException {
// 将分片位置个数写入分片元数据信息文件
WritableUtils.writeVInt(out, locations.length);
// 遍历位置信息,写入分片元数据信息文件
for (int i = 0; i < locations.length; i++) {
Text.writeString(out, locations[i]);
}
// 写入分片元数据信息的起始位置
WritableUtils.writeVLong(out, startOffset);
// 写入分片大小
WritableUtils.writeVLong(out, inputDataLength);
}
每个分片的元数据信息,包括分片位置个数、分片文件位置、分片元数据信息的起始位置、分片大小等内容。
总结
JobSplitWriter被作业客户端用于写分片相关文件,包括分片数据文件job.split和分片元数据信息文件job.splitmetainfo。分片数据文件job.split存储的主要是每个分片对应的HDFS文件路径,和其在HDFS文件中的起始位置、长度等信息,而分片元数据信息文件job.splitmetainfo存储的则是每个分片在分片数据文件job.split中的起始位置、分片大小等信息。
job.split文件内容:文件头 + 分片 + 分片 + ... + 分片
文件头:"SPL" + 版本号1
分片:分片类 + 分片数据,分片类=String类型split对应类名,分片数据=String类型HDFS文件路径全名+Long类型分片在HDFS文件中的起始位置+Long类型分片在HDFS文件中的长度
job.splitmetainfo文件内容:文件头 + 分片元数据个数 + 分片元数据 + 分片元数据 + ... + 分片元数据
文件头:"META-SPL" + 版本号1
分片元数据个数:分片元数据的个数
分片元数据:分片位置个数+分片位置+在分片文件job.split中的起始位置+分片大小
[java] view plain copy
// 分片版本,当前默认为1
private static final int splitVersion = JobSplit.META_SPLIT_VERSION;
// 分片文件头部,为UTF-8格式的字符串"SPL"的字节数组"SPL"
private static final byte[] SPLIT_FILE_HEADER;
并且,提供了一个静态方法,完成SPLIT_FILE_HEADER的初始化,代码如下:
[java] view plain copy
// 静态方法,加载SPLIT_FILE_HEADER为UTF-8格式的字符串"SPL"的字节数组byte[]
static {
try {
SPLIT_FILE_HEADER = "SPL".getBytes("UTF-8");
} catch (UnsupportedEncodingException u) {
throw new RuntimeException(u);
}
}
JobSplitWriter实现其功能的为createSplitFiles()方法,它有三种实现,我们先看其中的public static <T extends InputSplit> void createSplitFiles(Path jobSubmitDir,Configuration conf, FileSystem fs, T[] splits),代码如下:
[java] view plain copy
// 创建分片文件
public static <T extends InputSplit> void createSplitFiles(Path jobSubmitDir,
Configuration conf, FileSystem fs, T[] splits)
throws IOException, InterruptedException {
// 调用createFile()方法,创建分片文件,并获取文件系统数据输出流FSDataOutputStream实例out,
// 对应路径为jobSubmitDir/job.split,jobSubmitDir为参数yarn.app.mapreduce.am.staging-dir指定的路径/作业所属用户user/.staging/作业ID
FSDataOutputStream out = createFile(fs,
JobSubmissionFiles.getJobSplitFile(jobSubmitDir), conf);
// 调用writeNewSplits()方法,将分片数据写入分片文件,并得到分片元数据信息SplitMetaInfo数组info
SplitMetaInfo[] info = writeNewSplits(conf, splits, out);
// 关闭输出流
out.close();
// 调用writeJobSplitMetaInfo()方法,将分片元数据信息写入分片元数据文件
writeJobSplitMetaInfo(fs,JobSubmissionFiles.getJobSplitMetaFile(jobSubmitDir),
new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION), splitVersion,
info);
}
createSplitFiles()方法的逻辑很清晰,大体如下:
1、调用createFile()方法,创建分片文件,并获取文件系统数据输出流FSDataOutputStream实例out,对应路径为jobSubmitDir/job.split,jobSubmitDir为参数yarn.app.mapreduce.am.staging-dir指定的路径/作业所属用户user/.staging/作业ID;
2、调用writeNewSplits()方法,将分片数据写入分片文件,并得到分片元数据信息SplitMetaInfo数组info;
3、关闭输出流out;
4、调用writeJobSplitMetaInfo()方法,将分片元数据信息写入分片元数据文件。
我们先来看下createFile()方法,代码如下:
[java] view plain copy
private static FSDataOutputStream createFile(FileSystem fs, Path splitFile,
Configuration job) throws IOException {
// 调用HDFS文件系统FileSystem的create()方法,获取文件系统数据输出流FSDataOutputStream实例out,
// 对应权限为JobSubmissionFiles.JOB_FILE_PERMISSION,即0644,rw-r--r--
FSDataOutputStream out = FileSystem.create(fs, splitFile,
new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));
// 获取副本数replication,取参数mapreduce.client.submit.file.replication,参数未配置默认为10
int replication = job.getInt(Job.SUBMIT_REPLICATION, 10);
// 通过文件系统FileSystem实例fs的setReplication()方法,设置splitFile的副本数位10
fs.setReplication(splitFile, (short)replication);
// 调用writeSplitHeader()方法写入分片头信息
writeSplitHeader(out);
// 返回文件系统数据输出流out
return out;
}
首先,调用HDFS文件系统FileSystem的create()方法,获取文件系统数据输出流FSDataOutputStream实例out,对应权限为JobSubmissionFiles.JOB_FILE_PERMISSION,即0644,rw-r--r--;
其次,获取副本数replication,取参数mapreduce.client.submit.file.replication,参数未配置默认为10;
接着,通过文件系统FileSystem实例fs的setReplication()方法,设置splitFile的副本数位10;
然后,调用writeSplitHeader()方法写入分片头信息;
最后,返回文件系统数据输出流out。
writeSplitHeader()方法专门用于将分片头部信息写入分片文件,代码如下:
[java] view plain copy
private static void writeSplitHeader(FSDataOutputStream out)
throws IOException {
// 文件系统数据输出流out写入byte[],内容为UTF-8格式的"SPL"
out.write(SPLIT_FILE_HEADER);
// 文件系统数据输出流out写入int,分片版本号,目前为1
out.writeInt(splitVersion);
}
很简单,首先文件系统数据输出流out写入byte[],内容为UTF-8格式的"SPL",然后文件系统数据输出流out写入int,分片版本号,目前为1。
接下来,我们再看下writeNewSplits()方法,它将分片数据写入分片文件,并得到分片元数据信息SplitMetaInfo数组info,代码如下:
[java] view plain copy
@SuppressWarnings("unchecked")
private static <T extends InputSplit>
SplitMetaInfo[] writeNewSplits(Configuration conf,
T[] array, FSDataOutputStream out)
throws IOException, InterruptedException {
// 根据array的大小,构造同等大小的分片元数据信息SplitMetaInfo数组info,
// array其实是传入的分片数组
SplitMetaInfo[] info = new SplitMetaInfo[array.length];
if (array.length != 0) {// 如果array中有数据
// 创建序列化工厂SerializationFactory实例factory
SerializationFactory factory = new SerializationFactory(conf);
int i = 0;
// 获取最大的数据块位置maxBlockLocations,取参数mapreduce.job.max.split.locations,参数未配置默认为10
int maxBlockLocations = conf.getInt(MRConfig.MAX_BLOCK_LOCATIONS_KEY,
MRConfig.MAX_BLOCK_LOCATIONS_DEFAULT);
// 通过输出流out的getPos()方法获取输出流out的当前位置offset
long offset = out.getPos();
// 遍历数组array中每个元素split
for(T split: array) {
// 通过输出流out的getPos()方法获取输出流out的当前位置prevCount
long prevCount = out.getPos();
// 往输出流out中写入String,内容为split对应的类名
Text.writeString(out, split.getClass().getName());
// 获取序列化器Serializer实例serializer
Serializer<T> serializer =
factory.getSerializer((Class<T>) split.getClass());
// 打开serializer,接入输出流out
serializer.open(out);
// 将split序列化到输出流out
serializer.serialize(split);
// 通过输出流out的getPos()方法获取输出流out的当前位置currCount
long currCount = out.getPos();
// 通过split的getLocations()方法,获取位置信息locations
String[] locations = split.getLocations();
if (locations.length > maxBlockLocations) {
LOG.warn("Max block location exceeded for split: "
+ split + " splitsize: " + locations.length +
" maxsize: " + maxBlockLocations);
locations = Arrays.copyOf(locations, maxBlockLocations);
}
// 构造split对应的元数据信息,并加入info指定位置,
// offset为当前split在split文件中的起始位置,数据长度为split.getLength(),位置信息为locations
info[i++] =
new JobSplit.SplitMetaInfo(
locations, offset,
split.getLength());
// offset增加当前split已写入数据大小
offset += currCount - prevCount;
}
}
// 返回分片元数据信息SplitMetaInfo数组info
return info;
}
writeNewSplits()方法的逻辑比较清晰,大体如下:
1、根据array的大小,构造同等大小的分片元数据信息SplitMetaInfo数组info,array其实是传入的分片数组;
2、如果array中有数据:
2.1、创建序列化工厂SerializationFactory实例factory;
2.2、获取最大的数据块位置maxBlockLocations,取参数mapreduce.job.max.split.locations,参数未配置默认为10;
2.3、通过输出流out的getPos()方法获取输出流out的当前位置offset;
2.4、遍历数组array中每个元素split:
2.4.1、通过输出流out的getPos()方法获取输出流out的当前位置prevCount;
2.4.2、往输出流out中写入String,内容为split对应的类名;
2.4.3、获取序列化器Serializer实例serializer;
2.4.4、打开serializer,接入输出流out;
2.4.5、将split序列化到输出流out;
2.4.6、通过输出流out的getPos()方法获取输出流out的当前位置currCount;
2.4.7、通过split的getLocations()方法,获取位置信息locations;
2.4.8、确保位置信息locations的长度不能超过maxBlockLocations,超过则截断;
2.4.9、构造split对应的元数据信息,并加入info指定位置,offset为当前split在split文件中的起始位置,数据长度为split.getLength(),位置信息为locations;
2.4.10、offset增加当前split已写入数据大小;
3、返回分片元数据信息SplitMetaInfo数组info。
其中,序列化split对象时,我们以FileSplit为例来分析,其write()方法如下:
[java] view plain copy
@Override
public void write(DataOutput out) throws IOException {
// 写入文件路径全名
Text.writeString(out, file.toString());
// 写入分片在文件中的起始位置
out.writeLong(start);
// 写入分片在文件中的长度
out.writeLong(length);
}
比较简单,分别写入文件路径全名、分片在文件中的起始位置、分片在文件中的长度三个信息。
综上所述,分片文件job.split文件的内容为:
1、文件头:"SPL"+int类型版本号1;
2、分片类信息:String类型split对应类名;
3、分片数据信息:String类型文件路径全名+Long类型分片在文件中的起始位置+Long类型分片在文件中的长度。
而在最后,构造分片元数据信息时,产生的是JobSplit的静态内部类SplitMetaInfo对象,包括分片位置信息locations、split在split文件中的起始位置offset、分片长度split.getLength()。
下面,我们再看下分片的元数据信息文件是如何产生的,让我们来研究下writeJobSplitMetaInfo()方法,代码如下:
[java] view plain copy
// 写入作业分片元数据信息
private static void writeJobSplitMetaInfo(FileSystem fs, Path filename,
FsPermission p, int splitMetaInfoVersion,
JobSplit.SplitMetaInfo[] allSplitMetaInfo)
throws IOException {
// write the splits meta-info to a file for the job tracker
// 调用HDFS文件系统FileSystem的create()方法,生成分片元数据信息文件,并获取文件系统数据输出流FSDataOutputStream实例out,
// 对应文件路径为jobSubmitDir/job.splitmetainfo,jobSubmitDir为参数yarn.app.mapreduce.am.staging-dir指定的路径/作业所属用户user/.staging/作业ID
// 对应权限为JobSubmissionFiles.JOB_FILE_PERMISSION,即0644,rw-r--r--
FSDataOutputStream out =
FileSystem.create(fs, filename, p);
// 写入分片元数据头部信息UTF-8格式的字符串"META-SPL"的字节数组byte[]
out.write(JobSplit.META_SPLIT_FILE_HEADER);
// 写入分片元数据版本号splitMetaInfoVersion,当前为1
WritableUtils.writeVInt(out, splitMetaInfoVersion);
// 写入分片元数据个数,为分片元数据信息SplitMetaInfo数组个数allSplitMetaInfo.length
WritableUtils.writeVInt(out, allSplitMetaInfo.length);
// 遍历分片元数据信息SplitMetaInfo数组allSplitMetaInfo中每个splitMetaInfo,挨个写入输出流
for (JobSplit.SplitMetaInfo splitMetaInfo : allSplitMetaInfo) {
splitMetaInfo.write(out);
}
// 关闭输出流out
out.close();
}
writeJobSplitMetaInfo()方法的主体逻辑也十分清晰,大体如下:
1、调用HDFS文件系统FileSystem的create()方法,生成分片元数据信息文件,并获取文件系统数据输出流FSDataOutputStream实例out,对应文件路径为jobSubmitDir/job.splitmetainfo,jobSubmitDir为参数yarn.app.mapreduce.am.staging-dir指定的路径/作业所属用户user/.staging/作业ID,对应权限为JobSubmissionFiles.JOB_FILE_PERMISSION,即0644,rw-r--r--;
2、写入分片元数据头部信息UTF-8格式的字符串"META-SPL"的字节数组byte[];
3、写入分片元数据版本号splitMetaInfoVersion,当前为1;
4、写入分片元数据个数,为分片元数据信息SplitMetaInfo数组个数allSplitMetaInfo.length;
5、遍历分片元数据信息SplitMetaInfo数组allSplitMetaInfo中每个splitMetaInfo,挨个写入输出流;
6、关闭输出流out。
我们看下如何序列化JobSplit.SplitMetaInfo,将其写入文件,JobSplit.SplitMetaInfo的write()如下:
[java] view plain copy
public void write(DataOutput out) throws IOException {
// 将分片位置个数写入分片元数据信息文件
WritableUtils.writeVInt(out, locations.length);
// 遍历位置信息,写入分片元数据信息文件
for (int i = 0; i < locations.length; i++) {
Text.writeString(out, locations[i]);
}
// 写入分片元数据信息的起始位置
WritableUtils.writeVLong(out, startOffset);
// 写入分片大小
WritableUtils.writeVLong(out, inputDataLength);
}
每个分片的元数据信息,包括分片位置个数、分片文件位置、分片元数据信息的起始位置、分片大小等内容。
总结
JobSplitWriter被作业客户端用于写分片相关文件,包括分片数据文件job.split和分片元数据信息文件job.splitmetainfo。分片数据文件job.split存储的主要是每个分片对应的HDFS文件路径,和其在HDFS文件中的起始位置、长度等信息,而分片元数据信息文件job.splitmetainfo存储的则是每个分片在分片数据文件job.split中的起始位置、分片大小等信息。
job.split文件内容:文件头 + 分片 + 分片 + ... + 分片
文件头:"SPL" + 版本号1
分片:分片类 + 分片数据,分片类=String类型split对应类名,分片数据=String类型HDFS文件路径全名+Long类型分片在HDFS文件中的起始位置+Long类型分片在HDFS文件中的长度
job.splitmetainfo文件内容:文件头 + 分片元数据个数 + 分片元数据 + 分片元数据 + ... + 分片元数据
文件头:"META-SPL" + 版本号1
分片元数据个数:分片元数据的个数
分片元数据:分片位置个数+分片位置+在分片文件job.split中的起始位置+分片大小
相关文章推荐
- nginx配置https
- [python] python扫描网段IP
- SSH框架网上商城项目第16战之Hibernate二级缓存处理首页热门显示
- Yii 2 初体验
- 在CentOS 6+7 上安装配置GlusterFS
- Windows下Maven安装教程
- 自定义View,有这一篇就够了
- MapReduce源码分析之LocatedFileStatusFetcher
- 浅谈Java泛型中的extends和super关键字
- Android listView每个item设置不同
- textarea 自动换行缩进 分享
- poj 2112 Optimal Milking(floyd+二分+最大流)
- 用PC搭建SVN服务器:[1]局域网SVN服务器
- 常用Docker命令
- WSTMall微信支付配置
- Excel数据导出,Java自定义注解
- js修改iframe框架的src路径及js获取URL的get参数中文乱码的问题
- HDU 1280 前m大的数(简单HASH)
- Hadoop2.6.0版本MapReudce示例之WordCount(二)
- CSS3制作漂亮的照片墙