hdfs上传文件的源码分析
2015-05-22 11:18
288 查看
之前上传下载hdfs文件都是使用的IOUtils.copyBytes(...),不过也看见过别的方式FileSystem.copyFromLocalFile(boolean delSrc, boolean overwrite, Path[] srcs, Path dst)等通过FileSystem操作文件的所以就追踪了一下FileSystem.copyFromLocalFile的执行过程。
//调用了 FileUtil.copy(),进到这个copy()里:
//这个copy把第二个参数的Path src改为了 FileStatus
fileStatus,再进到这个copy():
//这个copy最后还是调用了 IOUtils.copyBytes(in, out, conf, true) ,再进到copyBytes(in, out, conf, true)里:
//这个是调用了 copyBytes(in,
out, buffSize),继续追进去:
//到这结束了,这个就把本件以流的形式写到了hdfs文件对应的存储地方了。不过有必要看一下是怎么把hdfs
path转为输出流的即上面的深红色标注的 out = dstFS.create(dst, overwrite);
在fileSystem中,open函数和create函数都有多个重载的版本,几经周转,最终都是调用都是虚函数版本,而虚函数的实现,取决于FileSystem的继承类对虚函数open和虚函数create的实现。所以有必要查看获得FileSystem的相关源码了,我用的是FileSystem
fileSystem = FileSystem.get(new Configuration());这个的流程大概如下:
[1].
在FileSystem类执行get(Configuration conf) ,这个函数调用getDefaultUri函数
[2]. 在fileSystem类执行getDefaultUri函数得到默认的URI,conf.get(FS_DEFAULT_NAME_KEY, DEFAULT_FS),要获得FS_DEFAULT_NAME_KEY(即是core-site.xml中的fs.defaultFS值)也就是hdfs://{ip}:8020。,获取的默认的URI,并给scheme赋值,再给authority授权,然后再次调用get(URI
uri, Configuration conf)函数。 在return语句执行CACHE.get(uri, conf)。
[3]. 执行Cache类的get函数,函数原型是get(URI uri, Configuration conf)。 CACHE对象属于Cache类。Cache类是FileSystem的内部类。如果uri对应的FileSystem对象已经在缓存里,返回它,如果不在,调用FileSystem的createFileSystem(uri,
conf)函数创建它。
[4]. 执行FileSystem的createFileSystem函数,函数原型是:
FileSystem createFileSystem(URI uri, Configuration conf)
在这个函数里,从配置中读取hfds对应的实现类:
Class<?> clazz = getFileSystemClass(uri.getScheme(), conf);
getFileSystemClass里有:
loadFileSystems()
//将FileSystem的实现类都加载上了
clazz
= SERVICE_FILE_SYSTEMS.get(scheme);//获得scheme(值为hdfs)的实现类,于是,clazz
值就是org.apache.hadoop.hdfs.DistributedFileSystem。然后,通过反射的方式,生成一个clazz对象,并将类型转换成FileSystem:
FileSystem
fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf)
然后返回fs,这就是输出流的文件系统。
ps:在conf.getClass("fs."
+ scheme + ".impl", null)这个函数起初说什么也不知道是什么值,后来通过debug源码,一行行看发现是loadFileSystems里ServiceLoader<FileSystem> serviceLoader = ServiceLoader.load(FileSystem.class); for
(FileSystem fs : serviceLoader) {
SERVICE_FILE_SYSTEMS.put(fs.getScheme(), fs.getClass());
}加载的DistributedFileSystem。
DistributedFileSystem中create()的源码如下:
<span style="font-size:18px;"> public void copyFromLocalFile(boolean delSrc, boolean overwrite, Path src, Path dst) throws IOException { Configuration conf = getConf(); FileUtil.copy(getLocal(conf), src, this, dst, delSrc, overwrite, conf); }</span>
//调用了 FileUtil.copy(),进到这个copy()里:
<span style="font-size:18px;"> public static boolean copy(FileSystem srcFS, Path src, FileSystem dstFS, Path dst, boolean deleteSource, boolean overwrite, Configuration conf) throws IOException { FileStatus fileStatus = srcFS.getFileStatus(src); return copy(srcFS, fileStatus, dstFS, dst, deleteSource, overwrite, conf); }</span>
//这个copy把第二个参数的Path src改为了 FileStatus
fileStatus,再进到这个copy():
public static boolean copy(FileSystem srcFS, FileStatus srcStatus, FileSystem dstFS, Path dst, boolean deleteSource,boolean overwrite, Configuration conf) throws IOException { Path src = srcStatus.getPath(); dst = checkDest(src.getName(), dstFS, dst, overwrite); if (srcStatus.isDirectory()) { checkDependencies(srcFS, src, dstFS, dst); //校验目标路径是否是源路径或者子路径,如果是就返回错误信息 if (!dstFS.mkdirs(dst)) { return false; } FileStatus contents[] = srcFS.listStatus(src); for (int i = 0; i < contents.length; i++) { copy(srcFS, contents[i], dstFS, new Path(dst, contents[i].getPath().getName()), deleteSource, overwrite, conf); //这个copy是遍历源文件目录下的文件传到hdfs } } else { InputStream in=null; OutputStream out = null; try { in = srcFS.open(src); <span style="color:#cc0000;"> out = dstFS.create(dst, overwrite); //创建hdfs的路径输出流,用于后面的流的读写</span> IOUtils.copyBytes(in, out, conf, true); } catch (IOException e) { IOUtils.closeStream(out); IOUtils.closeStream(in); throw e; } } if (deleteSource) { return srcFS.delete(src, true); } else { return true; } }</span>
//这个copy最后还是调用了 IOUtils.copyBytes(in, out, conf, true) ,再进到copyBytes(in, out, conf, true)里:
<span style="font-size:18px;">public static void copyBytes(InputStream in, OutputStream out, int buffSize, boolean close) throws IOException { copyBytes(in, out, buffSize); if(close) { out.close(); out = null; in.close(); in = null; } }</span>
//这个是调用了 copyBytes(in,
out, buffSize),继续追进去:
<span style="font-size:18px;"> public static void copyBytes(InputStream in, OutputStream out, int buffSize) throws IOException { PrintStream ps = out instanceof PrintStream ? (PrintStream)out : null; byte buf[] = new byte[buffSize]; int bytesRead = in.read(buf); while (bytesRead >= 0) { out.write(buf, 0, bytesRead); if ((ps != null) && ps.checkError()) { throw new IOException("Unable to write to output stream."); } bytesRead = in.read(buf); } }</span>
//到这结束了,这个就把本件以流的形式写到了hdfs文件对应的存储地方了。不过有必要看一下是怎么把hdfs
path转为输出流的即上面的深红色标注的 out = dstFS.create(dst, overwrite);
在fileSystem中,open函数和create函数都有多个重载的版本,几经周转,最终都是调用都是虚函数版本,而虚函数的实现,取决于FileSystem的继承类对虚函数open和虚函数create的实现。所以有必要查看获得FileSystem的相关源码了,我用的是FileSystem
fileSystem = FileSystem.get(new Configuration());这个的流程大概如下:
[1].
在FileSystem类执行get(Configuration conf) ,这个函数调用getDefaultUri函数
[2]. 在fileSystem类执行getDefaultUri函数得到默认的URI,conf.get(FS_DEFAULT_NAME_KEY, DEFAULT_FS),要获得FS_DEFAULT_NAME_KEY(即是core-site.xml中的fs.defaultFS值)也就是hdfs://{ip}:8020。,获取的默认的URI,并给scheme赋值,再给authority授权,然后再次调用get(URI
uri, Configuration conf)函数。 在return语句执行CACHE.get(uri, conf)。
[3]. 执行Cache类的get函数,函数原型是get(URI uri, Configuration conf)。 CACHE对象属于Cache类。Cache类是FileSystem的内部类。如果uri对应的FileSystem对象已经在缓存里,返回它,如果不在,调用FileSystem的createFileSystem(uri,
conf)函数创建它。
[4]. 执行FileSystem的createFileSystem函数,函数原型是:
FileSystem createFileSystem(URI uri, Configuration conf)
在这个函数里,从配置中读取hfds对应的实现类:
Class<?> clazz = getFileSystemClass(uri.getScheme(), conf);
getFileSystemClass里有:
loadFileSystems()
//将FileSystem的实现类都加载上了
clazz
= SERVICE_FILE_SYSTEMS.get(scheme);//获得scheme(值为hdfs)的实现类,于是,clazz
值就是org.apache.hadoop.hdfs.DistributedFileSystem。然后,通过反射的方式,生成一个clazz对象,并将类型转换成FileSystem:
FileSystem
fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf)
然后返回fs,这就是输出流的文件系统。
ps:在conf.getClass("fs."
+ scheme + ".impl", null)这个函数起初说什么也不知道是什么值,后来通过debug源码,一行行看发现是loadFileSystems里ServiceLoader<FileSystem> serviceLoader = ServiceLoader.load(FileSystem.class); for
(FileSystem fs : serviceLoader) {
SERVICE_FILE_SYSTEMS.put(fs.getScheme(), fs.getClass());
}加载的DistributedFileSystem。
DistributedFileSystem中create()的源码如下:
<span style="color:#ff0000;"> </span> @Override public FSDataOutputStream create(final Path f, final FsPermission permission, final EnumSet<CreateFlag> cflags, final int bufferSize, final short replication, final long blockSize, final Progressable progress, final ChecksumOpt checksumOpt) throws IOException { statistics.incrementWriteOps(1); Path absF = fixRelativePart(f); return new FileSystemLinkResolver<FSDataOutputStream>() { @Override public FSDataOutputStream doCall(final Path p) throws IOException, UnresolvedLinkException { return new HdfsDataOutputStream(dfs.create(getPathName(p), permission, cflags, replication, blockSize, progress, bufferSize, checksumOpt), statistics); } @Override public FSDataOutputStream next(final FileSystem fs, final Path p) throws IOException { return fs.create(p, permission, cflags, bufferSize, replication, blockSize, progress, checksumOpt); } }.resolve(this, absF); }
相关文章推荐
- Hadoop之HDFS原理及文件上传下载源码分析(下)
- Hadoop之HDFS原理及文件上传下载源码分析(上)
- Hadoop之HDFS原理及文件上传下载源码分析(下)
- SpringMVC 中文件上传 MultipartResolver两种使用方式及简单源码分析
- HDFS dfsclient读文件过程 源码分析
- Hadoop HDFS 文件访问权限问题导致Java Web 上传文件到Hadoop失败的原因分析及解决方法
- Volley源码分析之自定义MultiPartRequest(文件上传)
- Hhadoop-2.7.0中HDFS写文件源码分析(二):客户端实现(1)
- PHP 文件上传源码分析(RFC1867)
- hadoop分析 - HDFS上传文件
- Hadoop源码分析--HDFS读取文件
- Hadoop-源码分析--HDFS读取文件
- HDFS2.X源码分析之:NameNode写文件原理
- Okhttp文件上传源码分析
- SpringMVC源码分析--文件上传
- hadoop源码解析之hdfs写数据全流程分析---创建文件
- HDFS dfsclient写文件过程 源码分析
- SpringMVC源码分析--文件上传
- PHP文件上传源码分析(RFC1867)
- ASP.NET上传视频文件同时转换为flv并且抓取第一帧生面图片源码分析