Hbase数据解析mapreduce过程及遇到的问题
2016-05-18 19:44
435 查看
本次介绍的是将hbase中存的数据,用mapreduce解析。
一,hbase中的数据结构:
二,分析map过程:
因为这里是对hbase中的表进行解析,所以map必须是继承TableMapper类来对数据进行解析。
注意:输入的key value必须是ImmutableBytesWritable和Result。输出的就可以自己定义了。
ImmutableBytesWritable,指代的是行健,
Result,指代的是值。
在这个map过程中会将tags,用“,”分割得到一个字符数组,然后同nikname一起写出,nakname做key,tag做value。
三,分析Reduce过程:
这里的Reduce过程跟平时的没有太大的区别,在整合数据时,也是用“,”分割的。
最后注意:
map过程的输出值,与reduce的输入值,一定要对应,并且value不能为空,否则reduce过程会进不去。开始我的就是key有,但是value空了,导致reduce过程一直不进去。
这里还提供了hbase表创建的方法testData()。
四,代码:
一,hbase中的数据结构:
二,分析map过程:
因为这里是对hbase中的表进行解析,所以map必须是继承TableMapper类来对数据进行解析。
注意:输入的key value必须是ImmutableBytesWritable和Result。输出的就可以自己定义了。
ImmutableBytesWritable,指代的是行健,
Result,指代的是值。
在这个map过程中会将tags,用“,”分割得到一个字符数组,然后同nikname一起写出,nakname做key,tag做value。
三,分析Reduce过程:
这里的Reduce过程跟平时的没有太大的区别,在整合数据时,也是用“,”分割的。
最后注意:
map过程的输出值,与reduce的输入值,一定要对应,并且value不能为空,否则reduce过程会进不去。开始我的就是key有,但是value空了,导致reduce过程一直不进去。
这里还提供了hbase表创建的方法testData()。
四,代码:
mport java.io.IOException; import java.util.Arrays; import java.util.Date; import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.mapreduce.TableMapper; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class YaoHbaseAndMapReduce02 { public static class YaoMap02 extends TableMapper<Text, Text>{ @Override //key是hbase中的行键 //value是hbase中的所行键的所有数据 protected void map(ImmutableBytesWritable key, Result value, Mapper<ImmutableBytesWritable, Result, Text, Text>.Context context) throws IOException, InterruptedException { Text v=null; String[] kStrs=null; List<Cell> cs=value.listCells(); for(Cell cell:cs){ if("tags".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))){ kStrs=Bytes.toString(CellUtil.cloneValue(cell)).split(","); // System.out.println("yaomap,kStrs=="+kStrs); } else if("nickname".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))){ v=new Text(CellUtil.cloneValue(cell)); } } for (String kStr : kStrs) { System.out.println("*****"+new Text(kStr.toLowerCase())+"==="+v); context.write(new Text(kStr.toLowerCase()), v); } } } public static class YaoReduce02 extends Reducer<Text, Text, Text, Text>{ @Override protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException { StringBuilder sb=new StringBuilder(); for(Text text:values){ System.out.println("sb=="+sb); System.out.println(sb.length() > 0); sb.append((sb.length() > 0 ? ",":"") + text.toString()); } Text va=new Text(sb.toString()); context.write(key, va); } } public static void main(String[] args) throws Exception { // 测试数据运行一次后需要注释 // testData(); Configuration conf=new Configuration(); conf=HBaseConfiguration.create(conf); conf.set("hbase.zookeeper.quorum", "192.168.61.128"); Job job=Job.getInstance(conf,"mapandreduce02"); job.setJarByClass(YaoHbaseAndMapReduce02.class);//将此类打成jar包 Scan scan=new Scan(); //取对业务有用的数据 tags, nickname scan.addColumn(Bytes.toBytes("article"), Bytes.toBytes("tags")); scan.addColumn(Bytes.toBytes("author"), Bytes.toBytes("nickname")); TableMapReduceUtil.initTableMapperJob("blog1", scan, YaoMap02.class, Text.class, Text.class, job); FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.61.128:9000/hbaseout" + new Date().getTime())); job.setReducerClass(YaoReduce02.class); System.exit(job.waitForCompletion(true) ? 0 : 1); } public static void testData() { try { Configuration conf = HBaseConfiguration.create(); conf.set("hbase.zookeeper.quorum", "192.168.61.128"); Connection con = ConnectionFactory.createConnection(conf); Admin admin = con.getAdmin(); TableName tn = TableName.valueOf("blog1"); if (admin.tableExists(tn)) { admin.disableTable(tn); admin.deleteTable(tn); } HTableDescriptor htd = new HTableDescriptor(tn); HColumnDescriptor hcd01 = new HColumnDescriptor("article"); htd.addFamily(hcd01); HColumnDescriptor hcd02 = new HColumnDescriptor("author"); htd.addFamily(hcd02); admin.createTable(htd); Table t = con.getTable(tn); Put put = new Put(Bytes.toBytes("1")); put.addColumn(Bytes.toBytes("article"), Bytes.toBytes("content"), Bytes.toBytes("HBase is the Hadoop database. Use it when you need random, " + "realtime read/write access to your Big Data")); put.addColumn(Bytes.toBytes("article"), Bytes.toBytes("tags"), Bytes.toBytes("HBase,NoSql,Hadoop")); put.addColumn(Bytes.toBytes("article"), Bytes.toBytes("title"), Bytes.toBytes("Head First Hbase")); put.addColumn(Bytes.toBytes("author"), Bytes.toBytes("name"), Bytes.toBytes("zhangsan")); put.addColumn(Bytes.toBytes("author"), Bytes.toBytes("nickname"), Bytes.toBytes("sansan")); Put put02 = new Put(Bytes.toBytes("10")); put02.addColumn(Bytes.toBytes("article"), Bytes.toBytes("tags"), Bytes.toBytes("Hadoop")); put02.addColumn(Bytes.toBytes("author"), Bytes.toBytes("nickname"), Bytes.toBytes("xiaoshi")); Put put03 = new Put(Bytes.toBytes("100")); put03.addColumn(Bytes.toBytes("article"), Bytes.toBytes("tags"), Bytes.toBytes("hbase,nosql")); put03.addColumn(Bytes.toBytes("author"), Bytes.toBytes("nickname"), Bytes.toBytes("superman")); List<Put> puts = Arrays.asList(put, put02, put03); t.put(puts); System.out.println("==========> 测试数据准备完成..."); if (admin != null) { admin.close(); } if (con != null) { con.close(); } } catch (IOException e) { e.printStackTrace(); } } }
相关文章推荐
- 我是运营,我没有假期
- Hadoop_2.1.0 MapReduce序列图
- Facebook's New Real-time Messaging System: HBase to Store 135+ Billion Messages a Month
- Hadoop生态上几个技术的关系与区别:hive、pig、hbase 关系与区别
- DB2数据库的安装
- C#实现把指定数据写入串口
- “传奇”图象数据存储方式
- PostgreSQL教程(三):表的继承和分区表详解
- C#数据结构之顺序表(SeqList)实例详解
- 修复mysql数据库
- Lua面向对象之类和继承浅析
- Lua教程(七):数据结构详解
- 浅析Ruby中继承和消息的相关知识
- 浅析SQL数据操作语句
- SQLServer 数据导入导出的几种方法小结
- 简述MySQL分片中快速数据迁移
- MySQL数据备份之mysqldump的使用详解
- 解析从源码分析常见的基于Array的数据结构动态扩容机制的详解
- C#数据结构之队列(Quene)实例详解
- C#数据结构揭秘一