您的位置:首页 > 其它

本人写的如何使用DFS API 合并为一个大的天气数据文件

2014-01-23 11:00 459 查看
本人写的如何使用DFS API 合并为一个大的天气数据文件:

package com.zk.hadoop;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class AirTemperatureDataTranslate {

public static byte[] buffer = new byte[4096];

public AirTemperatureDataTranslate() {
// TODO Auto-generated constructor stub
}

/**
* @param args
*/
public static void main(String[] args) {
// TODO Auto-generated method stub

String localPath = args[0];
String dfsPath = args[1];

File localDir = new File(localPath);

FileInputStream input = null;
FSDataOutputStream fsOutput = null;

Configuration conf = new Configuration();
try {
FileSystem fs = FileSystem.get(new URI(dfsPath), conf);
fsOutput = fs.create(new Path(dfsPath));
if(localDir.isDirectory()){
System.out.println("处理目录:" + localDir.getName());
File[] fileArr = localDir.listFiles();
for(File tmp : fileArr){
oper(tmp, input, fsOutput, conf);
}
}else{
System.out.println("处理文件:" + localDir.getName());
input = new FileInputStream(localDir);
write(input, fsOutput);
}
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (URISyntaxException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} finally{
if(fsOutput != null) IOUtils.closeStream(fsOutput);
if(input != null) IOUtils.closeStream(input);
}

}

/**
* 递归扫描处理文件
* @param file
* @param input
* @param output
* @param conf
* @throws IOException
*/
public static void oper(File file, FileInputStream input, FSDataOutputStream output, Configuration conf) throws IOException{
if(file.isDirectory()){
File[] fileArr = file.listFiles();
for(File tmp : fileArr){
oper(tmp, input, output, conf);
}
}else{
System.out.println("处理文件:" + file.getName());
input = new FileInputStream(file);
write(input, output);
}
}

/**
* 写入DFS 输出流
* @param input
* @param output
* @throws IOException
*/
public static void write(FileInputStream input, FSDataOutputStream output) throws IOException{
int readSize = 0;
while((readSize = input.read(buffer)) > 0){
output.write(buffer, 0, readSize);
}
}

}

说明:运行指定天气数据本地目录和合并后的HDFS上的文件名称即可。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐