您的位置:首页 > 编程语言 > Java开发

Hadoop FIleSystem API JAVA操作。

2017-07-13 00:04 507 查看

1.FileSystem的获取。

我们可以在Hadoop中使用FileSystem API来打开一个文件的输入流,然后我们可以对文件进行各种的操作实现。

FileSystem是一个通用的文件系统API,FileSystem的获取方法有以下几种(以下所有相关代码接口实现均为手打,不保证完全正确,如有出错敬请谅解。):

public statis FileSystem get(Configuration conf) throws IOException
public statis FileSystem get(URI uri , Configuration conf) throws IOException
public statis FileSystem get(URI uri , Configuration conf ,String user) throws IOException


共有三种参数不同的静态工厂方法,所以想要得到一个FileSystem对象的话,是new不出来的。

需要注意的是这里的Configuration 对象封装了客户端或服务器的配置,通过设置配置文件读取类路径来实现,这里的Configuration 不要导错包了。

import org.apache.hadoop.conf.Configuration;

第一个方法返回的是默认文件系统(在core-site.xml中指定的,如果没有指定,则使用默认的本地文件系统)。

这里需要解释一点就是,在core-site.xml中的fs.default.name(2.x版本中使用的是fs.fs.defaultFS)属性就是指定了HDFS的namenode和默认文件系统,也是因为这个属性的原因,所以当用户在运行HDFS的时候,HDFS必须是服务器配置的默认文件系统。单为了操作方便,也允许在客户端配置中将其他文件系统指定为默认文件系统。

第二个方法通过给定的URI方法和权限来确定要使用的文件系统,如果给定的URI中没有指定方案,则返回默认的文件系统。

第三个方法添加了user参数,顾名思义就是让给定用户来访问文件系统,增加一定的安全性。

除了这三种之外还有一种方法可以获取本地文件系统。

public static LocalFileSystem getLocal(Configuration conf) throws IOException


2.FileSystem读取数据。

一旦获取到FileSystem实例之后,就可以对文件进行各种操作。

1.可以调用open()函数来获取文件的输入流:

public FSDataInputStream open(Path f) throws IOException
public abstract FSDataInputStream open(Path f, int bufferSize) throws IOException


需要注意的是这里Path是在 org.apache.hadoop.fs.Path 包下的。

既然能够获取到文件的输入流,那么我们很容易就可以将文件的内容显示出来,在 org.apache.hadoop.io报下提供了一个IOUtils类,我们可以直接使用该类的copyBytes方法将输入流中的内容输出到System.out中,这样就能够展现出读取文件的作用,就能够直接将HDFS中的文件通过流的形式进行提取和存储。

调用方法如下:第一个参数就是输入流,第二个是输出流,第三个是字节数,第四个是是否关闭数据流(可以在这里设置为false,但是那么就需要在后面手动关闭)。

in = fs.open(new Path(uri));//根据HDFS的uri获取一个数据流
IOUtils.copyBytes(in, System.out, 4096 ,false);//进行数据流之间的拷贝
IOUtils.closeStream(in);//关闭数据流


2.可以看到通过open对象会返回一个FSDataInputStream对象,这个对象并不是标准的java.io类对象。这个类是继承了java.io.DataInputStream接口的一个特殊类,并支持随机访问,可以从流的任意位置读取数据。

package org.apache.hadoop.fs;
public class FSdataInputStream extends DataInputStream implements Seekable,PositionedReadable{}


Seekable接口支持在文件中找到指定位置,并提供一个查询当前位置相对于文件起始位置偏移量getPos()的方法。

public interface Seekable {
void seek(long pos) throws IOException
long getPos() throws IOException
boolean seekToNewSource(long targetPos) throws IOException//这个方法在书中有提到,但是本人在API接口中没有找到
}


调用getPos()方法会返回对于文件开头的当前偏移量。

调用seek()方法的时候,如果定位大于文件长度的位置就会引发IOException异常。

seek()和java.io.InputStream中的skip()有些不同,seek()可以移到文件中任意一个绝对位置,skip()则只能相对于当前位置定位到另一个新位置。

所以我们可以使用seek()方法来将一个数据流读取两次,先使用上述中的copyBytes方法读取一次流数据,然后调用seek方法定位到流的开始位置,再次读取。

in = fs.open(new Path(uri));
IOUtils.copyBytes(in, System.out, 4096 ,false);//进行数据流之间的拷贝
in.seek(0);//回到数据流开始的位置
IOUtils.copyBytes(in, System.out, 4096 ,false);//再次读取


除了Seekable接口之外,还实现了PositionedReadable接口,从一个指定偏移量处读取文件的一部分。

public interface PositionedReadable {
public int read(long position, byte[] buffer,int offset, int length) throws IOException
public void readFully(long position,byte[] buffer,int offset,int length) throws IOException
public void readFully(long position,byte[] buffer) throws IOException


read方法从文件的指定position出读取至少为length字节的数据并存入缓冲区buffer的指定偏移量offset处,返回值是实际读到的字节数:所以我们需要检查一下这个返回值,有可能小于指定的length长度。

readFully()方法将指定length长度的字节数数据读取到buffer中,除非已经读到文件末尾,这种情况下将抛出EOFException异常。

这三种方法都是线程安全的,其中两种readFully具体差异和使用方法可以查看官方API解释

3.FileSystem写入数据。

1.创建一个新的文件。

既然想写入数据,那么我们有必要学会如何创建一个文件,可以使用create方法进行创建。

public FSDataOutputStream create(Path f) throws IOException

这个方法有超级多个重载版本,可以查看官方API,可以指定是否需要强制覆盖现有的文件,文件的备份数量,写入文件时所用的缓冲区大小,文件块大小以及文件权限。需要注意的是,如果指定的文件父目录不存在的时候会自动创建该文件的父目录。如果文件存在则会抛出异常。

另一种创建文件的方法是 createNewFile(Path f)创建一个空的文件:

public boolean createNewFile(Path f) throws IOException


2.文件的追加。

除了以上的两种方法之外,在有些书中还可以看到使用append方法进行创建文件的,但该方法顾名思义主要是用来对文件进行追加操作的。最开始的时候HDFS中的文件是不支持修改的,自然也就不存在追加一说了,但据说是1.0.4版本(也有人说是2.0版本)之后支持文件的追加操作,想要追加内容需要在core-site.xml中配置如下(现在貌似默认支持的):

<property>
<name>dfs.support.append</name>
<value>true</value>
</property>


本来个人认为是使用append方法,如果文件不存在的话会创建文件,但是本人在测试的时候如果文件不存在会出现以下错误提示:

java.io.FileNotFoundException: failed to append to non-existent file

所以append用来创建文件的说法可能不是真的,如果用来追加内容可以使用以下方法:

String content = "append Content";
InputStream in = new ByteArrayInputStream(content.getBytes());
FSDataOutputStream out = fs.append(path);
IOUtils.copyBytes(in, out, 4096, true);


3.可以从create()方法看出会有一个FSDataOutputStream的对象返回,与上述所述中的FSDataInputStream类似,它也有一个查询文件当前位置的方法:

package org.apache.hadoop.fs;
public class FSDataOutputStream extends DataOutputStream implements Syncable {
public long getPos() throws IOException{}
}


与FSDataInputStream类不同的是,FSDataOutputStream类不允许中定位。因为HDFS值允许对一个已打开的文件顺序写入,或者在现有的末尾追加数据。通俗的说,HDFS不支持除文件末尾之外的其他位置进行写入。

除了getPos()之外还有几个方法,close()方法就不提了,其他的摘自官方API:

public void hflush()throws IOException

//Flush out the data in client’s user buffer. After the return of this call, new readers will see the data.

public void hsync() throws IOException

//Similar to posix fsync, flush out the data in client’s user buffer all the way to the disk device (but the disk may have it in its cache).

public void setDropBehind(Boolean dropBehind)throws IOException

//Configure whether the stream should drop the cache.

4.FileSystem的其他操作。

1.创建目录:

我们可以是使用FileSystem的实例来创建一个目录。

public boolean mkdirs(Path f) throws IOException


调用这个方法之后可以一次性新建所有必要但还是没有的父目录,如果所有的目录都创建成功返回true。

但这个方法有点鸡肋,因为上述中的创建文件调用的create()方法就可以自动创建父目录的。但不排除一定条件下的使用。

2.删除文件。

可以使用FileSystem的delete()方法可以永久性删除文件或目录,如果是一个目录则会删除该目录下的所有内容。

public boolean delete(Path f) throws IOException
public abstract boolean delete(Path f, boolean recursive)throws IOException


上面的两个方法中,第一个方法已经弃用,官方也推荐使用第二个方法。

第二个方法中的第一个参数就是所要删除文件或目录的路径,第二个参数是是否递归删除一个目录。

如果f是一个文件或者空目录,那么recursive的值就会被忽略。但如果是非空目录的话,只有recrusive为true的时候,该目录及目录下的内容才会被删除,否则会抛出一个IOException异常。

3.获取文件元数据:FileStatus。

FileStatus类中封装了文件系统中文件和目录的元数据,包括文件长度、块大小,复本,修改时间,所有者以及权限信息。这些信息对于一个文件来说十分重要,我们可以使用FileSystem中的getFileStatus()获取文件或者目录的FileStatus对象。

public abstract FileStatus getFileStatus(Path f) throws IOException


除这个之外还可以使用listStatus()方法来返回一个目录下的所有内容的FileStatus。

public abstract FileStatus[] listStatus(Path f) throws FileNotFoundException,IOException


4.文件重命名。

我们可以使用rename来对文件进行重命名。其实这个函数也可以起到移动文件的作用。

public abstract boolean rename(Path src,Path dst)throws IOException


5.本地文件和HDFS的互相拷贝。

我们可以利用数据流向HDFS中写入文件,但是FileSystem提供一个比较简单的操作,可以直接将本地的文件复制到HDFS中,那就是copyFromLocalFile,这个方法也有几个重载版本,具体使用可以查看[官方文档]

(http://hadoop.apache.org/docs/stable/api/)

最简单的操作:

public void moveFromLocalFile(Path src,Path dst)throws IOException


除了从本地直接拷贝文件到HDFS之外,还有从HDFS拷贝文件到本地。

public void copyToLocalFile(Path src,Path dst) throws IOException


除了这两个还有移动文件moveFromLocalFile(),所以我们可以直接使用这些方法来完成对文件的上传和下载。

6.判断文件或目录是否存在。

这个使用起来就比较简单了。存在返回true,否则返回false。

public boolean exists(Path f)throws IOException


5.简单封装FileSystem。

除了上述我介绍到的API之外,还有许多有关FIleSystem的API,但比较常用的就是上述所介绍的一些,至于其他的可以参考官方文档

下面是本人对FileSystem操作HDFS的一些简单的封装使用,其中注释写的比较完整,可以参考一下。

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class HDFSFileSystemUtils {

// 设置HDFS的URL和端口号
private String HDFSURL;
// HDFS的配置信息
private Configuration conf;

private FileSystem fs;

public HDFSFileSystemUtils(String HDFSURL) {
this.HDFSURL = HDFSURL;
this.conf = new Configuration();
try {
// 得到FileSystem的连接
this.fs = FileSystem.get(URI.create(HDFSURL), conf);
} catch (IOException e) {
e.printStackTrace();
}
}

/*
* 打开FileSystem
*/
public void openFileSystem() {
try {
this.fs = FileSystem.get(URI.create(HDFSURL), conf);
} catch (IOException e) {
e.printStackTrace();
}
}

/**
* 关闭FileSystem
*/
public void closeFileSystem() {
try {
this.fs.close();
} catch (IOException e) {
e.printStackTrace();
}
}

/**
* 上传本地文件到HDFS
*
* @param localFile
*            本地文件的目录
* @param folder
*            HDFS中的目录,或者文件名
*/
public void putFile(String local, String dst) {
try {
// 从本地将文件拷贝到HDFS中,如果目标文件已存在则进行覆盖
dst = HDFSURL + dst;
fs.copyFromLocalFile(new Path(local), new Path(dst));
System.out.println("上传成功!");
// 关闭连接
} catch (IOException e) {
System.out.println("上传失败!");
e.printStackTrace();
}
}

/**
* 下载HDFS中的文件到本地
*
* @param dst
*            HDFS中文件目录
* @param local
*            本地目录
*/
public void getFile(String dst, String local) {
try {
dst = HDFSURL + dst;
if (!fs.exists(new Path(dst))) {
System.out.println("文件不存在!");
} else {
fs.copyToLocalFile(new Path(dst), new Path(local));
System.out.println("下载成功!");
}
} catch (IOException e) {
System.out.println("下载失败!");
e.printStackTrace();
}
}

/**
* 删除HDFS中的文件或者目录
*
* @param dst
*/
public void deleteFile(String dst) {
try {
dst = HDFSURL + dst;
if (!fs.exists(new Path(dst))) {
System.out.println("文件不存在!");
} else {
fs.delete(new Path(dst), true);
System.out.println("删除成功!");
}
} catch (IOException e) {
System.out.println("删除失败!");
e.printStackTrace();
}
}

/**
* 显示HDFS中的文件到输入流中,显示在console中
*
* @param dst
*/
public void catFile(String dst) {
dst = HDFSURL + dst;
FSDataInputStream in = null;
try {
if (!fs.exists(new Path(dst))) {
System.out.println("文件不存在!");
} else {
System.out.println(dst + ": ");
// 打开文件流
in = fs.open(new Path(dst));
IOUtils.copyBytes(in, System.out, 4096, false);
}
} catch (IOException e) {
e.printStackTrace();
} finally {
IOUtils.closeStream(in);
}
}

/**
* 显示出所传入目录下的所有文件
*
* @param dst
*/
public void listStatus(String dst) {
try {
dst = HDFSURL + dst;
if (!fs.exists(new Path(dst))) {
System.out.println("目录不存在!");
return;
}
// 得到文件的状态
FileStatus[] status = fs.listStatus(new Path(dst));
for (FileStatus s : status) {
System.out.println(s.getPath());
}

} catch (IllegalArgumentException | IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}

/**
* 判断文件是否存在
*
* @param dst
*/
public void isExists(String dst) {
dst = HDFSURL + dst;
try {
if (fs.exists(new Path(dst))) {
System.out.println("文件存在!");
} else {
System.out.println("文件不存在!");
}
} catch (IllegalArgumentException | IOException e) {
e.printStackTrace();
}
}

/**
* 创建路径
*
* @param dst
*/
public void createDir(String dst) {
dst = HDFSURL + dst;
try {
if (fs.exists(new Path(dst))) {
System.out.println("目录已存在!");
} else {
fs.mkdirs(new Path(dst));
System.out.println("创建成功!");
}
} catch (IllegalArgumentException | IOException e) {
e.printStackTrace();
}
}

/**
* 创建文件,并且可以将制定内容写到文件中,如果为null则创建一个空的文件
*
* @param dst
* @param content
*/
public void createFile(String dst, String content) {
dst = HDFSURL + dst;
try {
if (fs.exists(new Path(dst))) {
System.out.println("文件已存在!");
} else {
Path path = new Path(dst);
// 创建一个空文件
if (content.equals("") || content == null) {
fs.createNewFile(path);
System.out.println("创建了一个空文件!");
} else {
// 创建一个文件,然后将内容写入进去
FSDataOutputStream out = fs.create(path);
out.write(content.getBytes());
out.close();
System.out.println("文件创建成功,内容已写入!");
}
}
} catch (IllegalArgumentException | IOException e) {
e.printStackTrace();
}
}

/**
* 将dst1重命名为dst2,也可以进行文件的移动
*
* @param dst1
* @param dst2
*/
public void moveFile(String dst1, String dst2) {
dst1 = HDFSURL + dst1;
Path path1 = new Path(dst1);
dst2 = HDFSURL + dst2;
Path path2 = new Path(dst2);

try {
if (!fs.exists(path1)) {
System.out.println(dst1 + " 文件不存在!");
return;
}
if (fs.exists(path2)) {
System.out.println(dst2 + "已存在!");
return;
}
// 将文件进行重命名,可以起到移动文件的作用
fs.rename(path1, path2);
System.out.println("文件已重命名!");
} catch (IOException e) {
e.printStackTrace();
}
}

/**
* 追加content 到dst所指的文件中
*
* @param dst
* @param content
*/
public void appendFile(String dst, String content) {
dst = HDFSURL + dst;
Path path = new Path(dst);
try {
if (!fs.exists(path)) {
System.out.println("文件不存在!");
} else {
InputStream in = new ByteArrayInputStream(content.getBytes());
FSDataOutputStream out = fs.append(path);
IOUtils.copyBytes(in, out, 4096, true);
System.out.println("文件追加成功!");
}
} catch (IOException e) {
e.printStackTrace();
}
}

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: