您的位置:首页 > 编程语言 > Java开发

hadoop-hdfs-文件工具类(Java)

2017-03-13 17:28 169 查看
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.IOUtils;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;

public class HdfsUtil {

/**
* ls
*/
public void listFiles(String specialPath) {
FileSystem fileSystem = null;
try {
fileSystem = this.getFS();
;

FileStatus[] fstats = fileSystem.listStatus(new Path(specialPath));

for (FileStatus fstat : fstats) {
System.out.println(fstat.isDirectory() ? "directory" : "file");
System.out.println("Permission:" + fstat.getPermission());
System.out.println("Owner:" + fstat.getOwner());
System.out.println("Group:" + fstat.getGroup());
System.out.println("Size:" + fstat.getLen());
System.out.println("Replication:" + fstat.getReplication());
System.out.println("Block Size:" + fstat.getBlockSize());
System.out.println("Name:" + fstat.getPath());

System.out.println("#############################");
}

} catch (IOException e) {
e.printStackTrace();
System.err.println("link err");
} finally {
if (fileSystem != null) {
try {
fileSystem.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}

}

/**
* cat
*
* @param hdfsFilePath
*/
public void cat(String hdfsFilePath) {
FileSystem fileSystem = null;
try {
fileSystem = this.getFS();

FSDataInputStream fdis = fileSystem.open(new Path(hdfsFilePath));

IOUtils.copyBytes(fdis, System.out, 1024);

} catch (IOException e) {
e.printStackTrace();
} finally {
IOUtils.closeStream(fileSystem);
}

}

/**
* 创建目录
*
* @param hdfsFilePath
*/
public void mkdir(String hdfsFilePath) {

FileSystem fileSystem = this.getFS();

try {
boolean success = fileSystem.mkdirs(new Path(hdfsFilePath));
if (success) {
System.out.println("Create directory or file successfully");
}
} catch (IllegalArgumentException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} finally {
this.closeFS(fileSystem);
}

}

/**
* 删除文件或目录
*
* @param hdfsFilePath
* @param recursive    递归
*/
public void rm(String hdfsFilePath, boolean recursive) {
FileSystem fileSystem = this.getFS();
try {
boolean success = fileSystem.delete(new Path(hdfsFilePath), recursive);
if (success) {
System.out.println("delete successfully");
}
} catch (IllegalArgumentException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} finally {
this.closeFS(fileSystem);
}
}

/**
* 上传文件到HDFS
*
* @param localFilePath
* @param hdfsFilePath
*/
public void put(String localFilePath, String hdfsFilePath) {
FileSystem fileSystem = this.getFS();
try {
FSDataOutputStream fdos = fileSystem.create(new Path(hdfsFilePath));
FileInputStream fis = new FileInputStream(new File(localFilePath));
IOUtils.copyBytes(fis, fdos, 1024);

} catch (IllegalArgumentException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} finally {
IOUtils.closeStream(fileSystem);
}
}

public void read(String fileName) throws Exception {

// get filesystem
FileSystem fileSystem = this.getFS();

Path readPath = new Path(fileName);

// open file
FSDataInputStream inStream = fileSystem.open(readPath);

try {
// read
IOUtils.copyBytes(inStream, System.out, 4096, false);
} catch (Exception e) {
e.printStackTrace();
} finally {
// close Stream
IOUtils.closeStream(inStream);
}
}

/**
* 下载文件到本地
*
* @param localFilePath
* @param hdfsFilePath
*/
public void get(String localFilePath, String hdfsFilePath) {
FileSystem fileSystem = this.getFS();
try {
FSDataInputStream fsis = fileSystem.open(new Path(hdfsFilePath));
FileOutputStream fos = new FileOutputStream(new File(localFilePath));
IOUtils.copyBytes(fsis, fos, 1024);
} catch (IllegalArgumentException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} finally {
IOUtils.closeStream(fileSystem);
}
}

public void write(String localPath, String hdfspath) throws Exception {
FileInputStream inStream = new FileInputStream(
new File(localPath)
);
FileSystem fileSystem = this.getFS();

Path writePath = new Path(hdfspath);

// Output Stream
FSDataOutputStream outStream = fileSystem.create(writePath);

try {
IOUtils.copyBytes(inStream, outStream, 4096, false);
} catch (Exception e) {
e.printStackTrace();
} finally {
IOUtils.closeStream(inStream);
IOUtils.closeStream(outStream);
}

}

/**
* 获取FileSystem实例
*
* @return
*/
private FileSystem getFS() {
System.setProperty("hadoop.home.dir", "D:\\04coding\\projects-bigData\\Hadoop\\hadoop-2.5.0");
System.setProperty("HADOOP_USER_NAME", "xiaoyuzhou");

Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://xyz01.aiso.com:8020/");
conf.set("mapred.remote.os", "Linux");
FileSystem fileSystem = null;
try {
fileSystem = FileSystem.get(conf);

return fileSystem;
} catch (IOException e) {
e.printStackTrace();
}

return null;
}

/**
* 关闭FileSystem
*
* @param fileSystem
*/
private void closeFS(FileSystem fileSystem) {
if (fileSystem != null) {
try {
fileSystem.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: