mapreduce读取hbase中表的数据,直接打印或者回传数据到hbase数据库表
2015-08-25 18:58
246 查看
package com.syyz.zjs;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

/**
 * MapReduce job that scans the HBase table {@code t_cdr3}, counts rows grouped
 * by the value of column {@code cf1:datetime}, and writes each per-datetime
 * count back to the HBase table {@code t_cdr} (column {@code info:count}).
 *
 * <p>NOTE(review): when debugging this job locally on Windows 7, the Hadoop
 * source file NativeIO.java must be patched (known Hadoop-on-Windows issue).
 */
public class ExampleTotalMapReduce {

    public static void main(String[] args) {
        try {
            Configuration config = HBaseConfiguration.create();
            config.set("hbase.zookeeper.quorum", "node7,node8,node9");

            // Job.getInstance replaces the deprecated new Job(conf, name) constructor.
            Job job = Job.getInstance(config, "ExampleSummary");
            job.setJarByClass(ExampleTotalMapReduce.class); // class that contains mapper and reducer

            Scan scan = new Scan();
            scan.setCaching(500);       // 1 is the Scan default, which is bad for MapReduce jobs
            scan.setCacheBlocks(false); // don't set to true for MR jobs
            // Optionally restrict the scan to the single column the mapper reads:
            // scan.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("datetime"));

            TableMapReduceUtil.initTableMapperJob(
                    "t_cdr3",           // input table
                    scan,               // Scan instance to control CF and attribute selection
                    MyMapper.class,     // mapper class
                    Text.class,         // mapper output key
                    IntWritable.class,  // mapper output value
                    job);
            TableMapReduceUtil.initTableReducerJob(
                    "t_cdr",               // output table
                    MyTableReducer.class,  // reducer class
                    job);
            job.setNumReduceTasks(1); // at least one, adjust as required

            boolean success = job.waitForCompletion(true);
            if (!success) {
                throw new IOException("error with job!");
            }
        } catch (Exception e) {
            // Boundary catch: report the failure and exit non-zero so schedulers /
            // callers can detect it (printStackTrace alone left exit code 0).
            e.printStackTrace();
            System.exit(1);
        }
    }

    /**
     * Emits {@code (datetime, 1)} for every input row, reading the value of
     * column {@code cf1:datetime}.
     */
    public static class MyMapper extends TableMapper<Text, IntWritable> {

        private final IntWritable ONE = new IntWritable(1);
        private final Text text = new Text();

        @Override
        public void map(ImmutableBytesWritable row, Result value, Context context)
                throws IOException, InterruptedException {
            byte[] raw = value.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("datetime"));
            if (raw == null) {
                // Row has no cf1:datetime cell; the original new String(null) threw NPE here.
                return;
            }
            // Bytes.toString decodes as UTF-8 explicitly rather than the platform charset.
            text.set(Bytes.toString(raw));
            context.write(text, ONE);
        }
    }

    /**
     * Sums the per-datetime counts and writes the total back to the output
     * table configured in initTableReducerJob ({@code t_cdr}), column
     * {@code info:count}. The original left this body commented out, so the
     * job wrote nothing despite configuring an output table.
     */
    public static class MyTableReducer
            extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            // Use Bytes.toBytes(key.toString()): Text.getBytes() returns the backing
            // array, which can be longer than the valid length and corrupt row keys.
            Put put = new Put(Bytes.toBytes(key.toString()));
            put.add(Bytes.toBytes("info"), Bytes.toBytes("count"),
                    Bytes.toBytes(String.valueOf(sum)));
            context.write(null, put);
        }
    }
}
相关文章推荐
- 用Oracle的TRIM函数去除字符串首尾指定字符
- tomcat与redis链接redis存储session
- 当有多于64个逻辑CPU时,Windows下的Oracle DB实例启动(startup)何时会hang(待定)
- mysql MyISAM表锁
- 如何编辑SDE数据库
- MySql数据库分表分区方法
- Two ways to load mysql tables into hdfs via spark
- win10 x64下安装oracle 12c出现[INS-30131]报错的解决方案
- PostgreSQL新手上路PG::ConnectionBad (FATAL: Peer authentication failed
- 根据sys.database_mirroring查询镜像数据库同步状态
- emysql add_pool() 超时出错
- emysql add_pool() 超时出错
- SQL Server 2008 / 2008 R2 定期自动备份数据库
- PostgreSQL新手上路PG::ConnectionBad (FATAL: Peer authentication failed
- SQL 行列转化
- 试用AWS RDS的还原到时间点功能
- wamp mysql 中文乱码问题
- MesaSQLite数据库
- oracle根据pid查询出正在执行的执行语句
- 在Oracle中查询表的大小、表的占用情况和表空间的大小