MapReduce Feature Implementations, Part 2: Sorting

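2017-07-25 17:56

The MapReduce Feature Implementations series:

MapReduce Feature Implementations, Part 1: Converting Data Between HBase and HDFS

MapReduce Feature Implementations, Part 2: Sorting

MapReduce Feature Implementations, Part 3: Top N

MapReduce Feature Implementations, Part 4: A Small Combined Example (Read Data from HBase, Aggregate It, and Write the Top 3 to HDFS in Descending Order)

MapReduce Feature Implementations, Part 5: Deduplication (Distinct) and Counting (Count)

MapReduce Feature Implementations, Part 6: Maximum (Max), Sum (Sum), and Average (Avg)

MapReduce Feature Implementations, Part 7: A Small Combined Example (Chaining Several Jobs to Compute an Average)

MapReduce Feature Implementations, Part 8: Partitioning (Partition)

MapReduce Feature Implementations, Part 9: PV and UV

MapReduce Feature Implementations, Part 10: Inverted Index

MapReduce Feature Implementations, Part 11: Join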

Case 1:

[hadoop@h71 q1]$ vi ip.txt

192.168.1.1 aaa

192.168.1.1 aaa

192.168.1.1 aaa

192.168.1.1 aaa

192.168.1.1 aaa

192.168.1.1 aaa

192.168.1.1 aaa

192.168.2.2 ccc

192.168.3.3 ddd

192.168.3.3 ddd

192.168.3.3 ddd

192.168.5.5 fff

[hadoop@h71 q1]$ hadoop fs -put ip.txt /input

Java code (count the occurrences of each IP and output the counts in ascending order):

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

public class IpTopK {

public static class IpTopKMapper1 extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {
@Override
public void map(LongWritable longWritable, Text text, OutputCollector<Text, Text>
outputCollector, Reporter reporter) throws IOException {
String ip = text.toString().split(" ", 5)[0];
outputCollector.collect(new Text(ip), new Text("1"));
}
}

public static class IpTopKReducer1 extends MapReduceBase implements Reducer<Text, Text, Text, Text> {
@Override
public void reduce(Text key, Iterator<Text> iterator, OutputCollector<Text, Text>
outputCollector, Reporter reporter) throws IOException {
long sum = 0;
while(iterator.hasNext()){
sum = sum + Long.parseLong(iterator.next().toString());
}
outputCollector.collect(new Text(key), new Text(String.valueOf(sum)));
/**
* ip1 count
* ip2 count
* ip3 count
*/
}
}

public static class IpTopKMapper2 extends MapReduceBase implements Mapper<LongWritable, Text, LongWritable, Text> {
@Override
public void map(LongWritable longWritable, Text text, OutputCollector<LongWritable, Text>
outputCollector, Reporter reporter) throws IOException {
String [] ks = text.toString().split("\t");
/**
* ks[0] , ip
* ks[1], count
*/
outputCollector.collect(new LongWritable(Long.parseLong(ks[1])), new Text(ks[0]));
}
}

public static class IpTopKReducer2 extends MapReduceBase implements Reducer<LongWritable, Text, LongWritable, Text> {
@Override
public void reduce(LongWritable key, Iterator<Text> iterator, OutputCollector<LongWritable, Text>
outputCollector, Reporter reporter) throws IOException {

while(iterator.hasNext()){
outputCollector.collect(key, iterator.next());
}
}
}

public static void main(String [] args) throws IOException {
System.out.println(args.length);
if(args.length < 2){
System.out.println("args not right!");
return ;
}

JobConf conf = new JobConf(IpTopK.class);
conf.set("mapred.jar","tt.jar");
//set output key class
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(Text.class);

//set mapper & reducer class
conf.setMapperClass(IpTopKMapper1.class);
conf.setCombinerClass(IpTopKReducer1.class);
conf.setReducerClass(IpTopKReducer1.class);

// set format
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);

String inputDir = args[0];
String outputDir = args[1];

// FileInputFormat.setInputPaths(conf, "/user/hadoop/rongxin/locationinput/");
FileInputFormat.setInputPaths(conf, inputDir);
FileOutputFormat.setOutputPath(conf, new Path(outputDir));

boolean flag = JobClient.runJob(conf).isSuccessful();

if(flag){
System.out.println("run job-1 successful");
JobConf conf1 = new JobConf(IpTopK.class);
conf1.set("mapred.jar","tt.jar");
//set output key class
conf1.setOutputKeyClass(LongWritable.class);
conf1.setOutputValueClass(Text.class);
//set mapper & reducer class
conf1.setMapperClass(IpTopKMapper2.class);
conf1.setReducerClass(IpTopKReducer2.class);
// set format
conf1.setInputFormat(TextInputFormat.class);
conf1.setOutputFormat(TextOutputFormat.class);
conf1.setNumReduceTasks(1);

// FileInputFormat.setInputPaths(conf, "/user/hadoop/rongxin/locationinput/");
FileInputFormat.setInputPaths(conf1, outputDir);
FileOutputFormat.setOutputPath(conf1, new Path(outputDir + "-2"));
boolean flag1 = JobClient.runJob(conf1).isSuccessful();
if(flag1){
System.out.println("run job-2 successful !!");
}
}
}
}
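Note: this code uses the Hadoop 1 API (org.apache.hadoop.mapred) and runs two MapReduce jobs in sequence. The first job counts each IP; the second reads the first job's output and sorts it by count.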
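The two-job flow can be tried out without a cluster. The following is a minimal in-memory sketch, not the Hadoop code itself: the class name `CountThenSortSketch` and its helper methods are made up for illustration, and plain Java collections stand in for the shuffle and sort that Hadoop performs between map and reduce.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-alone sketch of the count-then-sort pipeline.
public class CountThenSortSketch {

    // Stage 1 (the role of job 1): count how often each IP occurs.
    // A TreeMap keeps the IPs in sorted order, as reducer input keys would be.
    static Map<String, Long> countIps(List<String> lines) {
        Map<String, Long> counts = new TreeMap<String, Long>();
        for (String line : lines) {
            String ip = line.split(" ", 5)[0];
            Long old = counts.get(ip);
            counts.put(ip, old == null ? 1L : old + 1L);
        }
        return counts;
    }

    // Stage 2 (the role of job 2): make the count the sort key, ascending.
    static List<String> sortByCount(Map<String, Long> counts) {
        List<Map.Entry<String, Long>> entries =
                new ArrayList<Map.Entry<String, Long>>(counts.entrySet());
        Collections.sort(entries, new Comparator<Map.Entry<String, Long>>() {
            @Override
            public int compare(Map.Entry<String, Long> a, Map.Entry<String, Long> b) {
                return a.getValue().compareTo(b.getValue());
            }
        });
        List<String> out = new ArrayList<String>();
        for (Map.Entry<String, Long> e : entries) {
            out.add(e.getValue() + "\t" + e.getKey());
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "192.168.1.1 aaa", "192.168.1.1 aaa", "192.168.2.2 ccc",
                "192.168.3.3 ddd", "192.168.3.3 ddd", "192.168.3.3 ddd");
        for (String row : sortByCount(countIps(lines))) {
            System.out.println(row);
        }
    }
}
```

IPs with the same count may come out in a different relative order than in a real job run, because MapReduce does not define the order of values that share a key.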

Run the code on Linux:

[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/javac IpTopK.java 

[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/jar cvf tt.jar IpTopK*class (the jar name tt.jar must match the mapred.jar value set in the code)

[hadoop@h71 q1]$ hadoop jar tt.jar IpTopK /input/ip.txt /output

[hadoop@h71 q1]$ hadoop fs -lsr /output

-rw-r--r--   2 hadoop supergroup          0 2017-03-18 16:07 /output/_SUCCESS

-rw-r--r--   2 hadoop supergroup         56 2017-03-18 16:07 /output/part-00000

[hadoop@h71 q1]$ hadoop fs -lsr /output-2

-rw-r--r--   2 hadoop supergroup          0 2017-03-18 16:07 /output-2/_SUCCESS

-rw-r--r--   2 hadoop supergroup         56 2017-03-18 16:07 /output-2/part-00000

[hadoop@h71 q1]$ hadoop fs -cat /output/part-00000

192.168.1.1     7

192.168.2.2     1

192.168.3.3     3

192.168.5.5     1

[hadoop@h71 q1]$ hadoop fs -cat /output-2/part-00000

1       192.168.5.5

1       192.168.2.2

3       192.168.3.3

7       192.168.1.1

Case 2: descending order

[hadoop@h71 q1]$ vi test.txt

a 5

b 4

c 74

d 78

e 1

r 64

f 4

Note: the separator can be either \t (the Tab key) or a space.

[hadoop@h71 q1]$ hadoop fs -put test.txt /input

Java code:

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class JiangXu {

public static class SortIntValueMapper extends Mapper<LongWritable, Text, IntWritable, Text> {
private final static IntWritable wordCount = new IntWritable(1);
private Text word = new Text();
public SortIntValueMapper() {
super();
}
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {

StringTokenizer tokenizer = new StringTokenizer(value.toString());
while (tokenizer.hasMoreTokens()) {
word.set(tokenizer.nextToken().trim());
wordCount.set(Integer.valueOf(tokenizer.nextToken().trim()));
context.write(wordCount, word);
}
}
}

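/**
* Partition by key range; the key is an int value
*/
public static class KeySectionPartitioner<K, V> extends Partitioner<K, V> {
@Override
public int getPartition(K key, V value, int numReduceTasks) {
/**
* The hashCode of an IntWritable is its own int value
*/
// Keys greater than or equal to maxValue go to the first partition (partition 0)
int maxValue = 50;
int keySection = 0;
// Only keys below maxValue are assigned to a later section, and only when numReduceTasks is greater than 1; otherwise return 0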
if (numReduceTasks > 1 && key.hashCode() < maxValue) {
int sectionValue = maxValue / (numReduceTasks - 1);
int count = 0;
while ((key.hashCode() - sectionValue * count) > sectionValue) {
count++;
}
keySection = numReduceTasks - 1 - count;
}
return keySection;
}
}

/**
* Sort the int keys in descending order
*/
public static class IntKeyDescComparator extends WritableComparator {
protected IntKeyDescComparator() {
super(IntWritable.class, true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
return -super.compare(a, b);
}
}

/**
* Swap key and value on output
*/
public static class SortIntValueReduce extends Reducer<IntWritable, Text, Text, IntWritable> {
private Text result = new Text();
@Override
public void reduce(IntWritable key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
for (Text val : values) {
result.set(val.toString());
context.write(result, key);
}
}
}

public static void main(String [] args) throws Exception {
Configuration conf = new Configuration();
Job job = new Job(conf, "word count");
job.setJarByClass(JiangXu.class);
job.setMapperClass(SortIntValueMapper.class);
job.setSortComparatorClass(IntKeyDescComparator.class);
job.setPartitionerClass(KeySectionPartitioner.class);
job.setReducerClass(SortIntValueReduce.class);
// Map output key and value types
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(Text.class);
// Final (reducer) output key and value types
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
/**
* Several input directories can be passed here, for example all the results of a previous job
**/
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}


[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/javac JiangXu.java 

[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/jar cvf xx.jar JiangXu*class

[hadoop@h71 q1]$ hadoop jar xx.jar JiangXu /input/test.txt /output

[hadoop@h71 q1]$ hadoop fs -cat /output/part-r-00000

d       78

c       74

r       64

a       5

f       4

b       4

e       1
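The run above wrote only part-r-00000, so with a single reducer every record ended up in partition 0. To see what KeySectionPartitioner would do with more reducers, the sketch below repeats its arithmetic on plain int keys. The class `KeySectionSketch` and the method `partitionFor` are names invented for this illustration, and the key 30 is not part of the sample data.

```java
// Hypothetical stand-alone copy of the partitioning arithmetic, using plain ints.
public class KeySectionSketch {

    // Mirrors KeySectionPartitioner.getPartition with the key's int value.
    static int partitionFor(int key, int numReduceTasks) {
        int maxValue = 50;
        int keySection = 0; // keys >= maxValue stay in partition 0
        if (numReduceTasks > 1 && key < maxValue) {
            // Width of each section below maxValue
            int sectionValue = maxValue / (numReduceTasks - 1);
            int count = 0;
            while ((key - sectionValue * count) > sectionValue) {
                count++;
            }
            // Larger keys get lower partition numbers
            keySection = numReduceTasks - 1 - count;
        }
        return keySection;
    }

    public static void main(String[] args) {
        int[] keys = {78, 74, 64, 30, 5, 4, 1}; // 30 is an extra illustrative key
        for (int key : keys) {
            System.out.println(key + " -> partition " + partitionFor(key, 3));
        }
    }
}
```

With three reducers, keys of 50 and above map to partition 0, keys from 26 to 49 to partition 1, and keys up to 25 to partition 2. Because each partition is sorted in descending order, reading the part files in order should give a globally descending result. The sketch assumes a small reducer count: once numReduceTasks - 1 exceeds maxValue, sectionValue becomes 0 and the loop never ends for positive keys.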

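Case 3: an improved WordCount (sorted by word frequency in descending order). The WordCount example on the official site only counts how many times each word occurs and does not sort by frequency; the code below adds that.

Source: http://www.cnblogs.com/yjmyzz/p/hadoop-mapreduce-2-sample.html

How it works: the reducer again relies on cleanup(). To get sorted output, it collects the results in a TreeMap, a data structure that keeps its keys sorted.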
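The TreeMap part of this idea can be shown in isolation. The sketch below is an assumption-laden illustration rather than the job code: `DescendingTreeMapSketch` and `groupByCount` are invented names, and the word counts are a hand-picked subset of the sample result.

```java
import java.util.Collections;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: group words by count in a TreeMap sorted in descending order.
public class DescendingTreeMapSketch {

    // Words sharing a count are joined with commas, in the order they arrive.
    static TreeMap<Integer, String> groupByCount(Map<String, Integer> wordCounts) {
        // reverseOrder() flips the natural ascending key order of TreeMap
        TreeMap<Integer, String> byCount =
                new TreeMap<Integer, String>(Collections.<Integer>reverseOrder());
        for (Map.Entry<String, Integer> e : wordCounts.entrySet()) {
            String existing = byCount.get(e.getValue());
            byCount.put(e.getValue(),
                    existing == null ? e.getKey() : existing + "," + e.getKey());
        }
        return byCount;
    }

    public static void main(String[] args) {
        // Sorted by word, the way keys reach a reducer
        Map<String, Integer> counts = new TreeMap<String, Integer>();
        counts.put("I", 4);
        counts.put("the", 4);
        counts.put("be", 3);
        counts.put("we", 3);
        counts.put("will", 2);
        for (Map.Entry<Integer, String> e : groupByCount(counts).entrySet()) {
            System.out.println(e.getValue() + "\t" + e.getKey());
        }
    }
}
```

In a real job each reducer task holds its own TreeMap, so this approach only yields one globally sorted list when the job runs with a single reducer.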

To make the result easier to follow, the input is the first verse of the theme song of the movie Big Hero 6:

[hadoop@h71 q1]$ vi test.txt

They say we are what we are

But we do not have to be

I am  bad behavior but I do it in the best way

I will be the watcher

Of the eternal flame

I will be the guard dog

of all your fever dreams

[hadoop@h71 q1]$ hadoop fs -put test.txt /input

Java code:

import java.io.IOException;
import java.util.Comparator;
import java.util.StringTokenizer;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount2 {
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}

public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
// A TreeMap holds the results; it sorts keys in ascending order by default, so a Comparator is supplied to get descending order
private TreeMap<Integer, String> treeMap = new TreeMap<Integer, String>(new Comparator<Integer>() {
@Override
public int compare(Integer x, Integer y) {
return y.compareTo(x);
}
});
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
// Put each reduced result into the TreeMap instead of writing it to the context
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
if (treeMap.containsKey(sum)){
String value = treeMap.get(sum) + "," + key.toString();
treeMap.put(sum,value);
}
else {
treeMap.put(sum, key.toString());
}
}
protected void cleanup(Context context) throws IOException, InterruptedException {
// Write the TreeMap entries to the context in value-key order
for (Integer key : treeMap.keySet()) {
context.write(new Text(treeMap.get(key)), new IntWritable(key));
}
}
}

public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length < 2) {
System.err.println("Usage: wordcount2 <in> [<in>...] <out>");
System.exit(2);
}

Job job = Job.getInstance(conf, "word count2");
job.setJarByClass(WordCount2.class);
job.setMapperClass(TokenizerMapper.class);
// No combiner is set: IntSumReducer emits comma-joined word groups from cleanup(), which is not valid reducer input
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
for (int i = 0; i < otherArgs.length - 1; ++i) {
FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
}
FileOutputFormat.setOutputPath(job,
new Path(otherArgs[otherArgs.length - 1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}


[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/javac WordCount2.java 

[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/jar cvf xx.jar WordCount2*class

[hadoop@h71 q1]$ hadoop jar xx.jar WordCount2 /input/test.txt /output

[hadoop@h71 q1]$ hadoop fs -cat /output/part-r-00000

I,the   4

be,we   3

are,do,will     2

But,Of,They,all,am,bad,behavior,best,but,dog,dreams,eternal,fever,flame,guard,have,in,it,not,of,say,to,watcher,way,what,your    1

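Case 4: custom sorting

Sort the two-column data in ascending order by the first column; when the first column is equal, sort by the second column in ascending order.

The automatic sorting in the MapReduce process can only sort by the first column when that column alone is the key. The solution is to define a custom class that implements the WritableComparable interface and use it as the key, so that the automatic sorting applies the class's own ordering.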
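The ordering rule itself can be checked without Hadoop. The sketch below uses an ordinary Comparable class in place of a WritableComparable key; `PairSortSketch`, `Pair`, and `sorted` are illustrative names only, and the rows match the sample file used in this case.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch: composite ordering by first column, then second column.
public class PairSortSketch {

    static class Pair implements Comparable<Pair> {
        final long first;
        final long second;

        Pair(long first, long second) {
            this.first = first;
            this.second = second;
        }

        // Ascending by first; ties are broken by second, also ascending
        @Override
        public int compareTo(Pair o) {
            int byFirst = Long.compare(first, o.first);
            return byFirst != 0 ? byFirst : Long.compare(second, o.second);
        }

        @Override
        public String toString() {
            return first + "\t" + second;
        }
    }

    // Builds pairs from rows of {first, second} and returns them sorted.
    static List<Pair> sorted(long[][] rows) {
        List<Pair> pairs = new ArrayList<Pair>();
        for (long[] row : rows) {
            pairs.add(new Pair(row[0], row[1]));
        }
        Collections.sort(pairs);
        return pairs;
    }

    public static void main(String[] args) {
        long[][] rows = {{7, 3}, {7, 5}, {7, 1}, {5, 9}, {5, 6}, {1, 7}};
        for (Pair p : sorted(rows)) {
            System.out.println(p);
        }
    }
}
```

A key class used in a job needs the same compareTo logic plus the serialization methods required by WritableComparable.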

Data format:

[hadoop@h71 q1]$ vi haha.txt

7 3

7 5

7 1

5 9

5 6

1 7

Java code:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class ZiDingYi {
private static final String INPUT_PATH = "hdfs://h71:9000/in";
private static final String OUT_PATH = "hdfs://h71:9000/out";

public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
if(fileSystem.exists(new Path(OUT_PATH))){
fileSystem.delete(new Path(OUT_PATH),true);
}
Job job = new Job(conf,ZiDingYi.class.getSimpleName());
FileInputFormat.setInputPaths(job, INPUT_PATH);
job.setJarByClass(ZiDingYi.class);
// The line above is required; without it the job fails with: Caused by: java.lang.ClassNotFoundException: Class ZiDingYi$MyMapper not found

// Input format class used to read the input file
job.setInputFormatClass(TextInputFormat.class);
// Custom Mapper class
job.setMapperClass(MyMapper.class);
// Types of the map output <k2,v2>
job.setMapOutputKeyClass(newK2.class);
job.setMapOutputValueClass(LongWritable.class);

// Partitioner class
job.setPartitionerClass(HashPartitioner.class);
job.setNumReduceTasks(1);

// Custom Reducer class
job.setReducerClass(MyReducer.class);
// Types of the final output <k3,v3>
job.setOutputKeyClass(LongWritable.class);
job.setOutputValueClass(LongWritable.class);

FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
// Output format class used to write the output file
job.setOutputFormatClass(TextOutputFormat.class);

// Submit the job to the JobTracker and wait for it to finish
job.waitForCompletion(true);
}

static class MyMapper extends Mapper<LongWritable,Text, newK2,LongWritable>{
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] splied = value.toString().split(" ");
newK2 k2 = new newK2(Long.parseLong(splied[0]),Long.parseLong(splied[1]));
final LongWritable v2 = new LongWritable(Long.parseLong(splied[1]));
context.write(k2, v2);
}
}

static class MyReducer extends Reducer<newK2, LongWritable, LongWritable, LongWritable>{
@Override
protected void reduce(ZiDingYi.newK2 key, Iterable<LongWritable> value, Context context) throws IOException, InterruptedException {
context.write(new LongWritable(key.first), new LongWritable(key.second));
}
}

static class newK2 implements WritableComparable<newK2>{
Long first;
Long second;
public newK2(long first, long second) {
this.first = first;
this.second = second;
}

public newK2() {
}  // This no-argument constructor cannot be removed; without it the job fails with: Caused by: java.lang.RuntimeException: java.lang.NoSuchMethodException: ZiDingYi$newK2.<init>()

@Override
public void readFields(DataInput input) throws IOException {
this.first = input.readLong();
this.second = input.readLong();
}

@Override
public void write(DataOutput out) throws IOException {
out.writeLong(first);
out.writeLong(second);
}
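// Ascending by the first column; when the first columns are equal, ascending by the second column

@Override
public int compareTo(newK2 o) {
// Long.compare avoids the overflow that casting a long difference to int can cause
int byFirst = Long.compare(this.first, o.first);
if(byFirst!=0){
return byFirst;
}
return Long.compare(this.second, o.second);
}

@Override
public int hashCode() {
return this.first.hashCode()+this.second.hashCode();
}

@Override
public boolean equals(Object obj) {
if(!(obj instanceof newK2)){
return false;
}
newK2 k2 = (newK2)obj;
// Compare the Long values with equals(); == would compare object references
return this.first.equals(k2.first) && this.second.equals(k2.second);
}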
}
}

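Note: the first and second fields of the key class (newK2 in this code) must be declared as Long rather than long; otherwise this.first.hashCode() does not compile. Any class that implements WritableComparable can be used as a sort key, so more complex data can be sorted the same way: wrap it in a class that implements WritableComparable and use that class as the key.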
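The key class also has to survive serialization: Hadoop creates an empty key instance reflectively and then fills it through readFields, which is why the no-argument constructor is needed and why write and readFields must handle the fields in the same order. The following sketch imitates that round trip with plain java.io streams. `RoundTripSketch` and `PairKey` are hypothetical names, and the class deliberately does not implement the real Hadoop interfaces.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical sketch of the write/readFields round trip of a composite key.
public class RoundTripSketch {

    static class PairKey {
        long first;
        long second;

        PairKey() {
            // Needed so an empty instance can be created before readFields runs
        }

        PairKey(long first, long second) {
            this.first = first;
            this.second = second;
        }

        void write(DataOutput out) throws IOException {
            out.writeLong(first);
            out.writeLong(second);
        }

        // Must read the fields in the same order that write emits them
        void readFields(DataInput in) throws IOException {
            first = in.readLong();
            second = in.readLong();
        }
    }

    // Serializes the key to bytes, then rebuilds it from a reflectively created instance.
    static PairKey roundTrip(PairKey original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            original.write(new DataOutputStream(bytes));
            PairKey copy = PairKey.class.getDeclaredConstructor().newInstance();
            copy.readFields(new DataInputStream(
                    new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (Exception e) {
            throw new IllegalStateException("round trip failed", e);
        }
    }

    public static void main(String[] args) {
        PairKey copy = roundTrip(new PairKey(7, 3));
        System.out.println(copy.first + "\t" + copy.second);
    }
}
```

Removing the no-argument constructor from `PairKey` makes the reflective creation step fail here with a NoSuchMethodException, which resembles the error quoted in the constructor comment of newK2.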

After running the program, check the result:

[hadoop@h71 q1]$ hadoop fs -cat /out/part-r-00000

1       7

5       6

5       9

7       1

7       3

7       5