Hadoop基础教程-第7章 MapReduce进阶(7.6 MapReduce 二次排序)
2017-06-23 15:11
495 查看
第7章 MapReduce进阶
7.6 MapReduce 二次排序
7.6.1 二次排序概述
MapReduce框架对处理结果的输出会根据key值进行默认的排序,这个默认排序可以满足一部分需求,但是也是十分有限的。在我们实际的需求当中,往往有要对reduce输出结果进行二次排序的需求。所谓二次排序,先按第1个字段进行排序,然后对第1个字段相同的数据,再按第2个字段进行排序。序号 | 价格 | 类别 | 书名 |
---|---|---|---|
3005 | 49.5 | S3 | 大数据概论 |
2001 | 49.0 | S2 | Java |
1021 | 45.0 | S1 | 数据结构 |
1001 | 39.0 | S1 | 计算机基础 |
2010 | 48.5 | S2 | C#语言 |
3001 | 89.0 | S3 | Hadoop基础 |
2030 | 60.0 | S2 | MySQL |
2071 | 99.0 | S2 | Oracle |
2091 | 69.0 | S2 | Linux |
3004 | 56.0 | S3 | HBase教程 |
3002 | 98.0 | S3 | Spark基础 |
3003 | 49.0 | S3 | Hive教程 |
1002 | 39.0 | S1 | C语言 |
序号 | 价格 | 类别 | 书名 |
---|---|---|---|
1002 | 39.0 | S1 | C语言 |
1001 | 39.0 | S1 | 计算机基础 |
1021 | 45.0 | S1 | 数据结构 |
2010 | 48.5 | S2 | C#语言 |
2001 | 49.0 | S2 | Java |
2030 | 60.0 | S2 | MySQL |
2091 | 69.0 | S2 | Linux |
2071 | 99.0 | S2 | Oracle |
3003 | 49.0 | S3 | Hive教程 |
3005 | 49.5 | S3 | 大数据概论 |
3004 | 56.0 | S3 | HBase教程 |
3001 | 89.0 | S3 | Hadoop基础 |
3002 | 98.0 | S3 | Spark基础 |
7.6.2上传数据1到HDFS
hdfs dfs -put books.txt inputhdfs dfs -cat input/books.txt
[root@hds117 data]# ls books.txt dept.txt emp.txt [root@hds117 data]# hdfs dfs -put books.txt input [root@hds117 data]# hdfs dfs -ls input Found 4 items -rw-r--r-- 3 root hbase 329 2017-06-23 15:56 input/books.txt -rw-r--r-- 3 root hbase 82 2017-06-23 11:04 input/dept.txt -rw-r--r-- 3 root hbase 513 2017-06-23 11:04 input/emp.txt -rw-r--r-- 3 root hbase 871353053 2017-06-23 14:19 input/ncdc.txt [root@hds117 data]# hdfs dfs -cat input/books.txt 3005 49.5 S3 大数据概论 2001 49.0 S2 Java 1021 45.0 S1 数据结构 1001 39.0 S1 计算机基础 2010 48.5 S2 C#语言 3001 89.0 S3 Hadoop基础 2030 60.0 S2 MySQL 2071 99.0 S2 Oracle 2091 69.0 S2 Linux 3004 56.0 S3 HBase教程 3002 98.0 S3 Spark基础 3003 49.0 S3 Hive教程 1002 39.0 S1 C语言
7.6.3 Mapper
package cn.hadron.mr.sort; import java.io.IOException; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; public class Sort2Mapper extends Mapper<LongWritable, Text, Text, NullWritable>{ @Override protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException { //仅将vaule转换为key输出 context.write(value,NullWritable.get()); } }
7.6.4 Partitioner
package cn.hadron.mr.sort; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner; /** * <Text,NullWritable>是Mapper的输出类型 * @author hadron */ public class MyPartitioner extends HashPartitioner<Text,NullWritable>{ //执行时间越短越好 public int getPartition(Text key, NullWritable value, int numReduceTasks) { return (key.toString().split("\t")[2].hashCode() & Integer.MAX_VALUE)%numReduceTasks; } }
7.6.5 Comparator
package cn.hadron.mr.sort; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; public class MyComparator extends WritableComparator{ protected MyComparator(){ super(Text.class,true); } @Override public int compare(WritableComparable k1, WritableComparable k2) { String[] a1=k1.toString().split("\t"); String[] a2=k2.toString().split("\t"); //如果种类字段相同,则比较价格字段 if(a1[2].equals(a2[2])){ //如果价格也相同,如果返回0,则认为是相同的书;所以需要进一步比较书名 if(a1[1].equals(a2[1])){ return a1[1].compareTo(a1[3]); }else{ return a1[1].compareTo(a2[1]); } }else{ return a1[2].compareTo(a2[2]); } } }
7.6.6 Reducer
package cn.hadron.mr.sort; import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class Sort2Reducer extends Reducer<Text, NullWritable, NullWritable,Text>{ protected void reduce(Text key, Iterable<NullWritable> values,Context context) throws IOException, InterruptedException { context.write(NullWritable.get(),key); /*for(Text value:values){ context.write(NullWritable.get(),value); }*/ } }
7.6.7 主方法
package cn.hadron.mr.sort; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class RunJob { public static void main(String[] args) { // 设置环境变量HADOOP_USER_NAME,其值是root System.setProperty("HADOOP_USER_NAME", "root"); // Configuration类包含了Hadoop的配置 Configuration config = new Configuration(); // 设置fs.defaultFS config.set("fs.defaultFS", "hdfs://192.168.80.131:9000"); // 设置yarn.resourcemanager节点 config.set("yarn.resourcemanager.hostname", "node1"); try { FileSystem fs = FileSystem.get(config); Job job = Job.getInstance(config); job.setJarByClass(RunJob.class); job.setJobName("Sort2"); // 设置Mapper和Reducer类 job.setMapperClass(Sort2Mapper.class); job.setReducerClass(Sort2Reducer.class); //设置map方法输出类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(NullWritable.class); // 设置reduce方法输出key和value的类型 job.setOutputKeyClass(NullWritable.class); job.setOutputValueClass(Text.class); //设置自定义工具类 job.setPartitionerClass(MyPartitioner.class); job.setSortComparatorClass(MyComparator.class); //job.setGroupingComparatorClass(MyGroup.class); //设置Reduce Task数 //job.setNumReduceTasks(3); // 指定输入输出路径 FileInputFormat.addInputPath(job, new Path("/user/root/input/books.txt")); Path outpath = new Path("/user/root/output/"); if (fs.exists(outpath)) { fs.delete(outpath, true); } FileOutputFormat.setOutputPath(job, outpath); // 提交任务,等待执行完成 boolean f = job.waitForCompletion(true); if (f) { System.out.println("job任务执行成功"); } } catch (Exception e) { e.printStackTrace(); } } }
7.6.7 运行结果
Eclipse运行结果log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory). log4j:WARN Please initialize the log4j system properly. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. job任务执行成功
HDFS输出结果
hdfs dfs -ls output
hdfs dfs -cat output/part-r-00000
[root@node1 ~]# hdfs dfs -ls output Found 2 items -rw-r--r-- 3 root supergroup 0 2017-06-25 00:04 output/_SUCCESS -rw-r--r-- 3 root supergroup 337 2017-06-25 00:04 output/part-r-00000 [root@node1 ~]# hdfs dfs -cat output/part-r-00000 1002 39.0 S1 C语言 1001 39.0 S1 计算机基础 1021 45.0 S1 数据结构 2010 48.5 S2 C#语言 2001 49.0 S2 Java 2030 60.0 S2 MySQL 2091 69.0 S2 Linux 2071 99.0 S2 Oracle 3003 49.0 S3 Hive教程 3005 49.5 S3 大数据概论 3004 56.0 S3 HBase教程 3001 89.0 S3 Hadoop基础 3002 98.0 S3 Spark基础
相关文章推荐
- Hadoop基础教程-第7章 MapReduce进阶(7.7 MapReduce 全排序)
- Hadoop基础教程-第7章 MapReduce进阶(7.1 MapReduce过程)(草稿)
- Hadoop基础教程-第7章 MapReduce进阶(7.2 MapReduce工作机制)(草稿)
- Hadoop基础教程-第7章 MapReduce进阶(7.3 MapReduce API)(草稿)
- Hadoop基础教程-第7章 MapReduce进阶(7.4 自定义Key类型)
- Hadoop基础教程-第7章 MapReduce进阶(7.5 MapReduce 连接)
- Hadoop基础教程-第6章 MapReduce入门(6.2 解读WordCount)(草稿)
- Hadoop---mapreduce排序和二次排序以及全排序
- Hadoop链式MapReduce、多维排序、倒排索引、自连接算法、二次排序、Join性能优化、处理员工信息Join实战、URL流量分析、TopN及其排序、求平均值和最大最小值、数据清洗ETL、分析气
- Hadoop入门案例(六)之二次排序,全排序基础下的二次排序
- mapreduce(七):hadoop二次排序
- Hadoop基础教程-第6章 MapReduce入门(6.1 MapReduce介绍)(草稿)
- (Hadoop学习-2)mapreduce实现二次排序
- Hadoop MapReduce 二次排序原理及其应用
- Hadoop基础教程-第6章 MapReduce入门(6.4 MapReduce程序框架)(草稿)
- Hadoop Mapreduce分区、分组、二次排序过程详解
- Hadoop Mapreduce分区、分组、二次排序过程详解
- Hadoop Mapreduce分区、分组、二次排序过程详解[转]
- 吴超-----mapreduce的二次排序【在key排序的基础上,对value也进行排序】RawComparator
- Hadoop Mapreduce分区、分组、二次排序过程详解[转]