您的位置:首页 > 编程语言 > Java开发

调用JAVA API 对 HDFS 进行文件的读取、写入、上传、下载、删除等操作

2016-01-28 16:50 976 查看
Hadoop文件系统 
基本的文件系统命令操作, 通过hadoop fs -help可以获取所有的命令的详细帮助文件。 

Java抽象类org.apache.hadoop.fs.FileSystem定义了hadoop的一个文件系统接口。该类是一个抽象类,通过以下两种静态工厂方法可以过去FileSystem实例: 
public static FileSystem.get(Configuration conf) throws IOException 

public static FileSystem.get(URI uri, Configuration conf) throws IOException 

具体方法实现: 
1、public boolean mkdirs(Path f) throws IOException 
一次性新建所有目录(包括父目录), f是完整的目录路径。 

2、public FSOutputStream create(Path f) throws IOException 
创建指定path对象的一个文件,返回一个用于写入数据的输出流 
create()有多个重载版本,允许我们指定是否强制覆盖已有的文件、文件备份数量、写入文件缓冲区大小、文件块大小以及文件权限。 

3、public boolean copyFromLocal(Path src, Path dst) throws IOException 
将本地文件拷贝到文件系统 

4、public boolean exists(Path f) throws IOException 
检查文件或目录是否存在 

5、public boolean delete(Path f, Boolean recursive) 
永久性删除指定的文件或目录,如果f是一个空目录或者文件,那么recursive的值就会被忽略。只有recursive=true时,一个非空目录及其内容才会被删除。 

6、FileStatus类封装了文件系统中文件和目录的元数据,包括文件长度、块大小、备份、修改时间、所有者以及权限信息。

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.IOUtils;

import java.io.*;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;

public class HdfsOperate {
ArrayList<HdfsFile> hdfsfiles;

public HdfsOperate() {
this.hdfsfiles = new ArrayList<HdfsFile>();
}

/**
* 获取hdfs路径下的文件列表
*
* @param srcpath
* @return
*/
public String[] getFileList(String srcpath) {
try {
Configuration conf = new Configuration();
Path path = new Path(srcpath);
FileSystem fs = path.getFileSystem(conf);
List<String> files = new ArrayList<String>();
if (fs.exists(path) && fs.isDirectory(path)) {
for (FileStatus status : fs.listStatus(path)) {
files.add(status.getPath().toString());
}
}
//fs.close();
return files.toArray(new String[]{});
} catch (IOException e) {
} catch (Exception e) {
}
return null;
}

/**
* 给定文件名和文件内容,创建hdfs文件
*
* @param dst
* @param contents
* @throws IOException
*/
public void createFile(String dst, byte[] contents)
throws IOException {
Configuration conf = new Configuration();
Path dstPath = new Path(dst);
FileSystem fs = dstPath.getFileSystem(conf);

FSDataOutputStream outputStream = fs.create(dstPath);
outputStream.write(contents);
outputStream.close();
System.out.println("create file " + dst + " success!");
//fs.close();
}

/**
* 删除hdfs文件
*
* @param filePath
* @throws IOException
*/
public void delete(String filePath) throws IOException {
Configuration conf = new Configuration();
Path path = new Path(filePath);
FileSystem fs = path.getFileSystem(conf);

boolean isok = fs.deleteOnExit(path);
if (isok) {
System.out.println("delete file " + filePath + " success!");
} else {
System.out.println("delete file " + filePath + " failure");
}
//fs.close();
}

/**
* 创建hdfs目录
*
* @param path
* @throws IOException
*/
public void mkdir(String path) throws IOException {
Configuration conf = new Configuration();
Path srcPath = new Path(path);
FileSystem fs = srcPath.getFileSystem(conf);

boolean isok = fs.mkdirs(srcPath);
if (isok) {
System.out.println("create dir ok!");
} else {
System.out.println("create dir failure");
}
//fs.close();
}

/**
* 读取hdfs文件内容,并在控制台打印出来
*
* @param filePath
* @throws IOException
*/
public void readFile(String filePath) throws IOException {
Configuration conf = new Configuration();
Path srcPath = new Path(filePath);
FileSystem fs = null;
URI uri;
try {
uri = new URI(filePath);
fs = FileSystem.get(uri, conf);
} catch (URISyntaxException e) {
e.printStackTrace();
}
InputStream in = null;
try {
in = fs.open(srcPath);
IOUtils.copyBytes(in, System.out, 4096, false);
} finally {
IOUtils.closeStream(in);
}
}

/**
* 下载hdfs文件到本地目录
*
* @param dstPath
* @param srcPath
* @throws Exception
*/
public void downloadFile(String dstPath, String srcPath) throws Exception {
Path path = new Path(srcPath);
Configuration conf = new Configuration();
FileSystem hdfs = path.getFileSystem(conf);

File rootfile = new File(dstPath);
if (!rootfile.exists()) {
rootfile.mkdirs();
}

if (hdfs.isFile(path)) {
//只下载非txt文件
String fileName = path.getName();
if (!fileName.toLowerCase().endsWith("txt")) {
FSDataInputStream in = null;
FileOutputStream out = null;
try {
in = hdfs.open(path);
File srcfile = new File(rootfile, path.getName());
if (!srcfile.exists()) srcfile.createNewFile();
out = new FileOutputStream(srcfile);
IOUtils.copyBytes(in, out, 4096, false);
} finally {
IOUtils.closeStream(in);
IOUtils.closeStream(out);
}
//下载完后,在hdfs上将原文件删除
this.delete(path.toString());
}
} else if (hdfs.isDirectory(path)) {
File dstDir = new File(dstPath);
if (!dstDir.exists()) {
dstDir.mkdirs();
}
//在本地目录上加一层子目录
String filePath = path.toString();//目录
String subPath[] = filePath.split("/");
String newdstPath = dstPath + subPath[subPath.length - 1] + "/";
System.out.println("newdstPath=======" + newdstPath);
if (hdfs.exists(path) && hdfs.isDirectory(path)) {
FileStatus[] srcFileStatus = hdfs.listStatus(path);
if (srcFileStatus != null) {
for (FileStatus status : hdfs.listStatus(path)) {
//下载子目录下文件
downloadFile(newdstPath, status.getPath().toString());
}
}
}
}
}

/**
* 下载hdfs文件内容,保存到内存对象中
*
* @param srcPath
* @throws Exception
*/
public void downloadFileByte(String srcPath) throws Exception {
Path path = new Path(srcPath);
FileSystem hdfs = null;
Configuration conf = new Configuration();
hdfs = FileSystem.get(URI.create(srcPath), conf);
if (hdfs.exists(path)) {
if (hdfs.isFile(path)) {
//如果是文件
FSDataInputStream in = null;
FileOutputStream out = null;
try {
in = hdfs.open(new Path(srcPath));
byte[] t = new byte[in.available()];
in.read(t);
hdfsfiles.add(new HdfsFile(path.getName(), srcPath, t));
} finally {
IOUtils.closeStream(in);
IOUtils.closeStream(out);
}
} else {
//如果是目录
FileStatus[] srcFileStatus = hdfs.listStatus(new Path(srcPath));
for (int i = 0; i < srcFileStatus.length; i++) {
String srcFile = srcFileStatus[i].getPath().toString();
downloadFileByte(srcFile);
}
}
}
}

public ArrayList<HdfsFile> getHdfsfiles() {
return hdfsfiles;
}

/**
* 将本地目录或文件上传的hdfs
*
* @param localSrc
* @param dst
* @throws Exception
*/
public void uploadFile(String localSrc, String dst) throws Exception {

Configuration conf = new Configuration();
File srcFile = new File(localSrc);
if (srcFile.isDirectory()) {
copyDirectory(localSrc, dst, conf);
} else {
copyFile(localSrc, dst, conf);
}
}

/**
* 拷贝本地文件hdfs目录下
*
* @param localSrc
* @param dst
* @param conf
* @return
* @throws Exception
*/
private boolean copyFile(String localSrc, String dst, Configuration conf) throws Exception {
File file = new File(localSrc);
dst = dst + file.getName();
Path path = new Path(dst);
FileSystem fs = path.getFileSystem(conf);//FileSystem.get(conf);
fs.exists(path);
InputStream in = new BufferedInputStream(new FileInputStream(file));
OutputStream out = fs.create(new Path(dst));
IOUtils.copyBytes(in, out, 4096, true);
in.close();
return true;
}

/**
* 拷贝本地目录到hdfs
* @param src
* @param dst
* @param conf
* @return
* @throws Exception
*/
private boolean copyDirectory(String src, String dst, Configuration conf) throws Exception {
Path path = new Path(dst);
FileSystem fs = path.getFileSystem(conf);
if (!fs.exists(path)) {
fs.mkdirs(path);
}
File file = new File(src);

File[] files = file.listFiles();
for (int i = 0; i < files.length; i++) {
File f = files[i];
if (f.isDirectory()) {
String fname = f.getName();
if (dst.endsWith("/")) {
copyDirectory(f.getPath(), dst + fname + "/", conf);
} else {
copyDirectory(f.getPath(), dst + "/" + fname + "/", conf);
}
} else {
copyFile(f.getPath(), dst, conf);
}
}
return true;
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  hdfs 文件系统 java