Implementing Hive's DISTRIBUTE BY and SORT BY with MapReduce
2014-11-11 20:32
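1. Use MapReduce to implement Hive's DISTRIBUTE BY and SORT BY, for example: select * from A distribute by a, b sort by c. Here that is taken to mean: group the rows by the two fields a and b, then sort the rows of each group by c.
2. The implementation is fairly simple: a and b are sent to the reducer as the key and c as the value, and the reducer sorts the values of each key with quicksort. The code is as follows: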
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Implements Hive's DISTRIBUTE BY and SORT BY, e.g.:
 * select * from A distribute by a, b sort by c;
 *
 * @author jthink
 */
public class DisSort {

    private static final String SEPARATOR = ",";

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        if (args.length < 2) {
            System.out.println("Wrong number of arguments; at least two are required: <output path> <input path...>");
            System.exit(1);
        }
        // The first argument is the output path, the remaining ones are input paths
        String dataOutput = args[0];
        String[] inputs = new String[args.length - 1];
        System.arraycopy(args, 1, inputs, 0, inputs.length);

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "dis sort test");
        job.setJarByClass(DisSort.class);
        job.setMapperClass(DisSortMapper.class);
        job.setReducerClass(DisSortReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        Path[] inputPaths = new Path[inputs.length];
        for (int i = 0; i < inputs.length; i++) {
            inputPaths[i] = new Path(inputs[i]);
        }
        Path outputPath = new Path(dataOutput);
        FileInputFormat.setInputPaths(job, inputPaths);
        FileOutputFormat.setOutputPath(job, outputPath);

        // Report job success or failure through the exit code
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    static class DisSortMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] values = value.toString().split(SEPARATOR);
            // Get the fields a, b, c
            String a = values[0], b = values[1], c = values[2];
            // Key is a + b, value is c. Plain concatenation is ambiguous when the
            // fields can be longer than one character ("ab" + "c" equals "a" + "bc").
            context.write(new Text(a + b), new IntWritable(Integer.parseInt(c)));
        }
    }

    static class DisSortReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // Copy the values of this key into memory so they can be sorted
            List<Integer> vs = new ArrayList<Integer>();
            for (IntWritable value : values) {
                vs.add(value.get());
            }
            Integer[] list = new Integer[vs.size()];
            vs.toArray(list);
            // Quicksort
            sort(list, 0, list.length - 1);
            for (int i = 0; i < list.length; ++i) {
                context.write(key, new IntWritable(list[i]));
            }
        }

        private void sort(Integer[] list, int low, int high) {
            if (low < high) {
                int middle = getMiddle(list, low, high); // partition the array in two
                sort(list, low, middle - 1);  // recursively sort the lower part
                sort(list, middle + 1, high); // recursively sort the upper part
            }
        }

        private int getMiddle(Integer[] list, int low, int high) {
            int tmp = list[low]; // the first element is the pivot
            while (low < high) {
                // >= and <= (not > and <) so that values equal to the pivot
                // cannot stall the scan and loop forever
                while (low < high && list[high] >= tmp) {
                    high--;
                }
                list[low] = list[high]; // move the smaller record to the low end
                while (low < high && list[low] <= tmp) {
                    low++;
                }
                list[high] = list[low]; // move the larger record to the high end
            }
            list[low] = tmp; // put the pivot in its final position
            return low;      // return the pivot position
        }
    }
}
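The reducer's quicksort can be tried on its own, without a Hadoop cluster. The following is a minimal sketch under a few assumptions: the class name QuickSortSketch and the method name partition are made up for illustration, and it works on a plain int[] instead of Integer[]. Comparisons against the pivot include equality, so an input with repeated values still terminates.

```java
import java.util.Arrays;

class QuickSortSketch {

    // Sorts list[low..high] in place, ascending.
    static void sort(int[] list, int low, int high) {
        if (low < high) {
            int middle = partition(list, low, high);
            sort(list, low, middle - 1);
            sort(list, middle + 1, high);
        }
    }

    // Uses list[low] as the pivot and returns its final index.
    static int partition(int[] list, int low, int high) {
        int pivot = list[low];
        while (low < high) {
            // Skip elements on the right that are >= pivot
            while (low < high && list[high] >= pivot) {
                high--;
            }
            list[low] = list[high];
            // Skip elements on the left that are <= pivot
            while (low < high && list[low] <= pivot) {
                low++;
            }
            list[high] = list[low];
        }
        list[low] = pivot;
        return low;
    }

    public static void main(String[] args) {
        int[] values = {4, 42, 14, 42, 4}; // contains duplicates on purpose
        sort(values, 0, values.length - 1);
        System.out.println(Arrays.toString(values)); // prints [4, 4, 14, 42, 42]
    }
}
```

In a real job, java.util.Collections.sort on the list of values would do the same work with less code; the hand-written version is kept here only because the post sorts with quicksort.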
Input data:
a,b,1
a,c,4
a,b,3
a,c,42
a,d,5434
f,d,43
a,c,14
c,a,90
a,b,98
Output:
ad 5434
ab 1
ab 3
ab 98
ac 4
ac 14
ac 42
ca 90
fd 43
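The grouping and sorting shown above can also be checked locally, without a cluster. The following is only a rough sketch of the data flow, not the Hadoop job itself: the class name DisSortLocalSketch and the method run are hypothetical, a TreeMap stands in for the shuffle (it keeps the a + b keys sorted, which for ASCII keys matches how Text keys are ordered), and Collections.sort replaces the hand-written quicksort.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class DisSortLocalSketch {

    // Takes lines of the form "a,b,c" and returns "key<TAB>value" lines,
    // grouped by a + b and sorted by c within each group.
    static List<String> run(List<String> lines) {
        // "Map" and "shuffle": collect every c under the key a + b
        Map<String, List<Integer>> groups = new TreeMap<>();
        for (String line : lines) {
            String[] fields = line.split(",");
            String key = fields[0] + fields[1];
            groups.computeIfAbsent(key, k -> new ArrayList<>())
                  .add(Integer.parseInt(fields[2]));
        }
        // "Reduce": sort the values of each key and emit one line per value
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, List<Integer>> entry : groups.entrySet()) {
            List<Integer> values = entry.getValue();
            Collections.sort(values);
            for (int value : values) {
                out.add(entry.getKey() + "\t" + value);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList(
                "a,b,1", "a,c,4", "a,b,3", "a,c,42", "a,d,5434",
                "f,d,43", "a,c,14", "c,a,90", "a,b,98");
        for (String line : run(input)) {
            System.out.println(line);
        }
    }
}
```

With the sample input this emits the same nine key/value pairs as the listing above. Keys come out in TreeMap order, so the position of a whole group in the output may differ from the listing.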