您的位置:首页 > 编程语言 > Java开发

java hadoop hdfs 上写文件

2017-02-06 11:20 387 查看
项目中需要往 HDFS 上写文件,本文的示例也是后面 Kafka 往 HDFS 上写文件的基础。

实例如下:

1、配置文件:com/xiefg/config/system.properties   

#以下是 Hadoop 安装目录下各配置文件的路径

core.path=/opt/cloudera/parcels/CDH-5.5.0-1.cdh5.5.0.p0.8/lib/hadoop/etc/hadoop/core-site.xml
hdfs.path=/opt/cloudera/parcels/CDH-5.5.0-1.cdh5.5.0.p0.8/lib/hadoop/etc/hadoop/hdfs-site.xml
yarn.path=/opt/cloudera/parcels/CDH-5.5.0-1.cdh5.5.0.p0.8/lib/hadoop/etc/hadoop/yarn-site.xml
mapred.path=/opt/cloudera/parcels/CDH-5.5.0-1.cdh5.5.0.p0.8/lib/hadoop/etc/hadoop/mapred-site.xml

2、读取配置文件的工具类:
package com.xiefg.util;
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.PropertyResourceBundle;
import java.util.ResourceBundle;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

/**
 * Loads {@code com/xiefg/config/system.properties} from the classpath once, at
 * class-initialization time, and exposes the Hadoop configuration file
 * locations declared in it.
 *
 * <p>If the properties file cannot be loaded, the failure is printed to stderr
 * and the path fields stay {@code null}.
 *
 * @author Comsys-xiefg (2017-02-05)
 */
public class PropertiesUtils {

    /** Classpath location of the properties file read by the static initializer. */
    private static final String CONFIG_RESOURCE = "com/xiefg/config/system.properties";

    /** Bundle backing every lookup; {@code null} if loading failed. */
    private static ResourceBundle resources = null;

    /** Value of the {@code hdfs.path} property. */
    public static String HDFS_PATH = null;
    /** Value of the {@code yarn.path} property. */
    public static String YARN_PATH = null;
    /** Value of the {@code core.path} property. */
    public static String CORE_PATH = null;
    /** Value of the {@code mapred.path} property. */
    public static String MAPRED_PATH = null;

    static {
        InputStream in = null;
        try {
            in = Thread.currentThread().getContextClassLoader()
                    .getResourceAsStream(CONFIG_RESOURCE);
            if (in == null) {
                // Fail with a message that names the missing resource instead of
                // letting the bundle constructor fail on a null stream.
                throw new IOException(
                        "Configuration resource not found on classpath: " + CONFIG_RESOURCE);
            }
            resources = new PropertyResourceBundle(in);

            // Initialize the Hadoop configuration paths.
            initHadoopConfig();
        } catch (Exception e) {
            // Best effort, as before: report the failure and leave the fields null.
            e.printStackTrace();
        } finally {
            if (in != null) {
                try {
                    in.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * Copies the four Hadoop configuration file paths from the loaded bundle
     * into the public static fields.
     *
     * @throws IllegalStateException if the properties bundle was not loaded
     */
    public static void initHadoopConfig() {
        if (resources == null) {
            throw new IllegalStateException(
                    "Properties bundle not loaded: " + CONFIG_RESOURCE);
        }
        HDFS_PATH = resources.getString("hdfs.path");
        YARN_PATH = resources.getString("yarn.path");
        CORE_PATH = resources.getString("core.path");
        MAPRED_PATH = resources.getString("mapred.path");
    }

    /**
     * Builds a Hadoop {@link Configuration} with the hdfs, core, mapred and
     * yarn site files (in that order) added as resources.
     *
     * @return a new configuration instance
     */
    public static Configuration getHDFSConf() {
        Configuration conf = new Configuration();
        conf.addResource(new Path(HDFS_PATH));
        conf.addResource(new Path(CORE_PATH));
        conf.addResource(new Path(MAPRED_PATH));
        conf.addResource(new Path(YARN_PATH));
        return conf;
    }

    /**
     * Looks up a single property value.
     *
     * @param property the property name
     * @return the property value, or an empty string if the lookup fails for
     *         any reason (the failure is printed to stderr)
     */
    public static String getPropertiesValue(String property) {
        String val = "";
        try {
            val = resources.getString(property);
        } catch (RuntimeException e) {
            // Deliberately lenient: callers get "" rather than an exception.
            e.printStackTrace();
        }
        return val;
    }

    /**
     * Demo entry point: appends a short test message to {@code /test/kafka}
     * on HDFS, creating the file if it does not exist.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Configuration conf = PropertiesUtils.getHDFSConf();
        try {
            // Encode explicitly so the bytes do not depend on the platform charset.
            HdfsFileUtil.appendFile(conf, "/test/kafka",
                    "kafka test to hdfs xiefg".getBytes("UTF-8"));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

}


3、hdfs 文件工具类
/**
*
*/
package com.xiefg.util;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/**
 * Static helpers for deleting, creating, appending to and probing files on
 * HDFS.
 *
 * <p>Every method obtains a {@link FileSystem} for the given configuration and
 * closes it before returning, whether or not the operation succeeded.
 *
 * <p>NOTE(review): Hadoop may hand out a shared, cached {@code FileSystem}
 * instance; closing it here could affect other users in the same JVM - confirm
 * this is acceptable for the intended deployment.
 *
 * @author Comsys-xiefg (2017-02-06)
 */
public class HdfsFileUtil {

    /**
     * Recursively deletes the given HDFS path.
     *
     * @param conf   Hadoop configuration used to obtain the file system
     * @param ioPath path of the file or directory to delete
     * @throws IOException if the file system cannot be obtained, or the delete
     *                     or close fails
     */
    public static void deleteHfile(Configuration conf, String ioPath)
            throws IOException {
        FileSystem fileSystem = FileSystem.get(conf);
        try {
            fileSystem.delete(new Path(ioPath), true);
        } finally {
            fileSystem.close();
        }
    }

    /**
     * Creates a file and writes the given text to it, encoded as UTF-8.
     *
     * @param conf    Hadoop configuration used to obtain the file system
     * @param file    path of the file to create
     * @param content text to write
     * @throws IOException if the file cannot be created or written
     */
    public static void createFile(Configuration conf, String file, String content)
            throws IOException {
        // Explicit charset: appendFile treats text as UTF-8, so stay consistent
        // instead of depending on the platform default.
        byte[] buff = content.getBytes("UTF-8");
        FileSystem fs = FileSystem.get(conf);
        try {
            writeNewFile(fs, file, buff);
        } finally {
            fs.close();
        }
    }

    /**
     * Appends bytes to a file, creating the file first if it does not exist.
     * The bytes are written exactly as given.
     *
     * @param conf Hadoop configuration used to obtain the file system
     * @param file path of the file to append to
     * @param buff bytes to write
     * @throws IOException if the file cannot be created, opened or written
     */
    public static void appendFile(Configuration conf, String file, byte[] buff)
            throws IOException {
        FileSystem fs = FileSystem.get(conf);
        try {
            Path path = new Path(file);
            if (!fs.exists(path)) {
                // Write the raw bytes directly; decoding them to a String and
                // re-encoding could alter payloads that are not valid UTF-8.
                writeNewFile(fs, file, buff);
            } else {
                FSDataOutputStream os = fs.append(path);
                try {
                    os.write(buff, 0, buff.length);
                    os.flush();
                } finally {
                    os.close();
                }
            }
        } finally {
            fs.close();
        }
    }

    /**
     * Checks whether the given path exists.
     *
     * @param path path to test
     * @param conf Hadoop configuration used to obtain the file system
     * @return {@code true} if the path exists
     * @throws IOException if the file system cannot be obtained or queried
     */
    public static boolean isExist(String path, Configuration conf) throws IOException {
        Path p = new Path(path);
        FileSystem fs = p.getFileSystem(conf);
        try {
            return fs.exists(p);
        } finally {
            fs.close();
        }
    }

    /** Creates {@code file} on {@code fs}, writes {@code buff} to it and logs the creation. */
    private static void writeNewFile(FileSystem fs, String file, byte[] buff)
            throws IOException {
        FSDataOutputStream os = fs.create(new Path(file));
        try {
            os.write(buff, 0, buff.length);
            os.flush();
            System.out.println("Create: " + file);
        } finally {
            os.close();
        }
    }

    /**
     * Intentionally empty entry point.
     *
     * @param args unused
     * @throws Exception never thrown by the current body
     */
    public static void main(String[] args) throws Exception {

    }
}


将项目打成 jar 包,放到 Hadoop 平台上,通过以下命令运行:
hadoop jar /usr/etl.jar com.xiefg.util.PropertiesUtils

执行后可以在 HDFS 上看到生成的文件目录和内容。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: