您的位置:首页 > 其它

MapReduce实现Hive的Distribute by和Sort by

2014-11-11 20:32 246 查看
1. 用MR实现Hive中distribute by和sort by的用法,如:select * from A distribute by a, b sort by c,意思是按照a、b两个字段分组(a、b相同的记录分发到同一个reducer),然后再按照c进行排序。

2. 实现方式比较简单:以a和b作为key发送到reduce端,c作为value,在reduce端对每组的value用快速排序进行排序。代码如下:

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Emulates Hive's {@code DISTRIBUTE BY ... SORT BY ...} with plain MapReduce, e.g.
 * {@code select * from A distribute by a, b sort by c;}
 *
 * <p>Each input line is expected to look like {@code a,b,c}, where {@code c} is an integer. The
 * mapper emits the composite key {@code "a,b"} with {@code c} as the value, so every row with the
 * same (a, b) pair reaches the same reduce call; the reducer then writes that group's values in
 * ascending order.
 *
 * <p>NOTE(review): because Hadoop also sorts reduce keys, the output is ordered by (a, b) first
 * and by c within each group. Hive's {@code SORT BY c} alone orders each reducer's entire output
 * by c, so this behaves more like {@code sort by a, b, c}.
 *
 * @author jthink
 */
public class DisSort {

    /** Field delimiter of the input lines; also used to join a and b into the composite key. */
    private static final String SEPARATOR = ",";

    /** Counter group under which skipped input lines are reported. */
    private static final String COUNTER_GROUP = "DisSort";

    /** Counter for input lines that are blank, have fewer than 3 fields, or a non-integer c. */
    private static final String MALFORMED_LINES = "MALFORMED_LINES";

    /**
     * Configures and runs the job, exiting with status 0 on success and 1 on bad arguments or
     * job failure.
     *
     * @param args {@code args[0]} is the output directory; {@code args[1..]} are input paths
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        if (args.length < 2) {
            System.err.println("参数数量不对,至少两个以上参数:<数据文件输出路径>、<输入路径...>");
            System.exit(1);
        }
        Path outputPath = new Path(args[0]);
        Path[] inputPaths = new Path[args.length - 1];
        for (int i = 1; i < args.length; i++) {
            inputPaths[i - 1] = new Path(args[i]);
        }

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "dis sort 测试");
        job.setJarByClass(DisSort.class);
        job.setMapperClass(DisSortMapper.class);
        job.setReducerClass(DisSortReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.setInputPaths(job, inputPaths);
        FileOutputFormat.setOutputPath(job, outputPath);
        // Propagate the job result so schedulers/scripts can detect a failed run.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    /**
     * Parses {@code a,b,c} lines and emits ({@code "a,b"}, c). Lines that cannot be parsed are
     * skipped and counted under {@link #COUNTER_GROUP}/{@link #MALFORMED_LINES}.
     */
    static class DisSortMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        // Reused across map() calls; context.write serializes them immediately.
        private final Text outKey = new Text();
        private final IntWritable outValue = new IntWritable();

        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] fields = value.toString().split(SEPARATOR);
            // A blank line splits to [""], so this also skips empty lines.
            if (fields.length < 3) {
                context.getCounter(COUNTER_GROUP, MALFORMED_LINES).increment(1);
                return;
            }
            int c;
            try {
                c = Integer.parseInt(fields[2].trim());
            } catch (NumberFormatException malformed) {
                context.getCounter(COUNTER_GROUP, MALFORMED_LINES).increment(1);
                return;
            }
            // Join with the separator: plain concatenation would merge ("ab","c") and ("a","bc")
            // into the same group. SEPARATOR cannot occur inside a field because we split on it.
            outKey.set(fields[0] + SEPARATOR + fields[1]);
            outValue.set(c);
            context.write(outKey, outValue);
        }
    }

    /** Writes every value of a (a, b) group in ascending order, one output record per value. */
    static class DisSortReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private final IntWritable outValue = new IntWritable();

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException,
                InterruptedException {
            // Copy primitives out: Hadoop reuses the IntWritable instance while iterating.
            int[] buffer = new int[16];
            int size = 0;
            for (IntWritable value : values) {
                if (size == buffer.length) {
                    buffer = Arrays.copyOf(buffer, size * 2);
                }
                buffer[size++] = value.get();
            }
            // Dual-pivot quicksort from the JDK: correct with duplicate values (the previous
            // hand-written partition looped forever on them) and no O(n) recursion depth on
            // already-sorted input.
            Arrays.sort(buffer, 0, size);

            for (int i = 0; i < size; i++) {
                outValue.set(buffer[i]);
                context.write(key, outValue);
            }
        }
    }
}


输入数据是:

a,b,1

a,c,4

a,b,3

a,c,42

a,d,5434

f,d,43

a,c,14

c,a,90

a,b,98

输出是:

ab 1

ab 3

ab 98

ac 4

ac 14

ac 42

ad 5434

ca 90

fd 43
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: