
A Simple Example of Operating HDFS from Java (with Kerberos Enabled)

2017-09-22 17:22
The complete code is shown below.

package com.hdfs.demo;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.UserGroupInformation;

/**
 * @time 2017-07-14
 * @author YeChunBo
 * Class description: basic HDFS API operations (with Kerberos enabled).
 */
public class HdfsSimple {

	public static Configuration getConfig(String user, String keytabPath) {

		Configuration conf = new Configuration();

		conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());

		if (System.getProperty("os.name").toLowerCase().startsWith("win")) {
			System.setProperty("java.security.krb5.conf", "C:/Windows/krbconf/bms/krb5.ini");
			// System.setProperty("java.security.krb5.conf", "C:/Windows/pro/krb5.ini");
		} else {
			// Optional on Linux: Kerberos falls back to /etc/krb5.conf automatically.
			System.setProperty("java.security.krb5.conf", "/etc/krb5.conf");
		}

		conf.set("hadoop.security.authentication", "kerberos");
		UserGroupInformation.setConfiguration(conf);
		try {
			UserGroupInformation.loginUserFromKeytab(user, keytabPath);
		} catch (Exception e) {
			System.out.println("Kerberos authentication failed: " + e.getMessage());
			e.printStackTrace();
		}
		return conf;
	}
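
	/**
	 * Not in the original post: a minimal sketch for long-running processes,
	 * which must refresh the Kerberos ticket before it expires.
	 * checkTGTAndReloginFromKeytab() is a no-op while the current TGT is still
	 * fresh, so it is safe to call periodically (e.g. before each batch of HDFS calls).
	 */
	public static void relogin() {
		try {
			UserGroupInformation.getCurrentUser().checkTGTAndReloginFromKeytab();
		} catch (IOException e) {
			System.out.println("Kerberos re-login failed: " + e.getMessage());
		}
	}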

	/**
	 * List all files under an HDFS directory (recursively).
	 *
	 * @param uri        HDFS connection URL, e.g. hdfs://hdp39:8020
	 * @param remotePath remote HDFS directory path
	 * @param conf
	 * @throws Exception
	 */
	public static void getHdfsFileList(String uri, String remotePath, Configuration conf) throws Exception {

		FileSystem fs = FileSystem.get(new URI(uri), conf);
		RemoteIterator<LocatedFileStatus> iter = fs.listFiles(new Path(remotePath), true);
		while (iter.hasNext()) {
			LocatedFileStatus status = iter.next();
			System.out.println(status.getPath().toUri().getPath());
		}
		fs.close();
	}

	// Read a file's contents and copy them to standard output.
	// Note: this uses the default FileSystem, so fs.defaultFS must point at the cluster.
	public static void readFile(String filePath, Configuration conf) throws IOException {
		FileSystem fs = FileSystem.get(conf);
		Path srcPath = new Path(filePath);
		InputStream in = null;
		try {
			in = fs.open(srcPath);
			IOUtils.copyBytes(in, System.out, 4096, false); // copy to stdout
		} finally {
			IOUtils.closeStream(in);
		}
	}

	/**
	 * Print a file's contents and return them as a String.
	 *
	 * @param remoteFile
	 * @return
	 * @throws IOException
	 */
	public static String cat(String uri, String remoteFile, Configuration conf) throws IOException {
		Path path = new Path(remoteFile);
		// Use the uri argument rather than a hard-coded NameNode address.
		FileSystem fs = FileSystem.get(URI.create(uri), conf);
		FSDataInputStream fsdis = null;
		System.out.println("cat: " + remoteFile);

		ByteArrayOutputStream baos = new ByteArrayOutputStream();
		String str = null;
		try {
			fsdis = fs.open(path);
			IOUtils.copyBytes(fsdis, baos, 4096, false);
			str = baos.toString();
		} finally {
			IOUtils.closeStream(fsdis);
			fs.close();
		}
		System.out.println(str);
		return str;
	}

	// Create a directory.
	public static void mkdir(String uri, String path, Configuration conf) throws IOException, URISyntaxException {
		FileSystem fs = FileSystem.get(new URI(uri), conf);
		Path srcPath = new Path(path);
		boolean isok = fs.mkdirs(srcPath);
		if (isok) {
			System.out.println("create dir ok!");
		} else {
			System.out.println("create dir failure");
		}
		fs.close();
	}

	/**
	 * Print size and status information for an HDFS path.
	 *
	 * @param uri
	 * @param remotePath
	 * @param conf
	 */
	public static void getFileInfo(String uri, String remotePath, Configuration conf) {

		try {
			FileSystem fs = FileSystem.get(new URI(uri), conf);
			Path filenamePath = new Path(remotePath);
			// getSpaceConsumed() accounts for replication: with a replication factor of 3
			// it is three times the raw length printed below. Unit: bytes.
			System.out.println("SIZE OF THE HDFS DIRECTORY : " + fs.getContentSummary(filenamePath).getSpaceConsumed());

			// getLength() is the space a single copy of the data actually occupies.
			System.out.println("SIZE OF THE HDFS DIRECTORY : " + fs.getContentSummary(filenamePath).getLength());

			FileStatus stat = fs.getFileStatus(new Path(remotePath));
			System.out.print(stat.getAccessTime() + " " + stat.getBlockSize() + " " + stat.getGroup()
					+ " " + stat.getLen() + " " + stat.getModificationTime() + " " + stat.getOwner()
					+ " " + stat.getReplication() + " " + stat.getPermission());
			// Sample output for /user/hive/hive/jobs/hive-job-100-2017-03-27_12-59/query.hql:
			// 1490590754315 134217728 hdfs 59 1490590754343 hive 3 rw-r--r--
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	public static void main(String[] args) throws Exception {

		String user = "project2/hdp39@BMSOFT.COM"; // project2 has no explicit permissions configured, yet it can still read
		String keytabPath = "./conf/bms/project2.keytab";

		String uri = "hdfs://hdp39:8020";
		String remotePath = "/tmp/test.jar"; // 20438517
		// String remotePath = "/apps/hbase/data/data/default/t_hbase100";

		Configuration conf = getConfig(user, keytabPath);

		// Note: a user's permissions must first be set up in Ambari before being managed in Ranger.
		// cat(uri, "/user/hive/hive/jobs/hive-job-100-2017-03-27_12-59/query.hql", conf); // this file is readable without any extra permissions
		// mkdir(uri, "/user/test", conf); // without the matching permission, directory creation fails with a permission error
		// getHdfsFileList(uri, remotePath, conf); // if the directory is already world-readable/writable, no Ranger policy is needed; otherwise grant access through Ranger, after which operations follow the granted permissions

		getFileInfo(uri, remotePath, conf);
	}
}
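
As a quick sanity check, the minimal sketch below (a separate class; the principal, keytab path, and NameNode address simply mirror the assumptions in main() above) verifies that the keytab login actually succeeded before issuing any HDFS calls:

package com.hdfs.demo;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;

public class LoginCheck {
	public static void main(String[] args) throws Exception {
		Configuration conf = HdfsSimple.getConfig("project2/hdp39@BMSOFT.COM", "./conf/bms/project2.keytab");

		// getCurrentUser() reflects the login performed by loginUserFromKeytab().
		UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
		System.out.println("Logged in as: " + ugi.getUserName());
		System.out.println("Has Kerberos credentials: " + ugi.hasKerberosCredentials());

		// Only touch HDFS once the ticket is confirmed.
		HdfsSimple.getHdfsFileList("hdfs://hdp39:8020", "/tmp", conf);
	}
}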