MapReduce Secondary Sort
2016-02-25 12:26
Secondary Sort
What Is Secondary Sort
In a MapReduce job, the <key,value> pairs we emit are sorted by key, so the final output is ordered by key. Sometimes we also need the values to be sorted within each key, on top of the key ordering. That requirement is called a secondary sort.
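For example (illustrative data, not from the original post), suppose the input lines are:

a,3
a,1
b,2
a,2

Sorting by key alone only guarantees that the a records are processed before the b records; the values 3, 1, 2 for key a reach the reducer in no particular order. With a secondary sort, the reducer sees the values for each key in ascending order:

a 1
a 2
a 3
b 2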
The Idea Behind Secondary Sort
We know that MapReduce sorts records by key as they run through the framework. For a secondary sort, the key and the value we want sorted are combined into a composite key (key#value), and it is this composite that gets sorted. Because the composite key now differs for every value, two adjustments are needed. First, grouping: the reducer still groups by key, and if it grouped on the composite key#value, records that share the same original key would be split into different groups; we must therefore tell MapReduce to group on the original key. Second, partitioning: for the same reason, the map-side partitioning would scatter records with the same original key across partitions because of the key#value combination, so we must also partition on the original key.
If the field being sorted is numeric, take care when comparing positive and negative numbers: if the sort field contains both positive and negative values at the same time, the comparison can go wrong. A workaround is to add (or subtract) a maximum value to the numbers before sorting, so that all values end up with the same sign (all positive or all negative); a small sketch of this shift follows.
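A minimal sketch of that shift (the method name is made up for illustration; it is not part of the job code below). Doing the addition in long arithmetic keeps the shift itself from overflowing:

// Illustrative helper: compare two ints after shifting both by the same
// large constant so they land in the non-negative range.
public static int compareShifted(int a, int b) {
    long shiftedA = (long) a + Integer.MAX_VALUE;
    long shiftedB = (long) b + Integer.MAX_VALUE;
    return Long.compare(shiftedA, shiftedB);
}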
Secondary Sort Code Example
//-------------------- Partitioner: partition by the original key --------------------
package com.hao.bigdata.mapreduce.SecondarySort;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class FirstPartitioner extends Partitioner<SkeyWritable, IntWritable> {

    @Override
    public int getPartition(SkeyWritable key, IntWritable value, int numPartitions) {
        // partition on the original (first) key only, so records that share it
        // always go to the same reduce task
        return (key.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}
//-------------------- Grouping comparator: group by the original key --------------------
package com.hao.bigdata.mapreduce.SecondarySort;

import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.WritableComparator;

public class FirstGroupingComparator implements RawComparator<SkeyWritable> {

    // object-level comparison: compare only the original (first) key
    public int compare(SkeyWritable o1, SkeyWritable o2) {
        return o1.getFirst().compareTo(o2.getFirst());
    }

    // byte-level comparison: skip the trailing 4 bytes that hold the int "second" field
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        return WritableComparator.compareBytes(b1, s1, l1 - 4, b2, s2, l2 - 4);
    }
}
//-------------------- The MapReduce job --------------------
package com.hao.bigdata.mapreduce.SecondarySort;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class SecondarySortReduceModel extends Configured implements Tool {

    /**
     * Mapper: builds the composite key (original key + value) and emits the
     * value to be sorted alongside it.
     */
    public static class SecondaryMapper extends
            Mapper<LongWritable, Text, SkeyWritable, IntWritable> {

        private SkeyWritable mapOutputKey = new SkeyWritable();
        private IntWritable mapOutputValue = new IntWritable();

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String lineValue = value.toString();
            String[] strs = lineValue.split(",");
            if (2 != strs.length) {
                return;
            }
            // composite key: original key plus the value we want sorted
            mapOutputKey.set(strs[0], Integer.valueOf(strs[1]));
            mapOutputValue.set(Integer.valueOf(strs[1]));
            context.write(mapOutputKey, mapOutputValue);
        }
    }

    /**
     * Reducer: writes the original key with its values, which now arrive sorted.
     */
    public static class SecondaryReducer extends
            Reducer<SkeyWritable, IntWritable, Text, IntWritable> {

        private Text outputKey = new Text();

        @Override
        public void reduce(SkeyWritable key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {
            // output the original key only
            outputKey.set(key.getFirst());
            for (IntWritable value : values) {
                context.write(outputKey, value);
            }
        }
    }

    // driver
    public int run(String args[]) throws Exception {
        // step 1: get Configuration
        Configuration configuration = super.getConf();

        // step 2: create Job, wiring input -> map -> reduce -> output
        Job job = Job.getInstance(configuration, this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());

        // step 3.1: input
        Path inpath = new Path(args[0]);
        FileInputFormat.addInputPath(job, inpath);

        // step 3.2: mapper, with the map output <key, value> types
        job.setMapperClass(SecondaryMapper.class);
        job.setMapOutputKeyClass(SkeyWritable.class);
        job.setMapOutputValueClass(IntWritable.class);

        // =============== shuffle ===============
        // 1. partitioner: partition by the original key
        job.setPartitionerClass(FirstPartitioner.class);
        // 2. sort
        // job.setSortComparatorClass(cls);
        // 3. combiner
        // job.setCombinerClass(WebReducer.class);
        // 4. compress: set by configuration
        // 5. group: group by the original key
        job.setGroupingComparatorClass(FirstGroupingComparator.class);
        // =============== shuffle ===============

        // step 3.3: reducer, with the reduce output <key, value> types
        job.setReducerClass(SecondaryReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // set reduce num
        job.setNumReduceTasks(3);

        // step 3.4: output
        Path outpath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outpath);

        boolean isSuccess = job.waitForCompletion(true);
        return isSuccess ? 0 : 1;
    }

    // main
    public static void main(String[] args) throws Exception {
        /*
         * args = new String[] {
         *     "hdfs://bigdata00.hadoop-hao.com:8020/data/inputFiles/input02",
         *     "hdfs://bigdata00.hadoop-hao.com:8020/data/outputFiles/output04" };
         */
        Configuration configuration = new Configuration();
        int status = ToolRunner.run(configuration, new SecondarySortReduceModel(), args);
        System.exit(status);
    }
}
//-------------------- Custom composite key class --------------------
package com.hao.bigdata.mapreduce.SecondarySort;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class SkeyWritable implements WritableComparable<SkeyWritable> {

    // first: the original key; second: the value to be sorted
    private String first;
    private int second;

    public SkeyWritable() {
    }

    public SkeyWritable(String first, int second) {
        this.set(first, second);
    }

    public void set(String first, int second) {
        this.first = first;
        this.second = second;
    }

    public String getFirst() {
        return first;
    }

    public void setFirst(String first) {
        this.first = first;
    }

    public int getSecond() {
        return second;
    }

    public void setSecond(int second) {
        this.second = second;
    }

    public void write(DataOutput out) throws IOException {
        out.writeUTF(first);
        out.writeInt(second);
    }

    public void readFields(DataInput in) throws IOException {
        this.first = in.readUTF();
        this.second = in.readInt();
    }

    public int compareTo(SkeyWritable o) {
        int comp = this.first.compareTo(o.getFirst());
        if (0 != comp) {
            return comp;
        }
        // shift both values by the same constant (see the note on mixed signs
        // above); long arithmetic keeps the shift from overflowing
        long a = (long) this.second + Integer.MAX_VALUE;
        long b = (long) o.getSecond() + Integer.MAX_VALUE;
        return Long.compare(a, b);
    }
}
MapReduce join
Map-side Join
Use case: when one table is large and the other is small, a map-side join works well. Idea: in the map phase, load the small table into memory and index it by the join key; then, as the large table is read, look up each record's key in the small table and merge the matching small-table record into the large-table record. A minimal sketch follows.
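The sketch below is not the post's own code: the class name, the local file name small_table.txt, and the comma-separated layout with the join key in the first field are all assumptions for illustration. The small table is read into a HashMap in setup() (for example after being shipped to every task through the distributed cache), and each large-table record is joined against it in map(), so the job needs no reduce phase.

//-------------------- Map-side join sketch (illustrative) --------------------
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MapSideJoinMapper extends Mapper<LongWritable, Text, NullWritable, Text> {

    // join key -> small-table record, built once per map task
    private Map<String, String> smallTable = new HashMap<String, String>();
    private Text outputValue = new Text();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Assumption: the small table has been shipped to every task (for example
        // through the distributed cache) and is readable as a local file.
        BufferedReader reader = new BufferedReader(new FileReader("small_table.txt"));
        String line;
        while ((line = reader.readLine()) != null) {
            String[] fields = line.split(",", 2);
            if (fields.length == 2) {
                smallTable.put(fields[0], fields[1]);
            }
        }
        reader.close();
    }

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // large-table record: join key in the first field, payload in the rest
        String[] fields = value.toString().split(",", 2);
        if (fields.length != 2) {
            return;
        }
        String matched = smallTable.get(fields[0]);
        if (matched != null) {
            // emit the merged record; no reduce phase is required
            outputValue.set(fields[0] + "," + matched + "," + fields[1]);
            context.write(NullWritable.get(), outputValue);
        }
    }
}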
Reduce-side Join
Use case: when both tables are large and the data volume is high, the join can be done on the reduce side to combine the two tables.
Semi-join
Use case: when there are three tables, a semi-join can be used. Idea: following the map-side join approach, first join two of the tables in the map phase, then, on the reduce side, join the remaining table with the data combined in the map phase.
MapReduce Join Code Example (Reduce-side)
package com.hao.bigdata.hadoop.mapreduce.join;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class JoinMapReduce extends Configured implements Tool {

    /**
     * Mapper: tags each record as coming from the customer table or the order
     * table (based on its field count) and emits it keyed by the customer id.
     */
    public static class JoinMapper extends
            Mapper<LongWritable, Text, LongWritable, JoinWritable> {

        private final static JoinWritable mapOutputValue = new JoinWritable();
        private LongWritable mapOutputKey = new LongWritable();

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String lineValue = value.toString();
            String[] strs = lineValue.split(",");

            // decide which table the record belongs to from its field count
            int length = strs.length;
            if ((length != 3) && (length != 4)) {
                return;
            }

            // the customer id is the join key
            Long cid = Long.valueOf(strs[0]);
            mapOutputKey.set(cid);

            String name = String.valueOf(strs[1]);
            if (3 == length) {
                // customer record: name, phone number
                String phoneno = strs[2];
                mapOutputValue.set("customer", name + "," + phoneno);
            }
            if (4 == length) {
                // order record: price, data
                String price = strs[2];
                String data = strs[3];
                mapOutputValue.set("order", price + "," + data);
            }
            context.write(mapOutputKey, mapOutputValue);
        }
    }

    /**
     * Reducer: for each customer id, combines the customer record with every
     * order record. The key itself is not written out, only the joined value.
     */
    public static class JoinReducer extends
            Reducer<LongWritable, JoinWritable, NullWritable, Text> {

        private Text outputValue = new Text();

        @Override
        public void reduce(LongWritable key, Iterable<JoinWritable> values,
                Context context) throws IOException, InterruptedException {
            String customerInfo = null;
            // collect the order records for this customer id
            List<String> orderList = new ArrayList<String>();
            for (JoinWritable value : values) {
                String tag = value.getTag();
                if ("customer".equals(tag)) {
                    customerInfo = value.getData();
                } else if ("order".equals(tag)) {
                    orderList.add(value.getData());
                }
            }
            // skip customers without any order
            if (0 == orderList.size()) {
                return;
            }
            // write one joined line per order
            for (String order : orderList) {
                outputValue.set(key.toString() + "," + customerInfo + "," + order);
                context.write(NullWritable.get(), outputValue);
            }
        }
    }

    // driver
    public int run(String args[]) throws Exception {
        // step 1: get Configuration
        Configuration configuration = super.getConf();

        // step 2: create Job, wiring input -> map -> reduce -> output
        Job job = Job.getInstance(configuration, this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());

        // step 3.1: input
        Path inpath = new Path(args[0]);
        FileInputFormat.addInputPath(job, inpath);

        // step 3.2: mapper, with the map output <key, value> types
        job.setMapperClass(JoinMapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(JoinWritable.class);

        // =============== shuffle ===============
        // 1. partitioner
        // job.setPartitionerClass(cls);
        // 2. sort
        // job.setSortComparatorClass(cls);
        // 3. combiner
        // job.setCombinerClass(WebReducer.class);
        // 4. compress: set by configuration
        // 5. group
        // job.setGroupingComparatorClass(cls);
        // =============== shuffle ===============

        // step 3.3: reducer, with the reduce output <key, value> types
        job.setReducerClass(JoinReducer.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
        /*
         * // set reduce num
         * job.setNumReduceTasks(0);
         */

        // step 3.4: output
        Path outpath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outpath);

        boolean isSuccess = job.waitForCompletion(true);
        return isSuccess ? 0 : 1;
    }

    // main
    public static void main(String[] args) throws Exception {
        /*
         * args = new String[] {
         *     "hdfs://bigdata00.hadoop-hao.com:8020/data/inputFiles/input02",
         *     "hdfs://bigdata00.hadoop-hao.com:8020/data/outputFiles/output04" };
         */
        Configuration configuration = new Configuration();
        int status = ToolRunner.run(configuration, new JoinMapReduce(), args);
        System.exit(status);
    }
}
//-------------------- Custom map output value type --------------------
package com.hao.bigdata.hadoop.mapreduce.join;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class JoinWritable implements Writable {

    // tag marks the source table: "customer" or "order"
    private String tag;
    private String data;

    public JoinWritable() {
    }

    public JoinWritable(String tag, String data) {
        this.set(tag, data);
    }

    public void set(String tag, String data) {
        this.setTag(tag);
        this.setData(data);
    }

    public String getTag() {
        return tag;
    }

    public void setTag(String tag) {
        this.tag = tag;
    }

    public String getData() {
        return data;
    }

    public void setData(String data) {
        this.data = data;
    }

    public void write(DataOutput out) throws IOException {
        out.writeUTF(this.getTag());
        out.writeUTF(this.getData());
    }

    public void readFields(DataInput in) throws IOException {
        this.setTag(in.readUTF());
        this.setData(in.readUTF());
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((data == null) ? 0 : data.hashCode());
        result = prime * result + ((tag == null) ? 0 : tag.hashCode());
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        JoinWritable other = (JoinWritable) obj;
        if (data == null) {
            if (other.data != null)
                return false;
        } else if (!data.equals(other.data))
            return false;
        if (tag == null) {
            if (other.tag != null)
                return false;
        } else if (!tag.equals(other.tag))
            return false;
        return true;
    }

    @Override
    public String toString() {
        return tag + "," + data;
    }
}
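To make the data flow concrete, here is an illustrative run (the records are made up; only the field layout matches what JoinMapper expects). Customer lines have three fields (cid, name, phone) and order lines have four (cid, an ignored second field, price, date):

1,tom,13800000000
1,order01,99.9,2016-02-25
1,order02,35.0,2016-02-26

The reducer then writes one joined line per order:

1,tom,13800000000,99.9,2016-02-25
1,tom,13800000000,35.0,2016-02-26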