您的位置:首页 > 理论基础 > 数据结构算法

Hbase数据解析mapreduce过程及遇到的问题

2016-05-18 19:44 435 查看
本次介绍的是将hbase中存的数据,用mapreduce解析。

一,hbase中的数据结构:



二,分析map过程:

因为这里是对hbase中的表进行解析,所以map必须是继承TableMapper类来对数据进行解析。

注意:输入的key value必须是ImmutableBytesWritable和Result。输出的就可以自己定义了。

ImmutableBytesWritable,指代的是行健,

Result,指代的是值。

在这个map过程中会将tags,用“,”分割得到一个字符数组,然后同nikname一起写出,nakname做key,tag做value。

三,分析Reduce过程:

这里的Reduce过程跟平时的没有太大的区别,在整合数据时,也是用“,”分割的。

最后注意:

map过程的输出值,与reduce的输入值,一定要对应,并且value不能为空,否则reduce过程会进不去。开始我的就是key有,但是value空了,导致reduce过程一直不进去。

这里还提供了hbase表创建的方法testData()。

四,代码:

mport java.io.IOException;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class YaoHbaseAndMapReduce02 {

public static class YaoMap02 extends TableMapper<Text, Text>{
@Override

//key是hbase中的行键
//value是hbase中的所行键的所有数据
protected void map(ImmutableBytesWritable key, Result value,
Mapper<ImmutableBytesWritable, Result, Text, Text>.Context context)
throws IOException, InterruptedException {
Text v=null;
String[] kStrs=null;
List<Cell> cs=value.listCells();
for(Cell cell:cs){
if("tags".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))){
kStrs=Bytes.toString(CellUtil.cloneValue(cell)).split(",");
//  System.out.println("yaomap,kStrs=="+kStrs);
}
else if("nickname".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))){
v=new Text(CellUtil.cloneValue(cell));
}

}
for (String kStr : kStrs) {
System.out.println("*****"+new Text(kStr.toLowerCase())+"==="+v);
context.write(new Text(kStr.toLowerCase()), v);
}
}

}

public static class YaoReduce02 extends Reducer<Text, Text, Text, Text>{
@Override
protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
StringBuilder sb=new StringBuilder();
for(Text text:values){
System.out.println("sb=="+sb);
System.out.println(sb.length() > 0);
sb.append((sb.length() > 0 ? ",":"") + text.toString());

}
Text va=new Text(sb.toString());

context.write(key, va);

}
}

public static void main(String[] args) throws Exception {
// 测试数据运行一次后需要注释
// testData();
Configuration conf=new Configuration();
conf=HBaseConfiguration.create(conf);
conf.set("hbase.zookeeper.quorum", "192.168.61.128");

Job job=Job.getInstance(conf,"mapandreduce02");
job.setJarByClass(YaoHbaseAndMapReduce02.class);//将此类打成jar包

Scan scan=new Scan();
//取对业务有用的数据 tags, nickname
scan.addColumn(Bytes.toBytes("article"), Bytes.toBytes("tags"));
scan.addColumn(Bytes.toBytes("author"), Bytes.toBytes("nickname"));

TableMapReduceUtil.initTableMapperJob("blog1", scan, YaoMap02.class, Text.class, Text.class, job);

FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.61.128:9000/hbaseout" + new Date().getTime()));
job.setReducerClass(YaoReduce02.class);

System.exit(job.waitForCompletion(true) ? 0 : 1);

}

public static void testData() {
try {
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "192.168.61.128");
Connection con = ConnectionFactory.createConnection(conf);
Admin admin = con.getAdmin();

TableName tn = TableName.valueOf("blog1");
if (admin.tableExists(tn)) {
admin.disableTable(tn);
admin.deleteTable(tn);
}

HTableDescriptor htd = new HTableDescriptor(tn);
HColumnDescriptor hcd01 = new HColumnDescriptor("article");
htd.addFamily(hcd01);
HColumnDescriptor hcd02 = new HColumnDescriptor("author");
htd.addFamily(hcd02);
admin.createTable(htd);

Table t = con.getTable(tn);
Put put = new Put(Bytes.toBytes("1"));
put.addColumn(Bytes.toBytes("article"), Bytes.toBytes("content"),
Bytes.toBytes("HBase is the Hadoop database. Use it when you need random, "
+ "realtime read/write access to your Big Data"));
put.addColumn(Bytes.toBytes("article"), Bytes.toBytes("tags"), Bytes.toBytes("HBase,NoSql,Hadoop"));
put.addColumn(Bytes.toBytes("article"), Bytes.toBytes("title"), Bytes.toBytes("Head First Hbase"));
put.addColumn(Bytes.toBytes("author"), Bytes.toBytes("name"), Bytes.toBytes("zhangsan"));
put.addColumn(Bytes.toBytes("author"), Bytes.toBytes("nickname"), Bytes.toBytes("sansan"));

Put put02 = new Put(Bytes.toBytes("10"));
put02.addColumn(Bytes.toBytes("article"), Bytes.toBytes("tags"), Bytes.toBytes("Hadoop"));
put02.addColumn(Bytes.toBytes("author"), Bytes.toBytes("nickname"), Bytes.toBytes("xiaoshi"));

Put put03 = new Put(Bytes.toBytes("100"));
put03.addColumn(Bytes.toBytes("article"), Bytes.toBytes("tags"), Bytes.toBytes("hbase,nosql"));
put03.addColumn(Bytes.toBytes("author"), Bytes.toBytes("nickname"), Bytes.toBytes("superman"));

List<Put> puts = Arrays.asList(put, put02, put03);
t.put(puts);
System.out.println("==========> 测试数据准备完成...");

if (admin != null) {
admin.close();
}
if (con != null) {
con.close();
}

} catch (IOException e) {
e.printStackTrace();
}

}

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息