
Processing HBase Data with MapReduce

2013-01-10 23:22
  Today I finally got my MapReduce-over-HBase program working, after going down quite a few wrong paths. The program ran fine on the pseudo-distributed Hadoop on my own machine, but failed as soon as I uploaded it to the cluster. The cause turned out to be ZooKeeper misconfiguration: the HBase conf directory had not been added to the CLASSPATH at compile time, so the client could not find the cluster settings.
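  If putting the HBase conf directory on the classpath is inconvenient, the ZooKeeper quorum can also be set programmatically. Below is a minimal sketch of that alternative, where zk1, zk2, zk3 are placeholder host names (not from the original post); the static block would sit next to the config field in the program below:

static Configuration config = HBaseConfiguration.create();
static {
    // Assumption: hbase-site.xml is NOT on the classpath, so tell the client
    // where ZooKeeper lives. zk1,zk2,zk3 are placeholder hosts -- substitute
    // your cluster's actual quorum.
    config.set("hbase.zookeeper.quorum", "zk1,zk2,zk3");
    config.set("hbase.zookeeper.property.clientPort", "2181");
}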

  Here is the MapReduce test program:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

public class Test {
    private static final String sourceTable = "sourceTable";
    private static final String targetTable = "targetTable";
    static Configuration config = HBaseConfiguration.create();

    /** Creates the table with the given column families if it does not exist yet. */
    public static void createTable(String tablename, String[] cfs) throws IOException {
        HBaseAdmin admin = new HBaseAdmin(config);
        if (admin.tableExists(tablename)) {
            System.out.println("table already exists");
        } else {
            HTableDescriptor tableDesc = new HTableDescriptor(tablename);
            for (int i = 0; i < cfs.length; i++) {
                tableDesc.addFamily(new HColumnDescriptor(cfs[i]));
            }
            admin.createTable(tableDesc);
            System.out.println("create table successfully");
        }
    }

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        String[] cfs = {"a"};
        createTable(targetTable, cfs);

        Job job = new Job(config, "test");
        job.setJarByClass(Test.class);

        Scan scan = new Scan();
        scan.setCaching(1024);      // fetch 1024 rows per RPC round trip
        scan.setCacheBlocks(false); // don't fill the block cache during a full scan

        // Map over sourceTable, emitting <Text, IntWritable> pairs.
        TableMapReduceUtil.initTableMapperJob(
                sourceTable,
                scan,
                Mapper1.class,
                Text.class,
                IntWritable.class,
                job);
        // Reduce into targetTable.
        TableMapReduceUtil.initTableReducerJob(
                targetTable,
                Reducer1.class,
                job);

        boolean b = job.waitForCompletion(true);
        if (!b) {
            throw new IOException("error");
        }
    }

    public static class Mapper1 extends TableMapper<Text, IntWritable> {
        private final IntWritable ONE = new IntWritable(1);
        private Text text = new Text();

        @Override
        public void map(ImmutableBytesWritable row, Result value, Context context)
                throws IOException, InterruptedException {
            // Use the value of column cf:a as the map output key.
            String id = new String(value.getValue(Bytes.toBytes("cf"), Bytes.toBytes("a")));
            text.set(id);
            context.write(text, ONE);
        }
    }

    public static class Reducer1 extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {
        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // Sum the ones emitted for this key.
            int i = 0;
            for (IntWritable val : values) {
                i += val.get();
            }
            // Write the count into column a:c, row-keyed by the mapper's output key.
            Put put = new Put(Bytes.toBytes(key.toString()));
            put.add(Bytes.toBytes("a"), Bytes.toBytes("c"), Bytes.toBytes(i));
            context.write(null, put); // TableOutputFormat ignores the key
        }
    }
}
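After the job finishes, the counts can be checked with a plain scan of targetTable. The following is a small verification sketch (VerifyCounts is a hypothetical helper I added, not part of the original post); since the reducer stored the count with Bytes.toBytes(int), it is read back with Bytes.toInt:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

public class VerifyCounts {
    public static void main(String[] args) throws IOException {
        Configuration config = HBaseConfiguration.create();
        HTable table = new HTable(config, "targetTable");
        ResultScanner scanner = table.getScanner(new Scan());
        try {
            for (Result r : scanner) {
                // The reducer stored each count as a 4-byte int in column a:c.
                int count = Bytes.toInt(r.getValue(Bytes.toBytes("a"), Bytes.toBytes("c")));
                System.out.println(Bytes.toString(r.getRow()) + " -> " + count);
            }
        } finally {
            scanner.close();
            table.close();
        }
    }
}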


Once the code is written it needs to be packaged into a jar. You can build it locally or on the server, but either way the CLASSPATH must be set first:

export CLASSPATH=/data/hadoop/hadoop-1.0.4/hadoop-core-1.0.4.jar:/data/hadoop/hbase-0.94.2/hbase-0.94.2.jar:/data/hadoop/hbase-0.94.2/conf/

Run this command in the terminal, or simply add it to the .bashrc in your home directory.

Then create a test_classes output directory

and compile with:

javac -d test_classes/ Test.java

After compilation, three .class files (Test.class plus the two inner classes) will appear under test_classes.

Then run

jar -cvf test.jar -C test_classes .

which produces the test.jar file.

Finally, run:

bin/hadoop jar test.jar Test

to launch the MapReduce job.
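If the hadoop command fails at this point with HBase classes missing from the runtime classpath, one common workaround (an assumption on my part; the original setup may not have needed it) is to expose the same HBase jar and conf directory to the hadoop launcher via HADOOP_CLASSPATH:

export HADOOP_CLASSPATH=/data/hadoop/hbase-0.94.2/hbase-0.94.2.jar:/data/hadoop/hbase-0.94.2/conf/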