
A Case Study of Integrating HBase with MapReduce

2017-12-07 19:13
[Requirement] Import the name column of the info column family into another table.


Create the table (the test namespace must already exist; it can be created with create_namespace 'test'):

create 'test:stu_info','info','degree','work'

Insert data: 6 row keys across 3 column families

put 'test:stu_info','20170222_10001','degree:xueli','benke'

put 'test:stu_info','20170222_10001','info:age','18'

put 'test:stu_info','20170222_10001','info:sex','male'

put 'test:stu_info','20170222_10001','info:name','tom'

put 'test:stu_info','20170222_10001','work:job','bigdata'

put 'test:stu_info','20170222_10002','degree:xueli','gaozhong'

put 'test:stu_info','20170222_10002','info:age','22'

put 'test:stu_info','20170222_10002','info:sex','female'

put 'test:stu_info','20170222_10002','info:name','jack'

put 'test:stu_info','20170222_10003','info:age','22'

put 'test:stu_info','20170222_10003','info:name','leo'

put 'test:stu_info','20170222_10004','info:age','18'

put 'test:stu_info','20170222_10004','info:name','peter'

put 'test:stu_info','20170222_10005','info:age','19'

put 'test:stu_info','20170222_10005','info:name','jim'

put 'test:stu_info','20170222_10006','info:age','20'

put 'test:stu_info','20170222_10006','info:name','zhangsan'

create 't5', {NAME => 'info'}
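The driver code below writes its output to the table test:info_name, so that table needs to exist before the job runs; since the mapper copies cells from the info column family, a minimal creation command in the HBase shell would be:

create 'test:info_name', 'info'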

Each region of the input table corresponds to one map task.

Add the HBase jars to Hadoop's hadoop-env.sh so that MapReduce can resolve the HBase dependencies:

export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/opt/moduels/hbase-0.98.6-hadoop2/lib/*
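Alternatively, assuming this HBase release supports the mapredcp subcommand, HBase can compute the MapReduce classpath itself:

export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:$(/opt/moduels/hbase-0.98.6-hadoop2/bin/hbase mapredcp)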




The Java code is as follows:

package com.bigdata.hadoop.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class TestDriver2 extends Configured implements Tool {

    public int run(String[] args) throws Exception {
        Configuration conf = this.getConf();
        Job job = Job.getInstance(conf, "mr-hbase2");
        job.setJarByClass(TestDriver2.class); // class that contains mapper and reducer
        Scan scan = new Scan();
        // set other scan attrs
        TableMapReduceUtil.initTableMapperJob(
                "test:stu_info",              // input table
                scan,                         // Scan instance to control CF and attribute selection
                TestHbaseMap.class,           // mapper class
                ImmutableBytesWritable.class, // mapper output key
                Put.class,                    // mapper output value
                job);
        TableMapReduceUtil.initTableReducerJob(
                "test:info_name",             // output table
                null,                         // reducer class
                job);
        job.setNumReduceTasks(1);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        try {
            int status = ToolRunner.run(conf, new TestDriver2(), args);
            System.exit(status);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
The mapper class (TestHbaseMap.java):

package com.bigdata.hadoop.mapreduce;

import java.io.IOException;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;

public class TestHbaseMap extends TableMapper<ImmutableBytesWritable, Put> {

    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
            throws IOException, InterruptedException {
        Put put = new Put(key.get());
        for (Cell cell : value.rawCells()) {
            // match cells in the info column family
            if ("info".equals(Bytes.toString(CellUtil.cloneFamily(cell)))) {
                // match cells in the name column
                if ("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
                    put.add(cell);
                }
            }
        }
        // only emit rows that actually contain an info:name cell; writing an empty Put would fail
        if (!put.isEmpty()) {
            context.write(key, put);
        }
    }
}


Package the code as mr-hbase.jar, upload it to the Linux server, and run the following command from the HBase installation directory:

/opt/moduels/hadoop-2.5.0/bin/yarn jar /opt/datas/mr-hbase.jar
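After the job finishes, the output table can be checked from the HBase shell; the listing below corresponds to a scan of it, for example:

scan 'test:info_name'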

 20170222_10001                 column=info:name, timestamp=1497059738675, value=tom                                      

 20170222_10002                 column=info:name, timestamp=1497059738956, value=jack                                     

 20170222_10003                 column=info:name, timestamp=1497059739013, value=leo                                      

 20170222_10004                 column=info:name, timestamp=1497059739121, value=peter                                    

 20170222_10005                 column=info:name, timestamp=1497059739254, value=jim                                      

 20170222_10006                 column=info:name, timestamp=1497059740585, value=zhangsan 








Importing formatted data with importtsv

Usage: importtsv -Dimporttsv.columns=a,b,c <tablename> <inputdir>

-》 Option: -D specifies a single parameter in key=value form

-》 Upload the file to HDFS, then run:

/opt/moduels/hadoop-2.5.0/bin/yarn jar lib/hbase-server-0.98.6-hadoop2.jar importtsv -Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:age,info:sex stu_info /test.tsv

-》 If the separator is not the default \t, specify the input separator in the command:

/opt/moduels/hadoop-2.5.0/bin/yarn jar lib/hbase-server-0.98.6-hadoop2.jar importtsv -Dimporttsv.separator=, -Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:age,info:sex stu_info /test2.csv
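For reference, each line of the input file has to match the column mapping given in -Dimporttsv.columns; a purely illustrative line for the comma-separated case above could look like:

20170222_10007,lisi,21,male

where the first field becomes the row key and the remaining fields map to info:name, info:age and info:sex.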

Step 1: convert to HFiles (an HFile is essentially a StoreFile):

/opt/moduels/hadoop-2.5.0/bin/yarn jar lib/hbase-server-0.98.6-hadoop2.jar importtsv -Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:age,info:sex    -Dimporttsv.bulk.output=/testHfile stu_info /test3.tsv
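If the job succeeds, the output directory should contain one subdirectory per column family holding the generated HFiles, which can be verified with a listing such as:

/opt/moduels/hadoop-2.5.0/bin/hdfs dfs -ls /testHfile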

Step 2: load into HBase. This step is not a MapReduce job; it moves the StoreFiles into the corresponding table's directory in HBase.

Example from the official documentation: /opt/moduels/hadoop-2.5.0/bin/yarn jar lib/hbase-server-0.98.6-hadoop2.jar completebulkload

usage: completebulkload /path/to/hfileoutputformat-output tablename

Running completebulkload for this example:

/opt/moduels/hadoop-2.5.0/bin/yarn jar lib/hbase-server-0.98.6-hadoop2.jar completebulkload /testHfile stu_info
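As a quick sanity check after the bulk load, the table can be counted or scanned from the HBase shell, for example:

count 'stu_info'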

Note: Sqoop can be used to import data from a relational database into HBase.
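A minimal sketch of such a Sqoop import, where the JDBC URL, credentials and source table are placeholders:

sqoop import --connect jdbc:mysql://hostname:3306/testdb --username root --password 123456 --table stu --hbase-table stu_info --column-family info --hbase-row-key id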





