
Calling the HDFS API from Java

2015-01-31 15:38

This article shows how to read data from HDFS through the Java API.

package mongodb;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.Arrays;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.util.ReflectionUtils;

// Holds one "value:weight" pair; sorts by weight in descending order.
class Item implements Comparable<Item>
{
    String value;
    double weight;

    public Item(String v)
    {
        value = v;
        // The weight is the part after the colon, e.g. "2077635:1.0" -> 1.0.
        weight = Double.parseDouble(value.split(":")[1]);
    }

    public int compareTo(Item o)
    {
        // Descending: larger weights sort first.
        return this.weight == o.weight ? 0 : (this.weight > o.weight ? -1 : 1);
    }
}

public class BatchUpdateSim {
    // Turns one TSV line of the form "key\tvalue1:w1,value2:w2,..." into a
    // JSON document whose values are sorted by weight in descending order.
    public static String parse(String str)
    {
        // Example input: "90003\t20718001:1.0,2077635:1.0,2053809:1.0"
        String[] fields = str.split("\t");
        String[] valueArray = fields[1].split(",");
        Item[] items = new Item[valueArray.length];
        for (int i = 0; i < items.length; i++)
        {
            items[i] = new Item(valueArray[i]);
        }
        Arrays.sort(items);
        for (int i = 0; i < valueArray.length; i++)
        {
            valueArray[i] = "{" + items[i].value + "}";
        }
        String valueStr = StringUtils.join(valueArray, ",");
        return "{\"key\":" + fields[0] + ",\"values\":[" + valueStr + "]}";
    }
    public static void main(String[] args) throws IOException, ClassNotFoundException
    {
        // Load the cluster configuration from the local Hadoop install.
        Configuration conf = new Configuration();
        conf.addResource("/usr/local/hadoop/conf/core-site.xml");
        conf.addResource("/usr/local/hadoop/conf/hdfs-site.xml");
        conf.addResource("/usr/local/hadoop/conf/mapred-site.xml");

        // e.g. "hdfs://webdm-cluster" or "hdfs://localhost:9000"
        String HDFS = args[0];
        FileSystem hdfs = FileSystem.get(URI.create(HDFS), conf);
        String filePath = args[1];
        FSDataInputStream fin = hdfs.open(new Path(filePath));

        // For compressed input, a codec can be picked by file extension:
        // CompressionCodecFactory factory = new CompressionCodecFactory(conf);
        // CompressionCodec codec = factory.getCodec(new Path(filePath));
        // ...or instantiated explicitly via reflection:
        // Class<?> codecClass = Class.forName("com.hadoop.compression.lzo.LzoCodec");
        // CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);

        BufferedReader reader = null;
        String line;
        int count = 0;
        RecsysDb db = RecsysDb.getInstance();   // project-specific MongoDB wrapper
        String itemSimName = args[2];
        try
        {
            // Plain-text input; for UTF-8 use new InputStreamReader(fin, "UTF-8").
            reader = new BufferedReader(new InputStreamReader(fin));
            /*
            // If a codec was detected above, wrap the codec stream instead:
            if (codec != null)
            {
                System.out.println("compression type detected");
                CompressionInputStream comInputStream = codec.createInputStream(fin);
                reader = new BufferedReader(new InputStreamReader(comInputStream));
            }
            */

            while ((line = reader.readLine()) != null)
            {
                String strJson = parse(line);
                db.updateItemSim(itemSimName, strJson);
                count++;
                if (count % 1000 == 0) System.out.println("count:" + count);
            }
        }
        finally
        {
            if (reader != null)
            {
                reader.close();
            }
            System.out.println(count);
        }
    }
}
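To make the transformation concrete, here is a small hypothetical driver (not part of the original program) that feeds parse one sample line; the expected output follows directly from the code above:

// Hypothetical demo class, for illustration only.
public class ParseDemo
{
    public static void main(String[] args)
    {
        // A sample line in the "key\tvalue:weight,..." format the program expects.
        String line = "90003\t20718001:1.0,2077635:2.0,2053809:1.5";
        System.out.println(BatchUpdateSim.parse(line));
        // Prints, with values sorted by descending weight:
        // {"key":90003,"values":[{2077635:2.0},{2053809:1.5},{20718001:1.0}]}
    }
}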

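If the files on HDFS are compressed (e.g. .gz), the commented-out codec code can be enabled. Below is a minimal self-contained sketch of that path, assuming a hypothetical gzipped file and the standard Hadoop codec factory; the URI and path are placeholders:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;

// Sketch: read a compressed HDFS file line by line.
public class CompressedHdfsRead
{
    public static void main(String[] args) throws IOException
    {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create("hdfs://localhost:9000"), conf);
        Path path = new Path("/data/input.gz");   // hypothetical file

        FSDataInputStream fin = fs.open(path);
        // The codec is resolved from the file extension; null means uncompressed.
        CompressionCodec codec = new CompressionCodecFactory(conf).getCodec(path);
        BufferedReader reader = new BufferedReader(new InputStreamReader(
                codec == null ? fin : codec.createInputStream(fin)));
        String line;
        while ((line = reader.readLine()) != null)
        {
            System.out.println(line);
        }
        reader.close();
    }
}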

Below is a shell script for running the program with the java command; with plain java you have to put the various Hadoop-related jars on the classpath yourself.
(PS: I later found a better way: launch it with the hadoop command, since Hadoop then loads its own jars and there is no need to add them manually; see the sketch after the script.)
hd_core="/usr/local/hadoop/hadoop-core-1.1.2.jar";
s4j="/usr/local/hadoop/lib/slf4j-log4j12-1.6.1.jar";
s4japi="/usr/local/hadoop/lib/slf4j-api-1.6.1.jar";
log4j="/usr/local/hadoop/lib/log4j-1.2.17.jar";
guava="/usr/local/hadoop/lib/guava-11.0.2.jar";
clog="/usr/local/hadoop/lib/commons-logging-1.1.1.jar";
cconf="/usr/local/hadoop/lib/commons-configuration-1.6.jar";
cl="/usr/local/hadoop/lib/commons-lang-2.5.jar";
ccli="/usr/local/hadoop/lib/commons-cli-1.2.jar";
protobuf="/usr/local/hadoop/lib/protobuf-java-2.4.0a.jar";
hdfs="/usr/local/hadoop/lib/hadoop-hdfs-1.1.2.jar";
mongodb="/data/home/liulian/linger/jars/mongo-java-driver-2.12.4.jar";
libs=(
$hd_core
$s4j
$s4japi
$log4j
$guava
$clog
$cconf
$cl
$ccli
$protobuf
$hdfs
$mongodb
)
libstr=""
for jarlib in ${libs[@]};
do
libstr=${jarlib}":"${libstr}
done
echo $libstr;
java -Xbootclasspath/a:${libstr}  -jar ../jars/updateSim.jar hdfs://10.200.91.164:9000 tv_sim/result/000000_0 tvItemsimColl
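As noted above, the hadoop launcher puts the Hadoop jars on the classpath itself. Roughly, the invocation would look like this (an untested sketch; only the MongoDB driver, which Hadoop does not ship, still has to be supplied, here via HADOOP_CLASSPATH):

# Hadoop loads its own jars; only third-party jars need to be added.
export HADOOP_CLASSPATH=/data/home/liulian/linger/jars/mongo-java-driver-2.12.4.jar
hadoop jar ../jars/updateSim.jar hdfs://10.200.91.164:9000 tv_sim/result/000000_0 tvItemsimColl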


Original article: http://blog.csdn.net/lingerlanlan/article/details/42178675
Author: linger
Tags: Java, HDFS, jars, Hadoop