
MapReduce Secondary Sort

2016-02-25 12:26

Secondary sort

What is secondary sort

In a MapReduce job, the <key,value> pairs we pass are sorted by key, so the final output is ordered by key. Sometimes, on top of that key ordering, we also need the values within each key to be sorted. This requirement is called secondary sort. For example, given the records (a,3), (a,1), (b,2), (a,2), a plain job only guarantees that the keys arrive in order (all a records before b); a secondary sort additionally delivers a's values in the order 1, 2, 3.


Secondary sort approach

As we know, a MapReduce job sorts records by key. For a secondary sort, the key and the value that needs ordering are combined into a new composite key, and the framework's sort is then applied to that composite key.

With the composite key#value acting as the new key, the reduce side would by default group on the whole composite key; records that share the original key but carry different values would then fall into different groups, which is not what we want. We therefore have to configure MapReduce to group on the original key. For the same reason, map-side partitioning would otherwise depend on the whole key#value combination and could send records with the same original key to different partitions, so we must also specify a partitioner that partitions on the original key.

If the sort field is numeric, be careful when positive and negative values are mixed: comparing the serialized values at the raw-byte level treats the bytes as unsigned, which puts negative numbers after positive ones. One workaround is to add (or subtract) a large constant before sorting so that all values end up with the same sign; the small check below illustrates the underlying problem.
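As a concrete illustration, here is a minimal standalone check (not part of the original job; the class name and setup are mine, and it assumes hadoop-common on the classpath). DataOutput.writeInt stores an int as big-endian two's-complement bytes, and WritableComparator.compareBytes compares bytes as unsigned values, so the serialized bytes of -1 compare greater than those of +1. Comparing the deserialized ints, or shifting all values to the same sign as described above, avoids the misordering.

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparator;

// hypothetical demo class: serialize two ints the way a Writable would and
// compare their raw bytes the way a RawComparator would
public class SignCompareDemo {

    private static byte[] serialize(int value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        new DataOutputStream(bytes).writeInt(value); // big-endian two's complement
        return bytes.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] minusOne = serialize(-1); // 0xFF 0xFF 0xFF 0xFF
        byte[] plusOne = serialize(1);   // 0x00 0x00 0x00 0x01
        int cmp = WritableComparator.compareBytes(minusOne, 0, minusOne.length,
                plusOne, 0, plusOne.length);
        System.out.println(cmp > 0); // true: raw-byte order puts -1 after +1
    }
}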

Secondary sort code example

// ---------------- Partitioner: partition by the original key ----------------

package com.hao.bigdata.mapreduce.SecondarySort;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class FirstPartitioner extends Partitioner<SkeyWritable, IntWritable> {

@Override
public int getPartition(SkeyWritable key, IntWritable value,
int numPartitions) {
// partition by the hash of the natural (first) key only, so records with
// the same first key always go to the same reduce task
return (key.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions;
}

}


// ---------------- Grouping comparator: group by the original key ----------------

package com.hao.bigdata.mapreduce.SecondarySort;

import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.WritableComparator;

public class FirstGroupingComparator implements RawComparator<SkeyWritable> {

public int compare(SkeyWritable o1, SkeyWritable o2) {
// group records by the natural (first) key only
return o1.getFirst().compareTo(o2.getFirst());
}

public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
// compare only the serialized first field; the trailing 4 bytes of each
// buffer are the int second field and are excluded
return WritableComparator.compareBytes(b1, s1, l1 - 4, b2, s2, l2 - 4);
}
}


// ---------------- Mapper, reducer, and driver ----------------

package com.hao.bigdata.mapreduce.SecondarySort;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class SecondarySortReduceModel extends Configured implements Tool {

// mapper class
/**
 * @author hao
 * Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
 */
public static class SecondaryMapper extends
Mapper<LongWritable, Text, SkeyWritable, IntWritable> { // extends the Mapper base class
// map output key and value objects, reused across calls
private SkeyWritable mapOutputKey = new SkeyWritable();
private IntWritable mapOutputValue = new IntWritable();

@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// read the input line
String lineValue = value.toString();
// split into fields: expected format is "key,number"
String[] strs = lineValue.split(",");
if (2 != strs.length) {
return;
}
// build the composite key (first, second) and the output value
mapOutputKey.set(strs[0], Integer.valueOf(strs[1]));
mapOutputValue.set(Integer.valueOf(strs[1]));

// output
context.write(mapOutputKey, mapOutputValue);
}
}

// reducer class
/**
 * @author hao
 * Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
 */
public static class SecondaryReducer extends
Reducer<SkeyWritable, IntWritable, Text, IntWritable> {
// set output
private Text outputKey = new Text();

@Override
public void reduce(SkeyWritable key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
// values for the same natural key arrive already sorted by the second field
// set output key
outputKey.set(key.getFirst());
// iterator
for (IntWritable value : values) {
context.write(outputKey, value);

}
}
}

// driver
public int run(String args[]) throws Exception {
// step 1: get Configuration
Configuration configuration = super.getConf();

// step 2: create Job, chaining input -> map -> reduce -> output
Job job = Job.getInstance(configuration, this.getClass()
.getSimpleName());
job.setJarByClass(this.getClass()); // locate the jar containing this class

/**
* step 3:job input ->map ->reduce ->output
*/
// step 3.1:input
Path inpath = new Path(args[0]); // wrap the input path
FileInputFormat.addInputPath(job, inpath);
// step 3.2:mapper
job.setMapperClass(SecondaryMapper.class);
job.setMapOutputKeyClass(SkeyWritable.class); // specify the map output <key, value> types
job.setMapOutputValueClass(IntWritable.class);

// =============shuffle========================
// 1.partitioner
job.setPartitionerClass(FirstPartitioner.class);
// 2.sort
// job.setSortComparatorClass(cls);
// 3. combiner
// job.setCombinerClass(WebReducer.class);
// 4.compress
// set by configuration
// 5.group
job.setGroupingComparatorClass(FirstGroupingComparator.class);
// ==============shuffle=======================
// step 3.3:reducer
job.setReducerClass(SecondaryReducer.class); // specify the reduce output <key, value> types
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

// set reduce num
job.setNumReduceTasks(3);

// step 3.4:output
Path outpath = new Path(args[1]);
FileOutputFormat.setOutputPath(job, outpath);

boolean isSuccess = job.waitForCompletion(true);

return isSuccess ? 0 : 1;
}

// main
public static void main(String[] args) throws Exception {
/*
* args = new String[] {
* "hdfs://bigdata00.hadoop-hao.com:8020/data/inputFiles/input02",
* "hdfs://bigdata00.hadoop-hao.com:8020/data/outputFiles/output04" };
*/
// create configuration
Configuration configuration = new Configuration();
// run job
int status = ToolRunner.run(configuration,
new SecondarySortReduceModel(), args);
// exit program
System.exit(status);
}
}


// ---------------- Custom composite key class ----------------

package com.hao.bigdata.mapreduce.SecondarySort;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class SkeyWritable implements WritableComparable<SkeyWritable> {
private String first;
private int second;

public SkeyWritable() {
}

public void set(String first, int second) {
this.first = first;
this.second = second;
}

public String getFirst() {
return first;
}

public void setFirst(String first) {
this.first = first;
}

// the second field is shifted by Integer.MAX_VALUE on the way in and out,
// implementing the same-sign offset trick described above
public int getSecond() {
return second + Integer.MAX_VALUE;
}

public void setSecond(int second) {
this.second = second - Integer.MAX_VALUE;
}

public SkeyWritable(String first, int second) {
this.set(first, second);
}

// serialization of the composite key
public void write(DataOutput out) throws IOException {

out.writeUTF(first);
out.writeInt(second);

}

public void readFields(DataInput in) throws IOException {
this.first = in.readUTF();
this.second = in.readInt();
}

public int compareTo(SkeyWritable o) {
// order by the first field, then by the (shifted) second field
int comp = this.first.compareTo(o.getFirst());
if (0 != comp) {
return comp;
}
return Integer.valueOf(getSecond()).compareTo(
Integer.valueOf(o.getSecond()));
}

}


MapReduce join

Map-side join

Use case: when joining one large table with one small table, a map-side join can be used.

Approach: on the map side, load the small table into memory keyed by the join key; then, while reading the large table, look up each record's key in the in-memory copy, pull out the matching small-table record, and merge it into the large-table record to produce the joined result.
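As a rough sketch of this idea (not from the original post): the small customer table is read during setup() from a path stored under a made-up configuration property, cached in a HashMap, and each large-table line is joined against it in map(). The class name MapJoinMapper, the property join.customer.path, and the field layouts (customer: cid,name,phoneno; order: cid,orderid,price,date) are all assumptions for illustration.

package com.hao.bigdata.hadoop.mapreduce.join;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MapJoinMapper extends
        Mapper<LongWritable, Text, NullWritable, Text> {

    // small table cached in memory: cid -> "name,phoneno"
    private final Map<String, String> customers = new HashMap<String, String>();
    private final Text outputValue = new Text();

    @Override
    protected void setup(Context context) throws IOException {
        Configuration conf = context.getConfiguration();
        // "join.customer.path" is a made-up property holding the small table path
        Path customerPath = new Path(conf.get("join.customer.path"));
        FileSystem fs = customerPath.getFileSystem(conf);
        BufferedReader reader = new BufferedReader(
                new InputStreamReader(fs.open(customerPath)));
        try {
            String line;
            while ((line = reader.readLine()) != null) {
                String[] fields = line.split(",");
                if (fields.length == 3) { // cid,name,phoneno
                    customers.put(fields[0], fields[1] + "," + fields[2]);
                }
            }
        } finally {
            reader.close();
        }
    }

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // large table line, assumed as cid,orderid,price,date
        String[] fields = value.toString().split(",");
        if (fields.length != 4) {
            return;
        }
        String customerInfo = customers.get(fields[0]);
        if (customerInfo == null) {
            return; // no matching customer, drop the order
        }
        // joined output: cid,name,phoneno,price,date
        outputValue.set(fields[0] + "," + customerInfo + "," + fields[2] + ","
                + fields[3]);
        context.write(NullWritable.get(), outputValue);
    }
}

The driver for such a map-only job would set the property on the Configuration and call job.setNumReduceTasks(0), since no reduce phase is needed.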

Reduce-side join

Use case: when both tables are large, with too much data to cache in memory, the join is done on the reduce side, where all records that share a join key meet in the same reduce call.

Semi-join

Use case: when three tables are involved, a semi-join can be used.

Approach: following the map-side join, first join two of the tables in the map phase, then join that intermediate result with the remaining table on the reduce side.

MapReduce join code example

package com.hao.bigdata.hadoop.mapreduce.join;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class JoinMapReduce extends Configured implements Tool {

// mapper class
/**
 * @author hao
 * Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
 */
public static class JoinMapper extends
Mapper<LongWritable, Text, LongWritable, JoinWritable> { // extends the Mapper base class

// map output key and value objects, reused across calls
private final static JoinWritable mapOutputValue = new JoinWritable();
private LongWritable mapOutputKey = new LongWritable();

@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String lineValue = value.toString();
String[] strs = lineValue.split(",");
// number of fields in the record
int length = strs.length;
// a customer record has 3 fields, an order record has 4; skip anything else
if ((length != 3) && (length != 4)) {
return;
}
// use the customer id (cid) from the first field as the join key
Long cid = Long.valueOf(strs[0]);
// set the output key
mapOutputKey.set(cid);
// name
String name = String.valueOf(strs[1]);
if (3 == length) {
String phoneno = strs[2];
// tag the record as coming from the customer table: value is name,phoneno
mapOutputValue.set("customer", name + "," + phoneno);
}
if (4 == length) {
String price = strs[2];
String data = strs[3];
// tag the record as coming from the order table: value is price,date
mapOutputValue.set("order", price + "," + data);
}
// emit the tagged record keyed by cid
context.write(mapOutputKey, mapOutputValue);
}
}

// reducer class
/**
 * @author hao
 * Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
 */
// the output key is NullWritable: only the joined value line is emitted
public static class joinReducer extends
Reducer<LongWritable, JoinWritable, NullWritable, Text> {
private Text outputValue = new Text();

@Override
public void reduce(LongWritable key, Iterable<JoinWritable> values,
Context context) throws IOException, InterruptedException {
// iterate over the tagged values for this cid
String customerInfo = null;
// list that collects all order records for this customer
List<String> orderList = new ArrayList<String>();
// separate the customer record from the order records by tag
for (JoinWritable value : values) {
String tag = value.getTag();
if ("customer".equals(tag)) {
customerInfo = value.getData();
} else if ("order".equals(tag)) {
orderList.add(value.getData());
}
}
// skip customers that have no matching orders
if (0 == orderList.size()) {
return;
}
// emit one joined line per order: cid, customer info, order info
for (String order : orderList) {
outputValue.set(key.toString() + "," + customerInfo + ","
+ order);
context.write(NullWritable.get(), outputValue);
}
}
}

// driver
public int run(String args[]) throws Exception {
// step 1: get Configuration
Configuration configuration = super.getConf();

// step 2: create Job, chaining input -> map -> reduce -> output
Job job = Job.getInstance(configuration, this.getClass()
.getSimpleName());
job.setJarByClass(this.getClass()); // locate the jar containing this class

/**
* step 3:job input ->map ->reduce ->output
*/
// step 3.1:input
Path inpath = new Path(args[0]); // wrap the input path
FileInputFormat.addInputPath(job, inpath);
// step 3.2:mapper
job.setMapperClass(JoinMapper.class);
job.setMapOutputKeyClass(LongWritable.class); // specify the map output <key, value> types
job.setMapOutputValueClass(JoinWritable.class);
// =============shuffle========================
// 1.partitioner
// job.setPartitionerClass(cls);
// 2.sort
// job.setSortComparatorClass(cls);
// 3. combiner
//job.setCombinerClass(WebReducer.class);
// 4.compress
// set by configuration
// 5.group
// job.setGroupingComparatorClass(cls);
// ==============shuffle=======================

// step 3.3:reducer
job.setReducerClass(joinReducer.class); // specify the reduce output <key, value> types
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Text.class);
/*
* //set reduce num job.setNumReduceTasks(0);
*/
// step 3.4:output
Path outpath = new Path(args[1]);
FileOutputFormat.setOutputPath(job, outpath);

boolean isSuccess = job.waitForCompletion(true);

return isSuccess ? 0 : 1;
}

// main
public static void main(String[] args) throws Exception {
/*
* args = new String[] {
* "hdfs://bigdata00.hadoop-hao.com:8020/data/inputFiles/input02",
* "hdfs://bigdata00.hadoop-hao.com:8020/data/outputFiles/output04" };
*/
// create configuration
Configuration configuration = new Configuration();
// run job
int status = ToolRunner.run(configuration, new JoinMapReduce(), args);
// exit program
System.exit(status);
}
}


// ---------------- Custom value type for the join ----------------

package com.hao.bigdata.hadoop.mapreduce.join;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class JoinWritable implements Writable {
// mark customer/order
private String tag;
private String data;

public JoinWritable() {
}

public void set(String tag, String data) {
this.setTag(tag);
this.setData(data);
}

public JoinWritable(String tag, String data) {
this.set(tag, data);
}

public String getTag() {
return tag;
}

public void setTag(String tag) {
this.tag = tag;
}

public String getData() {
return data;
}

public void setData(String data) {
this.data = data;
}

public void write(DataOutput out) throws IOException {
// write the tag first, then the data
out.writeUTF(this.getTag());
out.writeUTF(this.getData());

}

public void readFields(DataInput in) throws IOException {
// read back in the same order: tag, then data
this.setTag(in.readUTF());
this.setData(in.readUTF());

}

@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((data == null) ? 0 : data.hashCode());
result = prime * result + ((tag == null) ? 0 : tag.hashCode());
return result;
}

@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
JoinWritable other = (JoinWritable) obj;
if (data == null) {
if (other.data != null)
return false;
} else if (!data.equals(other.data))
return false;
if (tag == null) {
if (other.tag != null)
return false;
} else if (!tag.equals(other.tag))
return false;
return true;
}
@Override
public String toString() {
return "tag"+"," + "data";
}
}