
HBase Series: Combining HBase and MapReduce via the API

2018-01-17 21:27
1. Overview
MapReduce lets you plug in custom InputFormat and OutputFormat implementations, so in principle it can be combined with any input or output source.
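Concretely, "combining with any source or sink" just means installing the right format classes on the Job. As a hedged sketch of part of what TableMapReduceUtil.initTableReducerJob (used below) does for you, assuming org.apache.hadoop.hbase.mapreduce.TableOutputFormat:

job.setReducerClass(WCReducer.class);
job.setOutputFormatClass(TableOutputFormat.class);
// Tell TableOutputFormat which table to write into
job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, "wc");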
2. Steps
This example reads a text file from HDFS, counts the words, and writes the results back into an HBase table.
2.1 The main function
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

/**
 * Analyzes text on HDFS, counts word occurrences, and writes the
 * result to an HBase table. Create the table first in the HBase shell:
 *
 *   create 'wc','cf'
 *
 * rowkey: the word    cf:count = number of occurrences
 *
 * Variant: the input could also come from an HBase table (rowkey plus a
 * cell holding the text), with the result likewise written to HBase.
 */
public class WCDemo {

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();

        // Nameservice of the HDFS cluster
        conf.set("fs.defaultFS", "hdfs://node1:8020");
        // ZooKeeper quorum the HBase client connects to
        conf.set("hbase.zookeeper.quorum", "node4");

        Job job = Job.getInstance(conf);
        job.setJarByClass(WCDemo.class);

        job.setMapperClass(WCMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // Wires WCReducer to the "wc" output table (no setReducerClass needed).
        // When running locally, pass addDependencyJars = false (the last
        // argument); on a cluster the short form is enough:
        //     TableMapReduceUtil.initTableReducerJob("wc", WCReducer.class, job);
        TableMapReduceUtil.initTableReducerJob("wc", WCReducer.class, job,
                null, null, null, null, false);

        Path path = new Path("/user/wc");
        FileInputFormat.addInputPath(job, path);

        boolean flag = job.waitForCompletion(true);
        if (flag) {
            System.out.println("success~~");
        }
    }
}
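Once the job succeeds, you can read the counts back with the plain HBase client API to verify them. A minimal sketch, assuming an HBase 1.x+ client on the classpath, the same node4 quorum as above, and a hypothetical word "hello":

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class WCCheck {

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "node4");

        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(TableName.valueOf("wc"))) {
            // Fetch the row whose rowkey is the word and read cf:count
            Result r = table.get(new Get(Bytes.toBytes("hello")));
            byte[] count = r.getValue(Bytes.toBytes("cf"), Bytes.toBytes("count"));
            System.out.println("hello = "
                    + (count == null ? "(absent)" : Bytes.toString(count)));
        }
    }
}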
2.2 The Mapper (no different from an ordinary Mapper)
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] words = value.toString().split(" ");

        for (String w : words) {
            context.write(new Text(w), new IntWritable(1));
        }
    }
}
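As the comment in WCDemo notes, the input itself could also live in an HBase table instead of HDFS, since MapReduce accepts any InputFormat. In that case the mapper extends TableMapper and receives one HBase row per call. A rough sketch, assuming a hypothetical source table whose cf:line cell holds one line of text per row:

import java.io.IOException;

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class WCTableMapper extends TableMapper<Text, IntWritable> {

    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
            throws IOException, InterruptedException {
        // Pull the text out of cf:line for the current row, then count as before
        byte[] line = value.getValue(Bytes.toBytes("cf"), Bytes.toBytes("line"));
        if (line == null) {
            return;
        }
        for (String w : Bytes.toString(line).split(" ")) {
            context.write(new Text(w), new IntWritable(1));
        }
    }
}

In the driver you would then replace setMapperClass and FileInputFormat with TableMapReduceUtil.initTableMapperJob("words", new Scan(), WCTableMapper.class, Text.class, IntWritable.class, job), where "words" is the assumed input table name.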
2.3 The Reducer (its main job is to write out Put objects)
import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class WCReducer extends
        TableReducer<Text, IntWritable, ImmutableBytesWritable> {

    @Override
    protected void reduce(Text text, Iterable<IntWritable> iterable,
            Context context) throws IOException, InterruptedException {

        int sum = 0;
        for (IntWritable i : iterable) {
            sum += i.get();
        }

        // The word becomes the rowkey; the total goes into cf:count
        Put put = new Put(text.toString().getBytes());
        put.add("cf".getBytes(), "count".getBytes(), (sum + "").getBytes());

        // TableOutputFormat ignores the key, so null is acceptable here
        context.write(null, put);
    }
}
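One caveat on the write itself: Put.add(family, qualifier, value), as used above, is the old HBase client call; in the 1.x line it was deprecated in favor of addColumn, and it is gone in 2.0. On a newer client the same cell write would look roughly like this, using Bytes for the conversions:

// Equivalent write on HBase 1.0+ clients (sketch)
Put put = new Put(Bytes.toBytes(text.toString()));
put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("count"),
        Bytes.toBytes(String.valueOf(sum)));
context.write(null, put);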
 