您的位置:首页 > 大数据 > Hadoop

hdfs常用API和putMerge功能实现

2015-11-07 20:10 1411 查看
所需jar包



一、URL API操作方式
import java.io.InputStream;
import java.net.URL;
import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;
import org.apache.hadoop.io.IOUtils;
import org.junit.Test;

public class HDFSUrlTest {

/**
* HDFS URL API操作方式
* 不需要读取core-site.xml和hdfs-site.xml配置文件
*/

// 让JAVA程序识别HDFS的URL
static {

URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
}

// 查看文件内容

@Test
public void testRead() throws Exception {

InputStream in = null;
// 文件路径
String fileUrl = "hdfs://hadoop-master.dragon.org:9000/opt/data/test/01.data";
try {
// 获取文件输入流
in = new URL(fileUrl).openStream();
// 将文件内容读取出来,打印控制台
IOUtils.copyBytes(in, System.out, 4096, false);
} finally {

IOUtils.closeStream(in);
}

}

}


二、通过FileSystem API操作HDFS

HDFS工具类

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class HDFSUtils {

/**
*     HDFS工具类
*/

public static FileSystem  getFileSystem() {
//声明FileSystem
FileSystem hdfs=null;
try {

//获取文件配置信息
Configuration conf =new Configuration();

//获取文件系统
hdfs=FileSystem.get(conf);
} catch (IOException e) {

e.printStackTrace();
}
return hdfs;

}

}


常用操作实现类

import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.mapred.gethistory_jsp;
import org.junit.Test;

public class HDFSFsTest {

/**
*
* 通过FileSystem API操作HDFS
*/
// 读取文件内容

@Test
public void testRead() throws Exception {
// 获取文件系统
FileSystem hdfs = HDFSUtils.getFileSystem();
// 文件名称
Path path = new Path("/opt/data/test/touch.data");
// 打开文件输入流
FSDataInputStream inStream = hdfs.open(path);
// 读取文件到控制台显示
IOUtils.copyBytes(inStream, System.out, 4096, false);

// 关闭流
IOUtils.closeStream(inStream);
}

// 查看目录
@Test
public void testList() throws Exception {
FileSystem hdfs = HDFSUtils.getFileSystem();
// 文件名称
Path path = new Path("/opt/data");
FileStatus[] fileStatus = hdfs.listStatus(path);
for (FileStatus file : fileStatus) {
Path p = file.getPath();
String info = file.isDir() ? "目录" : "文件";
System.out.println(info + ":" + p);
}
}

// 创建目录
@Test
public void testDirectory() throws Exception {
FileSystem hdfs = HDFSUtils.getFileSystem();
// 要创建的目录
Path path = new Path("/opt/data/dir");
boolean isSuccessful = hdfs.mkdirs(path);// 相当于 linux下 mkdir -p
// /opt/data/dir
String info = isSuccessful ? "成功" : "失败";
System.out.println("创建目录【" + path + "】" + info);
}

// 上传文件-- put copyFromLocal

@Test
public void testPut() throws Exception {
FileSystem hdfs = HDFSUtils.getFileSystem();
// 本地文件(目录+文件名称)
Path srcPath = new Path("c:/0125.log");
// hdfs文件上传路径
Path dstPath = new Path("/opt/data/dir/");
hdfs.copyFromLocalFile(srcPath, dstPath);

}

// 创建hdfs文件并写入内容

@Test
public void testCreate() throws Exception {
FileSystem hdfs = HDFSUtils.getFileSystem();

Path path = new Path("/opt/data/dir/touch.data");
// 创建文件并获取输出流
FSDataOutputStream fSDataOutputStream = hdfs.create(path);
// 通过输出流写入数据
fSDataOutputStream.write("你好".getBytes());
fSDataOutputStream.writeUTF("hello hadoop!");
IOUtils.closeStream(fSDataOutputStream);
}

// 文件重命名

@Test
public void testRename() throws Exception {
FileSystem hdfs = HDFSUtils.getFileSystem();

Path oldPath = new Path("/opt/data/dir/touch.data");
Path newPath = new Path("/opt/data/dir/rename.data");

boolean flag = hdfs.rename(oldPath, newPath);
System.out.println(flag);
}

// 删除文件
public void testDelete() throws Exception {
FileSystem hdfs = HDFSUtils.getFileSystem();

Path path = new Path("/opt/data/dir/touch.data");

boolean flag = hdfs.deleteOnExit(path);

System.out.println(flag);

}

// 删除目录

public void testDeleteDir() throws Exception {
FileSystem hdfs = HDFSUtils.getFileSystem();

Path path = new Path("/opt/data/dir");

boolean flag = hdfs.delete(path, true);// 如果是目录第二个参数必须为true

System.out.println(flag);

}

// 查找某个文件在hdfs集群的位置

public void testLocation() throws Exception {
FileSystem hdfs = HDFSUtils.getFileSystem();

Path path = new Path("/opt/data/test.file");

FileStatus fileStatus = hdfs.getFileStatus(path);
BlockLocation[] blockLocations = hdfs.getFileBlockLocations(fileStatus,
0, fileStatus.getLen());
for (BlockLocation blockLocation : blockLocations) {

String[] hosts = blockLocation.getHosts();
for (String host : hosts) {

System.out.print(host + " ");
}
System.out.println();

}

}

// 获取hdfs集群上所有节点名称信息

public void testCluster() throws Exception {
FileSystem hdfs = HDFSUtils.getFileSystem();

DistributedFileSystem distributedFileSystem = (DistributedFileSystem) hdfs;
DatanodeInfo[] datanodeInfos = distributedFileSystem.getDataNodeStats();

for (DatanodeInfo datanodeInfo : datanodeInfos) {
String hostName = datanodeInfo.getHostName();

System.out.println(hostName);
}
}

}


三、上传合并小文件到hdfs
实现思想:循环遍历本地文件输入流

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/**
*
* 向hdfs上传复制文件的过程中,进行合并文件
*
*/
public class PutMerge {

/**
*
* @param localDir
*            本地要上传的文件目录
* @param hdfsFile
*            HDFS上的文件名称,包括路径
*/
public static void put(String localDir, String hdfsFile) throws Exception {

// 获取配置信息
Configuration conf = new Configuration();
Path localPath = new Path(localDir);
Path hdfsPath = new Path(hdfsFile);

// 获取本地文件系统
FileSystem localFs = FileSystem.getLocal(conf);
// 获取HDFS
FileSystem hdfs = FileSystem.get(conf);

// 本地文件系统指定目录中的所有文件

FileStatus[] status = localFs.listStatus(localPath);
// 打开hdfs上文件的输出流
FSDataOutputStream fSDataOutputStream = hdfs.create(hdfsPath);
// 循环遍历本地文件

for (FileStatus fileStatus : status) {
// 获取文件
Path path = fileStatus.getPath();
System.out.println("文件为:" + path.getName());
// 打开文件输入流
FSDataInputStream fSDataInputStream = localFs.open(path);

// 进行流的读写操作
byte[] buff = new byte[1024];

int len = 0;
while ((len = fSDataInputStream.read(buff)) > 0) {

fSDataOutputStream.write(buff, 0, len);
}
fSDataInputStream.close();
}
fSDataOutputStream.close();
}

public static void main(String[] args) {
String localDir="D:/logs";
String hdfsFile="hdfs://hadoop-master.dragon.org:9000/opt/data/logs.data";
try {
put(localDir,hdfsFile);
} catch (Exception e) {

e.printStackTrace();
}
}

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  FileSystem hdfs API