您的位置:首页 > 编程语言 > Java开发

java实现对HDFS增删改查(CRUD)等操作

2014-03-12 21:16 706 查看

实现对HDFS增删改查CRUD等操作

1 查找

列出某个目录下的文件名称,hdfs命令如下所示:

hdfs dfs -ls /usr/app

java代码片段:

/**
 * Logs the name of every entry directly under {@code srcPath}, tagging each
 * one as a directory or a file.
 *
 * <p>I/O failures are logged and are not propagated to the caller.
 *
 * @param srcPath path of the directory to list
 */
public void list(String srcPath) {
    Configuration conf = new Configuration();
    LOG.info("[Defaultfs] :" + conf.get("fs.default.name"));
    // conf.set("hadoop.job.ugi","app,app");   //It is not necessary for the default user.
    try {
        FileSystem fs = FileSystem.get(conf);
        RemoteIterator<LocatedFileStatus> entries = fs.listLocatedStatus(new Path(srcPath));
        while (entries.hasNext()) {
            // The listing already carries each entry's type, so there is no
            // need for an extra fs.isDirectory()/fs.isFile() lookup per entry.
            LocatedFileStatus status = entries.next();
            String name = status.getPath().getName();
            if (status.isDirectory()) {
                LOG.info("-----------DirectoryName: " + name);
            } else if (status.isFile()) {
                LOG.info("-----------FileName: " + name);
            }
        }
    } catch (IOException e) {
        // Log and continue: callers of this void method do not expect an exception.
        LOG.error("list fileSysetm object stream.:", e);
    }
}


输出结果:

2014-03-11 22:38:15,329 INFO  (com.hdfs.client.SyncDFS:48) ------------File Name: README.txt

2014-03-11 22:38:15,331 INFO  (com.hdfs.client.SyncDFS:45) ------------Directory Name: blog_blogpost

2014-03-11 22:38:15,333 INFO  (com.hdfs.client.SyncDFS:45) ------------Directory Name: test

读取文件中的内容,hdfs命令如下:

hdfs dfs -cat /input

java 代码:

public void readFile(String file){
Configurationconf = new Configuration();
FileSystemfs;
try {
fs= FileSystem.get(conf);
Pathpath = new Path(file);
if(!fs.exists(path)){
LOG.warn("file'"+ file+"' doesn't exist!");
return ;
}
FSDataInputStreamin = fs.open(path);
Stringfilename = file.substring(file.lastIndexOf('/') + 1, file.length());
OutputStreamout = new BufferedOutputStream(new FileOutputStream(
new File(filename)));

byte[] b = new byte[1024];
int numBytes = 0;
while ((numBytes = in.read(b)) > 0) {
out.write(b,0, numBytes);
}
in.close();
out.close();
fs.close();
}catch (IOException e) {
LOG.error("ifExists fs Exception caught! :" , e);
new RuntimeException(e);
}
}


获取文件的修改时间,java代码:

/**
 * Logs the last-modification time of the given path.
 *
 * <p>If the path does not exist, a message is printed to standard output and
 * nothing is logged. This method does not close the FileSystem handle.
 *
 * @param source path whose modification time is reported
 * @throws IOException if the file system cannot be reached or queried
 */
public void getModificationTime(String source) throws IOException {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path srcPath = new Path(source);

    // Check that the file exists
    if (!fs.exists(srcPath)) {
        System.out.println("No such destination " + srcPath);
        return;
    }

    // Get the filename out of the file path
    String filename = source.substring(source.lastIndexOf('/') + 1);

    FileStatus fileStatus = fs.getFileStatus(srcPath);
    long modificationTime = fileStatus.getModificationTime();

    // The value is epoch milliseconds (a long), so it is formatted with the
    // date/time and integer conversions. The previous "%0.2f" is a
    // floating-point conversion with an invalid flag and threw at runtime.
    // String.format is used because System.out.format returns the PrintStream,
    // not the formatted text.
    LOG.info("modified datetime: " + String.format(
            "File %1$s; Modification time : %2$tF %2$tT (%2$d ms)",
            filename, modificationTime));
}


获取文件块定位信息,java代码:

/**
 * Logs, for every block of the given file, the hosts that store the block.
 *
 * <p>If the path does not exist, a message is printed to standard output and
 * nothing is logged. This method does not close the FileSystem handle.
 *
 * @param source path of the file to inspect
 * @throws IOException if the file system cannot be reached or queried
 */
public void getBlockLocations(String source) throws IOException {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path srcPath = new Path(source);

    // Check existence on this handle directly. ifExists() closes the handle it
    // obtains from FileSystem.get(), and with the default FileSystem cache that
    // is the same instance as 'fs', so the calls below would hit a closed
    // file system.
    if (!fs.exists(srcPath)) {
        System.out.println("No such destination " + srcPath);
        return;
    }

    // Get the filename out of the file path
    String filename = source.substring(source.lastIndexOf('/') + 1);

    FileStatus fileStatus = fs.getFileStatus(srcPath);
    BlockLocation[] blkLocations = fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());

    System.out.println("File :" + filename + "stored at:");
    for (int i = 0; i < blkLocations.length; i++) {
        String[] hosts = blkLocations[i].getHosts();
        // Arrays.toString renders the host names; formatting the array itself
        // with %s only prints its object reference.
        LOG.info("host ip:" + String.format("Host %d: %s", i, Arrays.toString(hosts)));
    }
}


获取Hadoop集群中data node的DNS主机名,java代码:

public void getHostnames () throwsIOException{
Configurationconfig = new Configuration();

FileSystemfs = FileSystem.get(config);
DistributedFileSystemhdfs = (DistributedFileSystem) fs;
DatanodeInfo[]dataNodeStats = hdfs.getDataNodeStats();

String[]names = new String[dataNodeStats.length];
for (int i = 0; i < dataNodeStats.length; i++) {
names[i]= dataNodeStats[i].getHostName();
LOG.info("datenode hostname:"+(dataNodeStats[i].getHostName()));
}
}


 

2 创建

创建一个目录,指定具体的文件路径。hdfs命令如下:

hdfs dfs -mkdir /usr/app/tmp


java代码:

       
/**
 * Creates the given directory (including missing parents) if it does not
 * already exist.
 *
 * <p>I/O failures are logged and are not propagated to the caller.
 *
 * @param dir path of the directory to create
 */
public void mkdir(String dir) {
    Configuration conf = new Configuration();
    FileSystem fs = null;
    try {
        fs = FileSystem.get(conf);
        Path path = new Path(dir);
        if (fs.exists(path)) {
            LOG.debug("directory '" + dir + "' exits!");
        } else if (fs.mkdirs(path)) {
            LOG.debug("create directory '" + dir + "' successfully!");
        } else {
            // mkdirs reports failure through its return value, so success
            // must not be logged unconditionally.
            LOG.warn("create directory '" + dir + "' failed!");
        }
    } catch (IOException e) {
        // Pass the exception to the logger instead of printStackTrace() so the
        // cause ends up in the same log as the message.
        LOG.error("FileSystem get configuration with anerror", e);
    } finally {
        if (fs != null) {
            try {
                fs.close();
            } catch (IOException e) {
                LOG.error("close fs object stream. :", e);
            }
        }
    }
}


将本地文件上传到hdfs上去,java代码如下:

public void copyFromLocal (String source, String dest) {

Configurationconf = new Configuration();
FileSystemfs;
try {
fs= FileSystem.get(conf);
PathsrcPath = new Path(source);

PathdstPath = new Path(dest);
// Check if the file alreadyexists
if (!(fs.exists(dstPath))) {
LOG.warn("dstPathpath doesn't exist" );
LOG.error("No such destination " + dstPath);
return;
}

// Get the filename out of thefile path
Stringfilename = source.substring(source.lastIndexOf('/') + 1, source.length());

try{
//if the file exists in thedestination path, it will throw exception.
//                                   fs.copyFromLocalFile(srcPath,dstPath);
//remove and overwrite files withthe method
//copyFromLocalFile(booleandelSrc, boolean overwrite, Path src, Path dst)
fs.copyFromLocalFile(false, true, srcPath, dstPath);
LOG.info("File " + filename + "copied to " + dest);
}catch(Exception e){
LOG.error("copyFromLocalFile exception caught!:" , e);
new RuntimeException(e);
}finally{
fs.close();
}
}catch (IOException e1) {
LOG.error("copyFromLocal IOException objectstream. :" ,e1);
new RuntimeException(e1);
}
}


        

添加一个文件到指定的目录下,java代码如下:

      
public void addFile(String source, String dest)  {
// Conf object will readthe HDFS configuration parameters
Configurationconf = new Configuration();
FileSystemfs;
try {
fs= FileSystem.get(conf);
// Get the filename out of thefile path
Stringfilename = source.substring(source.lastIndexOf('/') + 1, source.length());

// Create the destination pathincluding the filename.
if (dest.charAt(dest.length() - 1) != '/') {
dest= dest + "/" + filename;
}else {
dest= dest + filename;
}

// Check if the file alreadyexists
Pathpath = new Path(dest);
if (fs.exists(path)) {
LOG.error("File " + dest + " already exists");
return;
}

// Create a new file and writedata to it.
FSDataOutputStreamout = fs.create(path);
InputStreamin = new BufferedInputStream(new FileInputStream(
new File(source)));

byte[] b = new byte[1024];
int numBytes = 0;
//In this way read and write datato destination file.
while ((numBytes = in.read(b)) > 0) {
out.write(b,0, numBytes);
}
in.close();
out.close();
fs.close();
}catch (IOException e) {
LOG.error("addFile Exception caught! :" , e);
new RuntimeException(e);
}
}


3 修改

重新命名hdfs中的文件名称,java代码如下:

    
public void renameFile (String fromthis, String tothis){
Configurationconf = new Configuration();

FileSystemfs;
try {
fs= FileSystem.get(conf);
PathfromPath = new Path(fromthis);
PathtoPath = new Path(tothis);

if (!(fs.exists(fromPath))) {
LOG.info("No such destination " + fromPath);
return;
}

if (fs.exists(toPath)) {
LOG.info("Already exists! " + toPath);
return;
}

try{
boolean isRenamed = fs.rename(fromPath,toPath);     //renames file name indeed.
if(isRenamed){
LOG.info("Renamed from " + fromthis + " to " + tothis);
}
}catch(Exception e){
LOG.error("renameFile Exception caught! :" , e);
new RuntimeException(e);
}finally{
fs.close();
}
}catch (IOException e1) {
LOG.error("fs Exception caught! :" , e1);
new RuntimeException(e1);
}
}


 

4 删除

在hdfs上,删除指定的一个文件。Java代码:

public void deleteFile(String file)  {
Configurationconf = new Configuration();
FileSystemfs;
try {
fs= FileSystem.get(conf);

Pathpath = new Path(file);
if (!fs.exists(path)) {
LOG.info("File " + file + " does not exists");
return;
}
/*
* recursively delete the file(s) if it is adirectory.
* If you want to mark the path that will bedeleted as
* a result of closing the FileSystem.
*  deleteOnExit(Path f)
*/
fs.delete(new Path(file), true);
fs.close();
}catch (IOException e) {
LOG.error("deleteFile Exception caught! :" , e);
new RuntimeException(e);
}

}


 

Appendix 完整代码

 
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;

public class SyncDFS {

private static final Log LOG = LogFactory.getLog(SyncDFS.class);

/**
* Reads the directory name(s) and file name(s)from the specified parameter "srcPath"
* @param srcPath
*/
public void list(String srcPath) {
Configuration conf = new Configuration();
LOG.info("[Defaultfs] :" +conf.get("fs.default.name"));
// conf.set("hadoop.job.ugi","app,app"); //It is not necessary for the default user.
FileSystem fs;
try {
fs= FileSystem.get(conf);
RemoteIterator<LocatedFileStatus>rmIterator = fs.listLocatedStatus(new Path(srcPath));
while (rmIterator.hasNext()) {
Path path = rmIterator.next().getPath();
if(fs.isDirectory(path)){
LOG.info("-----------DirectoryName: "+path.getName());
}
else if(fs.isFile(path)){
LOG.info("-----------FileName: "+path.getName());
}
}
}catch (IOException e) {
LOG.error("list fileSysetm object stream.:" , e);
new RuntimeException(e);
}
}

/**
* Makes the specified directory if it doesn'texist.
* @param dir
*/
public void mkdir(String dir){
Configurationconf = new Configuration();
FileSystemfs = null;
try {
fs= FileSystem.get(conf);
Pathpath = new Path(dir);
if(!fs.exists(path)){
fs.mkdirs(path);
LOG.debug("create directory '"+dir+"' successfully!");
}else{
LOG.debug("directory '"+dir+"' exits!");
}
}catch (IOException e) {
LOG.error("FileSystem get configuration with anerror");
e.printStackTrace();
}finally{
if(fs!= null){
try {
fs.close();
}catch (IOException e) {
LOG.error("close fs object stream. :" , e);
new RuntimeException(e);
}
}
}
}

/**
* Reads the file content in console.
* @param file
*/
public void readFile(String file){
Configurationconf = new Configuration();
FileSystemfs;
try {
fs= FileSystem.get(conf);
Pathpath = new Path(file);
if(!fs.exists(path)){
LOG.warn("file'"+ file+"' doesn't exist!");
return ;
}
FSDataInputStreamin = fs.open(path);
Stringfilename = file.substring(file.lastIndexOf('/') + 1, file.length());
OutputStreamout = new BufferedOutputStream(new FileOutputStream(
new File(filename)));

byte[] b = new byte[1024];
int numBytes = 0;
while ((numBytes = in.read(b)) > 0) {
out.write(b,0, numBytes);
}
in.close();
out.close();
fs.close();
}catch (IOException e) {
LOG.error("ifExists fs Exception caught! :" , e);
new RuntimeException(e);
}
}

public boolean ifExists(String source){
if(source == null || source.length() ==0){
return false;
}
Configurationconf = new Configuration();
FileSystemfs = null;
try {
fs= FileSystem.get(conf);
LOG.debug("judge file '"+source + "'");
return fs.exists(new Path(source));
}catch (IOException e) {
LOG.error("ifExists fs Exception caught! :" , e);
new RuntimeException(e);
return false;
}finally{
if(fs != null){
try {
fs.close();
}catch (IOException e) {
LOG.error("fs.close Exception caught! :" , e);
new RuntimeException(e);
}
}

}

}

/**
* Recursively copies the source pathdirectories or files to the destination path of DFS.
* It is the same functionality as thefollowing comand:
* hadoopfs -copyFromLocal <local fs><hadoop fs>
* @param source
* @param dest
*/
public void copyFromLocal (String source, String dest) { Configurationconf = new Configuration(); FileSystemfs; try { fs= FileSystem.get(conf); PathsrcPath = new Path(source); PathdstPath = new Path(dest); // Check if the file alreadyexists if (!(fs.exists(dstPath))) { LOG.warn("dstPathpath doesn't exist" ); LOG.error("No such destination " + dstPath); return; } // Get the filename out of thefile path Stringfilename = source.substring(source.lastIndexOf('/') + 1, source.length()); try{ //if the file exists in thedestination path, it will throw exception. // fs.copyFromLocalFile(srcPath,dstPath); //remove and overwrite files withthe method //copyFromLocalFile(booleandelSrc, boolean overwrite, Path src, Path dst) fs.copyFromLocalFile(false, true, srcPath, dstPath); LOG.info("File " + filename + "copied to " + dest); }catch(Exception e){ LOG.error("copyFromLocalFile exception caught!:" , e); new RuntimeException(e); }finally{ fs.close(); } }catch (IOException e1) { LOG.error("copyFromLocal IOException objectstream. :" ,e1); new RuntimeException(e1); } }

public void renameFile (String fromthis, String tothis){
Configurationconf = new Configuration();

FileSystemfs;
try {
fs= FileSystem.get(conf);
PathfromPath = new Path(fromthis);
PathtoPath = new Path(tothis);

if (!(fs.exists(fromPath))) {
LOG.info("No such destination " + fromPath);
return;
}

if (fs.exists(toPath)) {
LOG.info("Already exists! " + toPath);
return;
}

try{
boolean isRenamed = fs.rename(fromPath,toPath); //renames file name indeed.
if(isRenamed){
LOG.info("Renamed from " + fromthis + " to " + tothis);
}
}catch(Exception e){
LOG.error("renameFile Exception caught! :" , e);
new RuntimeException(e);
}finally{
fs.close();
}
}catch (IOException e1) {
LOG.error("fs Exception caught! :" , e1);
new RuntimeException(e1);
}
}

/**
* Uploads or adds a file to HDFS
* @param source
* @param dest
*/
public void addFile(String source, String dest) {
// Conf object will readthe HDFS configuration parameters
Configurationconf = new Configuration();
FileSystemfs;
try {
fs= FileSystem.get(conf);
// Get the filename out of thefile path
Stringfilename = source.substring(source.lastIndexOf('/') + 1, source.length());

// Create the destination pathincluding the filename.
if (dest.charAt(dest.length() - 1) != '/') {
dest= dest + "/" + filename;
}else {
dest= dest + filename;
}

// Check if the file alreadyexists
Pathpath = new Path(dest);
if (fs.exists(path)) {
LOG.error("File " + dest + " already exists");
return;
}

// Create a new file and writedata to it.
FSDataOutputStreamout = fs.create(path);
InputStreamin = new BufferedInputStream(new FileInputStream(
new File(source)));

byte[] b = new byte[1024];
int numBytes = 0;
//In this way read and write datato destination file.
while ((numBytes = in.read(b)) > 0) {
out.write(b,0, numBytes);
}
in.close();
out.close();
fs.close();
}catch (IOException e) {
LOG.error("addFile Exception caught! :" , e);
new RuntimeException(e);
}
}

/**
*Deletes the files if it is a directory.
* @param file
*/
public void deleteFile(String file) { Configurationconf = new Configuration(); FileSystemfs; try { fs= FileSystem.get(conf); Pathpath = new Path(file); if (!fs.exists(path)) { LOG.info("File " + file + " does not exists"); return; } /* * recursively delete the file(s) if it is adirectory. * If you want to mark the path that will bedeleted as * a result of closing the FileSystem. * deleteOnExit(Path f) */ fs.delete(new Path(file), true); fs.close(); }catch (IOException e) { LOG.error("deleteFile Exception caught! :" , e); new RuntimeException(e); } }
/**
* Gets the information about the file modifiedtime.
* @param source
* @throws IOException
*/
public void getModificationTime(String source) throws IOException{

Configurationconf = new Configuration();

FileSystemfs = FileSystem.get(conf);
PathsrcPath = new Path(source);

// Check if the file alreadyexists
if (!(fs.exists(srcPath))) {
System.out.println("No such destination " + srcPath);
return;
}
// Get the filename out of thefile path
Stringfilename = source.substring(source.lastIndexOf('/') + 1, source.length());

FileStatusfileStatus = fs.getFileStatus(srcPath);
long modificationTime =fileStatus.getModificationTime();

LOG.info("modified datetime: " + System.out.format("File %s; Modification time : %0.2f%n",filename,modificationTime));

}

/** * Gets the file block location info * @param source * @throws IOException */ public void getBlockLocations(String source) throws IOException{ Configurationconf = new Configuration(); FileSystemfs = FileSystem.get(conf); PathsrcPath = new Path(source); // Check if the file alreadyexists if (!(ifExists(source))) { System.out.println("No such destination " + srcPath); return; } // Get the filename out of thefile path Stringfilename = source.substring(source.lastIndexOf('/') + 1, source.length()); FileStatusfileStatus = fs.getFileStatus(srcPath); BlockLocation[]blkLocations = fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen()); int blkCount = blkLocations.length; System.out.println("File :" + filename + "stored at:"); for (int i=0; i < blkCount; i++) { String[]hosts = blkLocations[i].getHosts(); LOG.info("host ip:" +System.out.format("Host %d: %s %n", i, hosts)); } }

public void getHostnames () throws IOException{
Configurationconfig = new Configuration();

FileSystemfs = FileSystem.get(config);
DistributedFileSystemhdfs = (DistributedFileSystem) fs;
DatanodeInfo[]dataNodeStats = hdfs.getDataNodeStats();

String[]names = new String[dataNodeStats.length];
for (int i = 0; i < dataNodeStats.length; i++) {
names[i]= dataNodeStats[i].getHostName();
LOG.info("datenode hostname:"+(dataNodeStats[i].getHostName()));
}
}
/**
* @param args
*/
public static void main(String[] args) {
SyncDFSdfs = new SyncDFS();
dfs.list("/user/app");

dfs.mkdir("/user/app");

// dfs.readFile("/user/app/README.txt");
LOG.info("--------------" +
dfs.ifExists("/user/warehouse/hbase.db/u_data/u.data")); //false
LOG.info("--------------" + dfs.ifExists("/user/app/README.txt")); //true

//copied the local file(s) to thedfs.
// dfs.copyFromLocal("/opt/test","/user/app");

//delete the file(s) from the dfs
// dfs.deleteFile("/user/app/test");
//rename diretory in dfs
// dfs.renameFile("/user/app/test","/user/app/log");
//rename file in dfs
// dfs.renameFile("/user/app/log/derby.log","/user/app/log/derby_info.log");

}

}


 
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  hadoop