您的位置:首页 > 运维架构

MapReduce功能实现四---小综合(从hbase中读取数据统计并在hdfs中降序输出Top 3)

2017-07-25 18:05 651 查看
MapReduce功能实现系列:

MapReduce功能实现一---Hbase和Hdfs之间数据相互转换

MapReduce功能实现二---排序

MapReduce功能实现三---Top N

MapReduce功能实现四---小综合(从hbase中读取数据统计并在hdfs中降序输出Top 3)

MapReduce功能实现五---去重(Distinct)、计数(Count)

MapReduce功能实现六---最大值(Max)、求和(Sum)、平均值(Avg)

MapReduce功能实现七---小综合(多个job串行处理计算平均值)

MapReduce功能实现八---分区(Partition)

MapReduce功能实现九---Pv、Uv

MapReduce功能实现十---倒排索引(Inverted Index)

MapReduce功能实现十一---join

方法一:

在Hbase中建立相应的表1:

create 'hello','cf'
put 'hello','1','cf:hui','hello world'
put 'hello','2','cf:hui','hello hadoop'
put 'hello','3','cf:hui','hello hive'
put 'hello','4','cf:hui','hello hadoop'
put 'hello','5','cf:hui','hello world'
put 'hello','6','cf:hui','hello world'
put 'hello','7','cf:hui','hbase hive'


java代码:

import java.io.IOException;
import java.util.Comparator;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class HbaseTopJiang1 {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
String tablename = "hello";
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "h71");
Job job = new Job(conf, "WordCountHbaseReader");
job.setJarByClass(HbaseTopJiang1.class);
Scan scan = new Scan();
TableMapReduceUtil.initTableMapperJob(tablename,scan,doMapper.class, Text.class, IntWritable.class, job);
job.setReducerClass(WordCountHbaseReaderReduce.class);
FileOutputFormat.setOutputPath(job, new Path(args[0]));
MultipleOutputs.addNamedOutput(job, "hdfs", TextOutputFormat.class, WritableComparable.class, Writable.class);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}

public static class doMapper extends TableMapper<Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
@Override
protected void map(ImmutableBytesWritable key, Result value,
Context context) throws IOException, InterruptedException {
/*不进行分隔,将value整行全部获取
String rowValue = Bytes.toString(value.list().get(0).getValue());
context.write(new Text(rowValue), one);
*/
String[] rowValue = Bytes.toString(value.list().get(0).getValue()).split(" ");
for (String str: rowValue){
word.set(str);
context.write(word,one);
}
}
}

public static final int K = 3;
public static class WordCountHbaseReaderReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
//定义treeMap来保持统计结果,由于treeMap是按key升序排列的,这里要人为指定Comparator以实现倒排
private TreeMap<Integer, String> treeMap = new TreeMap<Integer, String>(new Comparator<Integer>() {
@Override
public int compare(Integer x, Integer y) {
return y.compareTo(x);
}
});
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
//reduce后的结果放入treeMap,而不是向context中记入结果
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
if (treeMap.containsKey(sum)){
String value = treeMap.get(sum) + "," + key.toString();
treeMap.put(sum,value);
}else {
treeMap.put(sum, key.toString());
}
if(treeMap.size() > K) {
treeMap.remove(treeMap.lastKey());
}
}
protected void cleanup(Context context) throws IOException, InterruptedException {
//将treeMap中的结果,按value-key顺序写入contex中
for (Integer key : treeMap.keySet()) {
context.write(new Text(treeMap.get(key)), new IntWritable(key));
}
}
}
}


在Linux中执行该代码:

[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/javac HbaseTopJiang1.java

[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/jar cvf xx.jar HbaseTopJiang1*class

[hadoop@h71 q1]$ hadoop jar xx.jar HbaseTopJiang1 /out

[hadoop@h71 q1]$ hadoop fs -cat /out/part-r-00000

hello   6

world   3

hadoop,hive     2

方法二:

truncate 'hello'
put 'hello','1','cf:hui','hello world	world'
put 'hello','2','cf:hui','hello hadoop	hadoop'
put 'hello','3','cf:hui','hello hive	hive'
put 'hello','4','cf:hui','hello hadoop	hadoop'
put 'hello','5','cf:hui','hello world	world'
put 'hello','6','cf:hui','hello world	world'
put 'hello','7','cf:hui','hbase hive	hive'
注意:相同单词之间的分隔符是"/t"(Tab键),结果hbase中插入数据的时候根本就不能插入制表符,所以该方法破产,可以参考一下思想

java代码:

import java.io.IOException;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class HbaseTopJiang2{
public static class doMapper extends TableMapper<Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
@Override
protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
/*不进行分隔,将value整行全部获取
String rowValue = Bytes.toString(value.list().get(0).getValue());
context.write(new Text(rowValue), one);
*/
String[] rowValue = Bytes.toString(value.list().get(0).getValue()).split(" ");
for (String str: rowValue){
word.set(str);
context.write(word,one);
}
}
}

public static class WordCountReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
@Override
public void reduce(Text key,Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {
int total=0;
for (IntWritable val : values){
total++;
}
context.write(key, new IntWritable(total));
}
}

public static final int K = 3;
/**
* 把上一个mapreduce的结果的key和value颠倒,调到后就可以按照key排序了。
*/
public static class KMap extends Mapper<LongWritable,Text,IntWritable,Text> {
TreeMap<Integer, String> map = new TreeMap<Integer, String>();
@Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String result[] = line.split("\t");
StringBuffer hui = null;
if(result.length > 2){	//我怕在往hbase表输入数据时带\t分隔符的,后来发现hbase中插入数据的时候根本就不能插入制表符
for(int i=0;i<result.length-2;i++){
hui=new StringBuffer().append(result[i]);
}
}else{
hui = new StringBuffer().append(result[0]);
}
if(line.trim().length() > 0 && line.indexOf("\t") != -1) {
String[] arr = line.split("\t", 2);
String name = arr[0];
Integer num = Integer.parseInt(arr[1]);
if (map.containsKey(num)){
String value1 = map.get(num) + "," + hui;
map.put(num,value1);
}
else {
map.put(num, hui.toString());
}
if(map.size() > K) {
map.remove(map.firstKey());
}
}
}
@Override
protected void cleanup(Mapper<LongWritable, Text, IntWritable, Text>.Context context)
throws IOException, InterruptedException {
for(Integer num : map.keySet()) {
context.write(new IntWritable(num), new Text(map.get(num)));
}
}
}

/**
* 按照key的大小来划分区间,当然,key是int值
*/
public static class KeySectionPartitioner<K, V> extends Partitioner<K, V> {
@Override
public int getPartition(K key, V value, int numReduceTasks) {
/**
* int值的hashcode还是自己本身的数值
*/
//这里我认为大于maxValue的就应该在第一个分区
int maxValue = 50;
int keySection = 0;
// 只有传过来的key值大于maxValue 并且numReduceTasks比如大于1个才需要分区,否则直接返回0
if (numReduceTasks > 1 && key.hashCode() < maxValue) {
int sectionValue = maxValue / (numReduceTasks - 1);
int count = 0;
while ((key.hashCode() - sectionValue * count) > sectionValue) {
count++;
}
keySection = numReduceTasks - 1 - count;
}
return keySection;
}
}

/**
* int的key按照降序排列
*/
public static class IntKeyDescComparator extends WritableComparator {
protected IntKeyDescComparator() {
super(IntWritable.class, true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
return -super.compare(a, b);
}
}

/**
* 把key和value颠倒过来输出
*/
public static class SortIntValueReduce extends Reducer<IntWritable, Text, Text, IntWritable> {
private Text result = new Text();
@Override
public void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
for (Text val : values) {
result.set(val.toString());
context.write(result, key);
}
}
}

public static void main(String[] args) throws Exception {
String tablename = "hello";
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "h71");
Job job1 = new Job(conf, "WordCountHbaseReader");
job1.setJarByClass(HbaseTopJiang2.class);
Scan scan = new Scan();
TableMapReduceUtil.initTableMapperJob(tablename,scan,doMapper.class, Text.class, IntWritable.class, job1);
job1.setReducerClass(WordCountReducer.class);
FileOutputFormat.setOutputPath(job1, new Path(args[0]));
MultipleOutputs.addNamedOutput(job1, "hdfs", TextOutputFormat.class, WritableComparable.class, Writable.class);

Job job2 = Job.getInstance(conf, "Topjiang");
job2.setJarByClass(HbaseTopJiang2.class);
job2.setMapperClass(KMap.class);
job2.setSortComparatorClass(IntKeyDescComparator.class);
job2.setPartitionerClass(KeySectionPartitioner.class);
job2.setReducerClass(SortIntValueReduce.class);
job2.setOutputKeyClass(IntWritable.class);
job2.setOutputValueClass(Text.class);
FileInputFormat.setInputPaths(job2, new Path(args[0]));
FileOutputFormat.setOutputPath(job2, new Path(args[1]));

//提交job1及job2,并等待完成
if (job1.waitForCompletion(true)) {
System.exit(job2.waitForCompletion(true) ? 0 : 1);
}
}
}
[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/javac HbaseTopJiang2.java

[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/jar cvf xx.jar HbaseTopJiang2*class

[hadoop@h71 q1]$ hadoop jar xx.jar HbaseTopJiang2 /out /output

[hadoop@h71 q1]$ hadoop fs -ls /out

-rw-r--r--   2 hadoop supergroup          0 2017-03-18 19:02 /out/_SUCCESS

-rw-r--r--   2 hadoop supergroup         32 2017-03-18 19:02 /out/part-r-00000

[hadoop@h71 q1]$ hadoop fs -ls /output

-rw-r--r--   2 hadoop supergroup          0 2017-03-18 19:02 /output/_SUCCESS

-rw-r--r--   2 hadoop supergroup         25 2017-03-18 19:02 /output/part-r-00000

理想结果:

[hadoop@h71 q1]$ hadoop fs -cat /out/part-r-00000

hbase 1

hadoop hadoop
2

hello 6

hive hive
2

world world
3

[hadoop@h71 q1]$ hadoop fs -cat /output/part-r-00000

hello 6

world world
3

hadoop hadoop,hive hive 2

(分隔符都为制表符)

我发现制表符(Tab键)从UltraEdit复制到SecureCRT正常,而从SecureCRT复制到UltraEdit则制表符会变成空格,也是醉了。。。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐