您的位置:首页 > 大数据

一个MapReuce作业的从开始到结束--第5章 把文件复制到HDFS的流程

2013-11-29 15:15 218 查看
  在本章,我们以《从零开始学习Hadoop》的第一个例子WordCount为例,分析Hadoop执行MapReduce的流程和诸多细节。这个例子在该书有详细的说明,在这里不在给出,请参考该书了解细节。

1. 执行流程

[1].将README.txt文件复制到HDFS的命令是:

./bin/hadoop fs -putREADME.txt readme.txt

[2].从hadoop脚本中可知,fs对应的类是org.apache.hadoop.fs.FsShell,这个类对应的源代码在core/org/apache/hadoop/fs/FsShell.java文件。

[3]. FsShell类的入口是main函数。

[4].在main函数,对FsShell进行一次实例化,FsShellshell = new FsShell(),然后用ToolRunner将argv参数传给shell执行。

[5].ToolRunner调用shell的run函数,参数是argv。

[6].在FsShell的run函数,首先会检查参数是否合理,然后执行init函数,进行初始化。初始化很简单,就是获取文件系统,获取Trash。

[7].对于”-put”命令,run函数用一个Path数组保存源文件路径和目的路径,然后调用copyFromLocal函数进行处理。

[8].在FsShell的copyFromLocal函数中,获取FileSystem,然后执行FileSystem的copyFromLocalFile函数。

[9].FileSystem的copyFromLocalFile函数有多个重载的版本,几经周转之后,最终调用的是FileUtil的copy函数:

FileUtil.copy(getLocal(conf),srcs, this, dst, delSrc, overwrite, conf);

[10].FileSystem的copy函数也有多个重载的版本,经过多次调用后,最终调用的是这个copy函数:

copy(FileSystem srcFS, Pathsrc, FileSystem dstFS, Path dst, boolean deleteSource,booleanoverwrite, Configuration conf)

将README.txt复制到HDFS,对应下面这段代码:

InputStreamin=null;
OutputStream out =null;

try {

in =srcFS.open(src);

out =dstFS.create(dst, overwrite);

IOUtils.copyBytes(in,out, conf, true);

}

在fileSystem中,open函数和create函数都有多个重载的版本,几经周转,最终都是调用都是虚函数版本,而虚函数的实现,取决于FileSystem的继承类对虚函数open和虚函数create的实现。

各种文件系统的实现,是在目录core/org/apache/hadoop/fs里,如果想知道具体有哪些文件系统实现了,可以在这个目录用grep查一下:

grep -rin “FSDataOutputStreamcreate(” *

Hadoop实现了很多文件系统,如下:

ChecksumFileSystem,

FilterFileSystem,

FTPFileSystem,

HarFileSystem,

InMemoryFileSystem,

KFSImpl,

IFSImpl,

KosmosFileSystem,

RawLocalFileSystem,

S3FileSystem,

NativeS3FileSystem

当然,其实最重要的是在这里HDFS,分布式文件系统,代码目录是hdfs/org/apache/hadoop/hdfs,这里FileSystem的继承类如下:

DistributedFileSystem

HftpFileSystem

WebHdfsFileSystem

[11].IOUtils类在输入流和输出流之间传递数据。IOUtils里的函数都是静态函数,所以它只是一个容器,把各种函数打包成类,IOUtils的copyBytes函数,经过几次周转,最终调用的是函数:

copyBytes(InputStreamin, OutputStream out,

final longlength, final int bufferSize, final boolean close)

在这个函数里,数据读写的核心代码是:

intn = 0;
for(long remaining= length;

remaining >0 && n != -1;

remaining -=n) {

final int toRead= remaining < buf.length? (int)remaining : buf.length;

n = in.read(buf,0, toRead);

if (n > 0) {

out.write(buf,0, n);

}

}

这段代码从输入流读数据到缓存,然后再写入到输出流。

2.输出流是如何获取FileSystem

在wordCount例子中,输出流对应的文件名是”readme.txt”。 输出流在FsShell类的copyFromLocal函数获取FileSystem,语句如下:

Path dstPath = newPath(dstf);

FileSystem dstFs =dstPath.getFileSystem(getConf());

dstPath执行的构造函数是Path(StringpathString)。初始化的时候,检查pathString中的“:”和”/”位置,由此得到URI的Scheme。最终,dstPath的uri的值是readme.txt。

在dstPath执行getFileSystem比较复杂,次序如下:

[1].在Path类执行getFileSystem(Configurationconf)函数。

[2].在FileSystem类执行get(URIuri, Configurationconf),在这个函数里,因为shceme=null,于是执行另外一个get重载版本。因为输出文件名是”readme.txt”,无法直接得到文件系统。

[3].在FileSystem类执行get(Configurationconf)
,这个函数调用getDefaultUri函数,获取的默认的URI,然后再次调用get(URIuri, Configuration conf)函数。

[4].在fileSystem类执行getDefaultUri函数得到默认的URI,也就是hdfs://localhost:9000。

[5].然后再回到[2],这时候就可以参数uri是默认值,就可以执行到return语句,在return语句执行CACHE.get(uri,conf)。

[6].执行Cache类的get函数,函数原型是get(URIuri, Configuration conf)。CACHE对象属于Cache类。Cache类是FileSystem的内部类。如果uri对应的FileSystem对象已经在缓存里,返回它,如果不在,调用FileSystem的createFileSystem(uri,conf)函数创建它。

[7].执行FileSystem的createFileSystem函数,函数原型是:

FileSystemcreateFileSystem(URI uri, Configuration conf)

在这里函数里,从配置中读取hfds对应的实现类:

Class<?> clazz =conf.getClass("fs." + uri.getScheme() + ".impl",null)

"fs." +uri.getScheme() + ".impl"
就是”fs.hdfs.impl”

于是,clazz值就是org.apache.hadoop.hdfs.DistributedFileSystem。然后,通过反射的方式,生成一个clazz对象,并将类型转换成FileSystem:

FileSystem fs =(FileSystem)ReflectionUtils.newInstance(clazz, conf)

然后返回fs,这就是输出流的文件系统。

3.输入流如何获取FileSystem

输入流获取FileSystem的地方是在FileSytem类调用copyFromLocalFile函数的时候。

[1].在FileSystem调用copyFromLocalFile函数,copyFromLocalFile函数调用getLocal函数。

[2].在FileSystem的getLocal函数,调用get(LocalFileSystem.NAME,conf)函数,这时候,LocalFileSystem.NAME的默认值是uri=”file:///”。

[3].在fileSystem的get函数,执行的步骤跟输出流获取文件系统的方式差不多,最终会调用createFileSytem函数,此时,从配置里读“fs.file.impl”对应的值,也就是clazz的值,这个值是clazz=
class org.apache.hadoop.fs.LocalFileSystem,然后,同样通过反射的方式,创建一个LocalFileSystem对象,然后转化成FileSystem类型并返回它。

4. 创建输入流和输出流

文件复制,需要创建输入流和输出流,代码如下:

in= srcFS.open(src);
out = dstFS.create(dst,overwrite);

srcFs是LocalFileSystem,它没有实现open函数,所以它调用的是父类ChecksumFileSystem的open函数。

dstFS是DistributedFileSystem,它实现了create函数。但其实这个一个很有趣的过程,dstFs先调用的是FileSystem的create函数,也就是两个参数的版本。但实际上,这个函数要在FileSystem中不断调用其他的各种参数的create函数,最终调用的虚函数:

public abstractFSDataOutputStream create(Path f,

FsPermission permission,

boolean overwrite,

int bufferSize,

short replication,

long blockSize,

Progressable progress)throws IOException;

DistributedFileSystem实现的就是这个7个参数的create函数。函数create调用环节非常之多,如果你去看源代码的话,能体会到Hadoop在版本更替中所做的种种努力调和各种问题。

DistributedFileSystem和ChecksumFileSystem的输入流和输出流的实现非常精彩,非常值得一读,非常有助于提升功力,它们是Hadoop的精华所在!
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  hadoop 大数据 java
相关文章推荐