熟练掌握HDFS的Java API接口访问
2016-04-15 19:24
579 查看
HDFS设计的主要目的是对海量数据进行存储,也就是说在其上能够存储很大量文件(可以存储TB级的文件)。HDFS将这些文件分割之后,存储在不同的DataNode上, HDFS 提供了两种访问接口:Shell接口和Java API 接口,对HDFS里面的文件进行操作,具体每个Block放在哪台DataNode上面,对于开发者来说是透明的。
通过Java API接口对HDFS进行操作,我将其整理成工具类,地址见底部
1、获取文件系统
2、创建文件目录
3、删除文件或者文件目录
3、根据filter获取目录下的文件
4、文件上传至 HDFS
5、从 HDFS 下载文件
6、获取 HDFS 集群节点信息
7、查找某个文件在 HDFS集群的位置
8、文件重命名
9、判断目录是否存在
如果,您认为阅读这篇博客让您有些收获,不妨点击一下右下角的【推荐】。
如果,您希望更容易地发现我的新博客,不妨点击一下左下角的【关注我】。
如果,您对我的博客所讲述的内容有兴趣,请继续关注我的后续博客,我是【刘超★ljc】。
本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。
地址:下载
通过Java API接口对HDFS进行操作,我将其整理成工具类,地址见底部
1、获取文件系统
/** * 获取文件系统 * * @return FileSystem */ public static FileSystem getFileSystem() { //读取配置文件 Configuration conf = new Configuration(); // 文件系统 FileSystem fs = null; String hdfsUri = HDFSUri; if(StringUtils.isBlank(hdfsUri)){ // 返回默认文件系统 如果在 Hadoop集群下运行,使用此种方法可直接获取默认文件系统 try { fs = FileSystem.get(conf); } catch (IOException e) { logger.error("", e); } }else{ // 返回指定的文件系统,如果在本地测试,需要使用此种方法获取文件系统 try { URI uri = new URI(hdfsUri.trim()); fs = FileSystem.get(uri,conf); } catch (URISyntaxException | IOException e) { logger.error("", e); } } return fs; }
2、创建文件目录
/** * 创建文件目录 * * @param path */ public static void mkdir(String path) { try { // 获取文件系统 FileSystem fs = getFileSystem(); String hdfsUri = HDFSUri; if(StringUtils.isNotBlank(hdfsUri)){ path = hdfsUri + path; } // 创建目录 fs.mkdirs(new Path(path)); //释放资源 fs.close(); } catch (IllegalArgumentException | IOException e) { logger.error("", e); } }
3、删除文件或者文件目录
/** * 删除文件或者文件目录 * * @param path */ public static void rmdir(String path) { try { // 返回FileSystem对象 FileSystem fs = getFileSystem(); String hdfsUri = HDFSUri; if(StringUtils.isNotBlank(hdfsUri)){ path = hdfsUri + path; } // 删除文件或者文件目录 delete(Path f) 此方法已经弃用 fs.delete(new Path(path),true); // 释放资源 fs.close(); } catch (IllegalArgumentException | IOException e) { logger.error("", e); } }
3、根据filter获取目录下的文件
/** * 根据filter获取目录下的文件 * * @param path * @param pathFilter * @return String[] */ public static String[] ListFile(String path,PathFilter pathFilter) { String[] files = new String[0]; try { // 返回FileSystem对象 FileSystem fs = getFileSystem(); String hdfsUri = HDFSUri; if(StringUtils.isNotBlank(hdfsUri)){ path = hdfsUri + path; } FileStatus[] status; if(pathFilter != null){ // 根据filter列出目录内容 status = fs.listStatus(new Path(path),pathFilter); }else{ // 列出目录内容 status = fs.listStatus(new Path(path)); } // 获取目录下的所有文件路径 Path[] listedPaths = FileUtil.stat2Paths(status); // 转换String[] if (listedPaths != null && listedPaths.length > 0){ files = new String[listedPaths.length]; for (int i = 0; i < files.length; i++){ files[i] = listedPaths[i].toString(); } } // 释放资源 fs.close(); } catch (IllegalArgumentException | IOException e) { logger.error("", e); } return files; }
4、文件上传至 HDFS
/** * 文件上传至 HDFS * * @param delSrc * @param overwrite * @param srcFile * @param destPath */ public static void copyFileToHDFS(boolean delSrc, boolean overwrite,String srcFile,String destPath) { // 源文件路径是Linux下的路径,如果在 windows 下测试,需要改写为Windows下的路径,比如D://hadoop/djt/weibo.txt Path srcPath = new Path(srcFile); // 目的路径 String hdfsUri = HDFSUri; if(StringUtils.isNotBlank(hdfsUri)){ destPath = hdfsUri + destPath; } Path dstPath = new Path(destPath); // 实现文件上传 try { // 获取FileSystem对象 FileSystem fs = getFileSystem(); fs.copyFromLocalFile(srcPath, dstPath); fs.copyFromLocalFile(delSrc,overwrite,srcPath, dstPath); //释放资源 fs.close(); } catch (IOException e) { logger.error("", e); } }
5、从 HDFS 下载文件
/** * 从 HDFS 下载文件 * * @param srcFile * @param destPath */ public static void getFile(String srcFile,String destPath) { // 源文件路径 String hdfsUri = HDFSUri; if(StringUtils.isNotBlank(hdfsUri)){ srcFile = hdfsUri + srcFile; } Path srcPath = new Path(srcFile); // 目的路径是Linux下的路径,如果在 windows 下测试,需要改写为Windows下的路径,比如D://hadoop/djt/ Path dstPath = new Path(destPath); try { // 获取FileSystem对象 FileSystem fs = getFileSystem(); // 下载hdfs上的文件 fs.copyToLocalFile(srcPath, dstPath); // 释放资源 fs.close(); } catch (IOException e) { logger.error("", e); } }
6、获取 HDFS 集群节点信息
/** * 获取 HDFS 集群节点信息 * * @return DatanodeInfo[] */ public static DatanodeInfo[] getHDFSNodes() { // 获取所有节点 DatanodeInfo[] dataNodeStats = new DatanodeInfo[0]; try { // 返回FileSystem对象 FileSystem fs = getFileSystem(); // 获取分布式文件系统 DistributedFileSystem hdfs = (DistributedFileSystem)fs; dataNodeStats = hdfs.getDataNodeStats(); } catch (IOException e) { logger.error("", e); } return dataNodeStats; }
7、查找某个文件在 HDFS集群的位置
/** * 查找某个文件在 HDFS集群的位置 * * @param filePath * @return BlockLocation[] */ public static BlockLocation[] getFileBlockLocations(String filePath) { // 文件路径 String hdfsUri = HDFSUri; if(StringUtils.isNotBlank(hdfsUri)){ filePath = hdfsUri + filePath; } Path path = new Path(filePath); // 文件块位置列表 BlockLocation[] blkLocations = new BlockLocation[0]; try { // 返回FileSystem对象 FileSystem fs = getFileSystem(); // 获取文件目录 FileStatus filestatus = fs.getFileStatus(path); //获取文件块位置列表 blkLocations = fs.getFileBlockLocations(filestatus, 0, filestatus.getLen()); } catch (IOException e) { logger.error("", e); } return blkLocations; }
8、文件重命名
/** * 文件重命名 * * @param srcPath * @param dstPath */ public boolean rename(String srcPath, String dstPath){ boolean flag = false; try { // 返回FileSystem对象 FileSystem fs = getFileSystem(); String hdfsUri = HDFSUri; if(StringUtils.isNotBlank(hdfsUri)){ srcPath = hdfsUri + srcPath; dstPath = hdfsUri + dstPath; } flag = fs.rename(new Path(srcPath), new Path(dstPath)); } catch (IOException e) { logger.error("{} rename to {} error.", srcPath, dstPath); } return flag; }
9、判断目录是否存在
/** * 判断目录是否存在 * * @param srcPath * @param dstPath */ public boolean existDir(String filePath, boolean create){ boolean flag = false; if (StringUtils.isEmpty(filePath)){ return flag; } try{ Path path = new Path(filePath); // FileSystem对象 FileSystem fs = getFileSystem(); if (create){ if (!fs.exists(path)){ fs.mkdirs(path); } } if (fs.isDirectory(path)){ flag = true; } }catch (Exception e){ logger.error("", e); } return flag; }
如果,您认为阅读这篇博客让您有些收获,不妨点击一下右下角的【推荐】。
如果,您希望更容易地发现我的新博客,不妨点击一下左下角的【关注我】。
如果,您对我的博客所讲述的内容有兴趣,请继续关注我的后续博客,我是【刘超★ljc】。
本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。
地址:下载
相关文章推荐
- 用Java写脚本,常用的一些方法
- Java8之Stream语法详解
- java并发:线程同步机制之计数器&Exechanger
- JAVA IO(一)
- java第五次作业
- 【Java】Map
- jdk 安装
- spring webflow getting start
- java反射以获取父类属性的值
- JAVA 反射总结
- java 执行bat批处理文件 并关闭cmd窗口
- 【华为机试题】和尚挑水
- java爬虫 之 搜狐新闻爬虫(三)
- eclipse svn使用小记
- JAVA学习篇--静态代理VS动态代理
- Java--计算中英文长度的若干种方法
- java爬虫 之 搜狐新闻爬虫(二)
- javaw.exe 和java.exe的区别
- 字符串转AscII
- Spring+SpringMVC+MyBaties学习笔记(一)环境搭建