MapReduce Implementation 4 --- Mini Project (Read Data from HBase, Count It, and Write the Top 3 to HDFS in Descending Order)
2017-07-25 18:05
MapReduce Implementation Series:
MapReduce Implementation 1 --- Converting Data Between HBase and HDFS
MapReduce Implementation 2 --- Sorting
MapReduce Implementation 3 --- Top N
MapReduce Implementation 4 --- Mini Project (Read Data from HBase, Count It, and Write the Top 3 to HDFS in Descending Order)
MapReduce Implementation 5 --- Distinct and Count
MapReduce Implementation 6 --- Max, Sum, and Avg
MapReduce Implementation 7 --- Mini Project (Chaining Multiple Jobs to Compute an Average)
MapReduce Implementation 8 --- Partition
MapReduce Implementation 9 --- PV and UV
MapReduce Implementation 10 --- Inverted Index
MapReduce Implementation 11 --- Join
Method 1:
First, create the corresponding table in HBase and load some sample rows:
create 'hello','cf'
put 'hello','1','cf:hui','hello world'
put 'hello','2','cf:hui','hello hadoop'
put 'hello','3','cf:hui','hello hive'
put 'hello','4','cf:hui','hello hadoop'
put 'hello','5','cf:hui','hello world'
put 'hello','6','cf:hui','hello world'
put 'hello','7','cf:hui','hbase hive'
Java code:
import java.io.IOException;
import java.util.Comparator;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class HbaseTopJiang1 {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        String tablename = "hello";
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "h71");
        Job job = new Job(conf, "WordCountHbaseReader");
        job.setJarByClass(HbaseTopJiang1.class);
        Scan scan = new Scan();
        TableMapReduceUtil.initTableMapperJob(tablename, scan, doMapper.class,
                Text.class, IntWritable.class, job);
        job.setReducerClass(WordCountHbaseReaderReduce.class);
        FileOutputFormat.setOutputPath(job, new Path(args[0]));
        MultipleOutputs.addNamedOutput(job, "hdfs", TextOutputFormat.class,
                WritableComparable.class, Writable.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    public static class doMapper extends TableMapper<Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context)
                throws IOException, InterruptedException {
            /* To take the whole row value without splitting:
            String rowValue = Bytes.toString(value.list().get(0).getValue());
            context.write(new Text(rowValue), one);
            */
            String[] rowValue = Bytes.toString(value.list().get(0).getValue()).split(" ");
            for (String str : rowValue) {
                word.set(str);
                context.write(word, one);
            }
        }
    }

    public static final int K = 3;

    public static class WordCountHbaseReaderReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        // A TreeMap holds the running results. TreeMap sorts its keys in ascending
        // order by default, so a custom Comparator is supplied here to get
        // descending order instead.
        private TreeMap<Integer, String> treeMap = new TreeMap<Integer, String>(new Comparator<Integer>() {
            @Override
            public int compare(Integer x, Integer y) {
                return y.compareTo(x);
            }
        });

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // Accumulate the result into the TreeMap instead of writing it to the context.
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            if (treeMap.containsKey(sum)) {
                // Words that tie on the same count are joined with a comma.
                String value = treeMap.get(sum) + "," + key.toString();
                treeMap.put(sum, value);
            } else {
                treeMap.put(sum, key.toString());
            }
            // With descending order, lastKey() is the smallest count; evict it
            // once the map holds more than K entries.
            if (treeMap.size() > K) {
                treeMap.remove(treeMap.lastKey());
            }
        }

        protected void cleanup(Context context) throws IOException, InterruptedException {
            // Write the TreeMap contents out as (words, count) pairs.
            for (Integer key : treeMap.keySet()) {
                context.write(new Text(treeMap.get(key)), new IntWritable(key));
            }
        }
    }
}
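The core of this reducer is the TreeMap-based Top-K trick, and the tie handling explains the "hadoop,hive" line in the output below: words with the same count are joined with a comma under a single key. The trick is easy to test outside MapReduce; here is a minimal standalone sketch (plain Java, with the word counts from the table above hard-coded) that reproduces the expected result:

import java.util.Comparator;
import java.util.TreeMap;

public class TopKSketch {
    public static void main(String[] args) {
        final int K = 3;
        // Same descending-order TreeMap as in WordCountHbaseReaderReduce.
        TreeMap<Integer, String> treeMap = new TreeMap<Integer, String>(new Comparator<Integer>() {
            @Override
            public int compare(Integer x, Integer y) {
                return y.compareTo(x);
            }
        });
        // Hard-coded word counts matching the 'hello' table above.
        String[] words = {"hello", "world", "hadoop", "hive", "hbase"};
        int[] counts = {6, 3, 2, 2, 1};
        for (int i = 0; i < words.length; i++) {
            int sum = counts[i];
            if (treeMap.containsKey(sum)) {
                treeMap.put(sum, treeMap.get(sum) + "," + words[i]);  // join ties with a comma
            } else {
                treeMap.put(sum, words[i]);
            }
            if (treeMap.size() > K) {
                treeMap.remove(treeMap.lastKey());  // lastKey() is the smallest count here
            }
        }
        for (Integer count : treeMap.keySet()) {
            System.out.println(treeMap.get(count) + "\t" + count);
        }
        // Prints: hello 6 / world 3 / hadoop,hive 2
    }
}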
Compile and run it on Linux:
[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/javac HbaseTopJiang1.java
[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/jar cvf xx.jar HbaseTopJiang1*class
[hadoop@h71 q1]$ hadoop jar xx.jar HbaseTopJiang1 /out
[hadoop@h71 q1]$ hadoop fs -cat /out/part-r-00000
hello 6
world 3
hadoop,hive 2
Method 2:
truncate 'hello'
put 'hello','1','cf:hui','hello world world'
put 'hello','2','cf:hui','hello hadoop hadoop'
put 'hello','3','cf:hui','hello hive hive'
put 'hello','4','cf:hui','hello hadoop hadoop'
put 'hello','5','cf:hui','hello world world'
put 'hello','6','cf:hui','hello world world'
put 'hello','7','cf:hui','hbase hive hive'
Note: the separator between the repeated words was meant to be "\t" (the Tab key), but it turns out you cannot insert a tab character at all when putting data through the HBase shell, so this method fails as written. It is still worth reading for the approach.
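The limitation appears to be only in the shell, though: an HBase cell value is just a byte array, so the Java client API can store a value that contains real tabs. A minimal sketch, assuming the same old-style (0.94-era) client API used elsewhere in this post; the row key "1" is just an example:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class TabPut {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "h71");
        HTable table = new HTable(conf, "hello");
        Put put = new Put(Bytes.toBytes("1"));
        // The value contains real \t separators, which the shell cannot enter.
        put.add(Bytes.toBytes("cf"), Bytes.toBytes("hui"),
                Bytes.toBytes("hello\tworld\tworld"));
        table.put(put);
        table.close();
    }
}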
Java code:
import java.io.IOException;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class HbaseTopJiang2 {

    public static class doMapper extends TableMapper<Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context)
                throws IOException, InterruptedException {
            /* To take the whole row value without splitting:
            String rowValue = Bytes.toString(value.list().get(0).getValue());
            context.write(new Text(rowValue), one);
            */
            String[] rowValue = Bytes.toString(value.list().get(0).getValue()).split(" ");
            for (String str : rowValue) {
                word.set(str);
                context.write(word, one);
            }
        }
    }

    public static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int total = 0;
            for (IntWritable val : values) {
                total += val.get();
            }
            context.write(key, new IntWritable(total));
        }
    }

    public static final int K = 3;

    /**
     * Swap the key and value of the previous job's output; once the count is
     * the key, the records can be sorted by it.
     */
    public static class KMap extends Mapper<LongWritable, Text, IntWritable, Text> {
        TreeMap<Integer, String> map = new TreeMap<Integer, String>();

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            if (line.trim().length() == 0 || line.indexOf("\t") == -1) {
                return;
            }
            String[] result = line.split("\t");
            // Rebuild the word part by joining every field except the trailing
            // count. This guards against rows whose words were themselves
            // tab-separated; as noted above, tabs cannot be inserted through the
            // HBase shell, so in practice result.length is always 2.
            StringBuffer hui = new StringBuffer();
            for (int i = 0; i < result.length - 1; i++) {
                if (i > 0) {
                    hui.append("\t");
                }
                hui.append(result[i]);
            }
            Integer num = Integer.parseInt(result[result.length - 1]);
            if (map.containsKey(num)) {
                String value1 = map.get(num) + "," + hui;
                map.put(num, value1);
            } else {
                map.put(num, hui.toString());
            }
            // This TreeMap uses the default ascending order, so firstKey() is the
            // smallest count; evict it once the map holds more than K entries.
            if (map.size() > K) {
                map.remove(map.firstKey());
            }
        }

        @Override
        protected void cleanup(Mapper<LongWritable, Text, IntWritable, Text>.Context context)
                throws IOException, InterruptedException {
            for (Integer num : map.keySet()) {
                context.write(new IntWritable(num), new Text(map.get(num)));
            }
        }
    }

    /**
     * Partition by the size of the key; here the key is an int count.
     */
    public static class KeySectionPartitioner<K, V> extends Partitioner<K, V> {
        @Override
        public int getPartition(K key, V value, int numReduceTasks) {
            // An Integer's hashCode is the int value itself.
            // Keys of maxValue or more all land in partition 0.
            int maxValue = 50;
            int keySection = 0;
            // Partitioning is only needed when the key is below maxValue and there
            // is more than one reduce task; otherwise return 0 directly.
            if (numReduceTasks > 1 && key.hashCode() < maxValue) {
                int sectionValue = maxValue / (numReduceTasks - 1);
                int count = 0;
                while ((key.hashCode() - sectionValue * count) > sectionValue) {
                    count++;
                }
                // Larger keys get lower partition numbers.
                keySection = numReduceTasks - 1 - count;
            }
            return keySection;
        }
    }

    /**
     * Sort the int keys in descending order.
     */
    public static class IntKeyDescComparator extends WritableComparator {
        protected IntKeyDescComparator() {
            super(IntWritable.class, true);
        }

        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            return -super.compare(a, b);
        }
    }

    /**
     * Swap the key and value back for the final output.
     */
    public static class SortIntValueReduce extends Reducer<IntWritable, Text, Text, IntWritable> {
        private Text result = new Text();

        @Override
        public void reduce(IntWritable key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text val : values) {
                result.set(val.toString());
                context.write(result, key);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        String tablename = "hello";
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "h71");

        Job job1 = new Job(conf, "WordCountHbaseReader");
        job1.setJarByClass(HbaseTopJiang2.class);
        Scan scan = new Scan();
        TableMapReduceUtil.initTableMapperJob(tablename, scan, doMapper.class,
                Text.class, IntWritable.class, job1);
        job1.setReducerClass(WordCountReducer.class);
        FileOutputFormat.setOutputPath(job1, new Path(args[0]));
        MultipleOutputs.addNamedOutput(job1, "hdfs", TextOutputFormat.class,
                WritableComparable.class, Writable.class);

        Job job2 = Job.getInstance(conf, "Topjiang");
        job2.setJarByClass(HbaseTopJiang2.class);
        job2.setMapperClass(KMap.class);
        job2.setSortComparatorClass(IntKeyDescComparator.class);
        job2.setPartitionerClass(KeySectionPartitioner.class);
        job2.setReducerClass(SortIntValueReduce.class);
        job2.setOutputKeyClass(IntWritable.class);
        job2.setOutputValueClass(Text.class);
        FileInputFormat.setInputPaths(job2, new Path(args[0]));
        FileOutputFormat.setOutputPath(job2, new Path(args[1]));

        // Submit job1 and then job2, waiting for each to finish.
        if (job1.waitForCompletion(true)) {
            System.exit(job2.waitForCompletion(true) ? 0 : 1);
        }
    }
}

Compile and run it on Linux:
[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/javac HbaseTopJiang2.java
[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/jar cvf xx.jar HbaseTopJiang2*class
[hadoop@h71 q1]$ hadoop jar xx.jar HbaseTopJiang2 /out /output
[hadoop@h71 q1]$ hadoop fs -ls /out
-rw-r--r-- 2 hadoop supergroup 0 2017-03-18 19:02 /out/_SUCCESS
-rw-r--r-- 2 hadoop supergroup 32 2017-03-18 19:02 /out/part-r-00000
[hadoop@h71 q1]$ hadoop fs -ls /output
-rw-r--r-- 2 hadoop supergroup 0 2017-03-18 19:02 /output/_SUCCESS
-rw-r--r-- 2 hadoop supergroup 25 2017-03-18 19:02 /output/part-r-00000
The ideal result would be:
[hadoop@h71 q1]$ hadoop fs -cat /out/part-r-00000
hbase 1
hadoop hadoop 2
hello 6
hive hive 2
world world 3
[hadoop@h71 q1]$ hadoop fs -cat /output/part-r-00000
hello 6
world world 3
hadoop hadoop,hive hive 2
(All the separators are tab characters.)
I noticed that tab characters copy fine from UltraEdit into SecureCRT, but copying from SecureCRT back into UltraEdit turns the tabs into spaces. Go figure.
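One more note on job2: the global descending order comes from IntKeyDescComparator and KeySectionPartitioner working together. The comparator sorts counts from largest to smallest within each reducer, and the partitioner sends larger counts to lower-numbered partitions, so concatenating part-r-00000, part-r-00001, ... yields a globally descending list. The section arithmetic is easy to check without Hadoop; this standalone sketch replicates getPartition for a hypothetical run with 3 reducers:

public class PartitionSketch {
    // Same arithmetic as KeySectionPartitioner.getPartition, with the key
    // passed as a plain int (an Integer's hashCode is the value itself).
    static int getPartition(int key, int numReduceTasks) {
        int maxValue = 50;
        int keySection = 0;
        if (numReduceTasks > 1 && key < maxValue) {
            int sectionValue = maxValue / (numReduceTasks - 1);  // width of each key range
            int count = 0;
            while ((key - sectionValue * count) > sectionValue) {
                count++;
            }
            keySection = numReduceTasks - 1 - count;  // larger keys -> lower partition ids
        }
        return keySection;
    }

    public static void main(String[] args) {
        int[] sampleCounts = {1, 2, 3, 6, 26, 50};
        for (int c : sampleCounts) {
            // With 3 reducers: counts 1..25 -> partition 2, 26..49 -> partition 1,
            // and 50 or more -> partition 0.
            System.out.println("count " + c + " -> reducer " + getPartition(c, 3));
        }
    }
}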