您的位置:首页 > 其它

二次排序Mapreduce --SecondSort<转>

2014-06-27 18:24 246 查看
输入数据(每行两列整数,列之间以制表符分隔):

20 21

50 51

50 52

50 53

50 512

50 522

50 53

530 54

40 511

20 53

20 522

60 56

60 57

按第一列分组,组内按第二列升序排序(即二次排序)。每行输出为该组的第一个键后接原始输入行,结果为:

------------------------------------------------

20 21 20 21

20 21 20 53

20 21 20 522

------------------------------------------------

40 511 40 511

------------------------------------------------

50 51 50 51

50 51 50 52

50 51 50 53

50 51 50 53

50 51 50 512

50 51 50 522

------------------------------------------------

60 56 60 56

60 56 60 57

------------------------------------------------

530 54 530 54

代码如下:

import java.io.DataInput;

import java.io.DataOutput;

import java.io.IOException;

import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.conf.Configured;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.RawComparator;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.io.WritableComparable;

import org.apache.hadoop.io.WritableComparator;

import org.apache.hadoop.mapred.FileInputFormat;

import org.apache.hadoop.mapred.FileOutputFormat;

import org.apache.hadoop.mapred.JobClient;

import org.apache.hadoop.mapred.JobConf;

import org.apache.hadoop.mapred.MapReduceBase;

import org.apache.hadoop.mapred.Mapper;

import org.apache.hadoop.mapred.OutputCollector;

import org.apache.hadoop.mapred.Partitioner;

import org.apache.hadoop.mapred.Reducer;

import org.apache.hadoop.mapred.Reporter;

import org.apache.hadoop.mapred.TextInputFormat;

import org.apache.hadoop.mapred.TextOutputFormat;

import org.apache.hadoop.util.Tool;

import org.apache.hadoop.util.ToolRunner;

public class SecondSort extends Configured implements Tool{

/**
 * Composite key made of two ints, used as the map-output key of the job.
 *
 * <p>Ordering is by {@code first}, then by {@code second}. The serialized form
 * is {@code first} followed by {@code second}, each written with
 * {@link DataOutput#writeInt(int)}.
 */
public static class IntPair implements WritableComparable<IntPair> {

    int first;

    int second;

    /** Creates a pair with both components set to zero. */
    public IntPair() {
        this.first = 0;
        this.second = 0;
    }

    /**
     * Sets both components at once.
     *
     * @param first  new value of the first component
     * @param second new value of the second component
     */
    public void setIntPair(int first, int second) {
        this.first = first;
        this.second = second;
    }

    /** @return the first component */
    public int getFirst() {
        return first;
    }

    /** @param first new value of the first component */
    public void setFirst(int first) {
        this.first = first;
    }

    /** @return the second component */
    public int getSecond() {
        return second;
    }

    /** @param second new value of the second component */
    public void setSecond(int second) {
        this.second = second;
    }

    /**
     * Serializes the pair as two consecutive ints: first, then second.
     *
     * @param out sink to write to
     * @throws IOException if the underlying stream fails
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(this.first);
        out.writeInt(this.second);
    }

    /**
     * Restores the pair from two consecutive ints, in the order used by
     * {@link #write(DataOutput)}.
     *
     * @param in source to read from
     * @throws IOException if the underlying stream fails
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        first = in.readInt();
        second = in.readInt();
    }

    /** @return the two components separated by a single tab character */
    @Override
    public String toString() {
        return this.getFirst() + "\t" + this.getSecond();
    }

    /**
     * Orders pairs by the first component and breaks ties with the second.
     *
     * <p>Uses {@link Integer#compare(int, int)} instead of subtraction, because a
     * difference of two ints can overflow and report the wrong sign.
     *
     * @param arg0 pair to compare against
     * @return negative, zero or positive as this pair sorts before, equal to or after {@code arg0}
     */
    @Override
    public int compareTo(IntPair arg0) {
        int cmp = Integer.compare(this.first, arg0.getFirst());
        if (cmp != 0) {
            return cmp;
        }
        return Integer.compare(this.second, arg0.getSecond());
    }

    /**
     * @param o object to compare against
     * @return {@code true} only when {@code o} is an {@code IntPair} with the same two components
     */
    @Override
    public boolean equals(Object o) {
        if (o instanceof IntPair) {
            IntPair r = (IntPair) o;
            return this.first == r.getFirst() && this.second == r.getSecond();
        } else {
            return false;
        }
    }

    /**
     * Hash code derived from both components, so that equal pairs hash equally
     * as required by the {@link Object#hashCode()} contract.
     *
     * @return hash of the two components
     */
    @Override
    public int hashCode() {
        return 31 * first + second;
    }
}



/**
 * Partitions map output by the first component of the key only, so every pair
 * that shares a first component is assigned the same partition number.
 */
public static class FirstPartitioner implements Partitioner<IntPair, Text> {

    /**
     * No job-level settings are read by this partitioner.
     *
     * @param job job configuration (unused)
     */
    @Override
    public void configure(JobConf job) {
        // Intentionally empty.
    }

    /**
     * Computes the partition from {@code key.getFirst()}.
     *
     * <p>The int product {@code first * 127} may wrap around; if it wraps to
     * {@code Integer.MIN_VALUE}, {@code Math.abs(int)} would stay negative and
     * yield a negative partition index. Taking the absolute value in
     * {@code long} keeps the result in {@code [0, numPartitions)} while leaving
     * the partition of every other input unchanged.
     *
     * @param key           map-output key
     * @param value         map-output value (unused)
     * @param numPartitions number of partitions
     * @return partition index in {@code [0, numPartitions)}
     */
    @Override
    public int getPartition(IntPair key, Text value, int numPartitions) {
        // The multiplication is deliberately still done in int so existing
        // key-to-partition assignments are preserved.
        long scaled = key.getFirst() * 127;
        return (int) (Math.abs(scaled) % numPartitions);
    }
}



/**
 * Grouping comparator that treats two keys as equal when their first
 * components are equal, ignoring the second component.
 *
 * <p>Both overloads compare the first component as a signed int so they agree
 * with each other for every input.
 */
public static class FirstGroupingComparator implements RawComparator<IntPair> {

    /**
     * Compares two deserialized keys by first component.
     *
     * <p>Uses {@link Integer#compare(int, int)} rather than subtraction, which
     * can overflow and report the wrong sign.
     *
     * @param o1 left key
     * @param o2 right key
     * @return negative, zero or positive according to the first components
     */
    @Override
    public int compare(IntPair o1, IntPair o2) {
        return Integer.compare(o1.getFirst(), o2.getFirst());
    }

    /**
     * Compares two serialized keys by first component.
     *
     * <p>{@code IntPair.write} emits the first component as the leading int, so
     * it is decoded from the start offset of each buffer and the remaining
     * bytes are ignored.
     *
     * @param b1 buffer holding the left key
     * @param s1 start offset of the left key in {@code b1}
     * @param l1 length of the left key (unused)
     * @param b2 buffer holding the right key
     * @param s2 start offset of the right key in {@code b2}
     * @param l2 length of the right key (unused)
     * @return negative, zero or positive according to the first components
     */
    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        int left = WritableComparator.readInt(b1, s1);
        int right = WritableComparator.readInt(b2, s2);
        return Integer.compare(left, right);
    }
}



/**
 * Mapper that turns each tab-separated input line into an {@code IntPair} key
 * built from its first two columns, with the whole line as the value.
 */
public static class SecondSortMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, IntPair, Text> {

    // Reused across calls; refilled for every record before it is collected.
    IntPair ipKey = new IntPair();

    Text tValue = new Text();

    /**
     * Emits {@code (IntPair(col0, col1), line)} for one input line.
     *
     * @param key      input key supplied by the input format (unused)
     * @param value    one line of input; columns 0 and 1 must parse as ints
     * @param output   collector receiving the composite key and the line
     * @param reporter progress reporter (unused)
     * @throws IOException if the collector fails
     */
    @Override
    public void map(LongWritable key, Text value,
            OutputCollector<IntPair, Text> output, Reporter reporter) throws IOException {
        final String[] columns = value.toString().split("\t");

        final int firstColumn = Integer.parseInt(columns[0]);
        final int secondColumn = Integer.parseInt(columns[1]);
        ipKey.setIntPair(firstColumn, secondColumn);

        tValue.set(value);
        output.collect(ipKey, tValue);
    }
}



/**
 * Reducer that writes a separator line for each group and then one line per
 * value, each prefixed with the text of the key as it was on entry.
 */
public static class SecondSortReducer extends MapReduceBase
        implements Reducer<IntPair, Text, Text, Text> {

    Text tKey = new Text();

    Text tValue = new Text();

    Text SEPARATOR = new Text("------------------------------------------------");

    /**
     * Emits the separator, then {@code (keyText, value)} for every value.
     *
     * @param key      key of the group; its text form is captured once, up front
     * @param values   values of the group
     * @param output   collector receiving the separator and the value lines
     * @param reporter progress reporter (unused)
     * @throws IOException if the collector fails
     */
    @Override
    public void reduce(IntPair key, Iterator<Text> values,
            OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
        // Capture the key text before touching the iterator, so every line of
        // this group carries the same prefix.
        final String groupLabel = key.toString();
        tKey.set(groupLabel);

        // Group header: the separator is collected with a null value.
        output.collect(SEPARATOR, null);

        for (Iterator<Text> remaining = values; remaining.hasNext(); ) {
            Text line = remaining.next();
            tValue.set(line);
            output.collect(tKey, tValue);
        }
    }
}



/**
 * Configures and submits the secondary-sort job.
 *
 * @param args {@code args[0]} is the input path and {@code args[1]} the output
 *             path; additional arguments are ignored
 * @return {@code 0} after {@code JobClient.runJob} returns, or {@code 2} when
 *         fewer than two arguments are supplied
 * @throws Exception if job setup or execution fails
 */
@Override
public int run(String[] args) throws Exception {
    // Fail with a usage message instead of an ArrayIndexOutOfBoundsException.
    if (args == null || args.length < 2) {
        System.err.println("Usage: SecondSort <input path> <output path>");
        ToolRunner.printGenericCommandUsage(System.err);
        return 2;
    }

    Configuration conf = getConf();
    JobConf jobConf = new JobConf(conf, SecondSort.class);
    jobConf.setJobName("SecondSort");

    // Map output is (IntPair, Text); final output is (Text, Text).
    jobConf.setMapOutputKeyClass(IntPair.class);
    jobConf.setMapOutputValueClass(Text.class);
    jobConf.setOutputKeyClass(Text.class);
    jobConf.setOutputValueClass(Text.class);

    // Partition and group by the first component of the key only.
    jobConf.setPartitionerClass(FirstPartitioner.class);
    jobConf.setOutputValueGroupingComparator(FirstGroupingComparator.class);

    jobConf.setMapperClass(SecondSortMapper.class);
    jobConf.setReducerClass(SecondSortReducer.class);

    jobConf.setInputFormat(TextInputFormat.class);
    jobConf.setOutputFormat(TextOutputFormat.class);

    FileInputFormat.setInputPaths(jobConf, new Path(args[0]));
    FileOutputFormat.setOutputPath(jobConf, new Path(args[1]));

    JobClient.runJob(jobConf);
    return 0;
}

/**
 * Command-line entry point: runs the tool through {@code ToolRunner} and exits
 * the JVM with the status it returns.
 *
 * @param args command-line arguments forwarded to {@code ToolRunner.run}
 * @throws Exception if the tool fails
 */
public static void main(String[] args) throws Exception {
    final SecondSort tool = new SecondSort();
    System.exit(ToolRunner.run(tool, args));
}

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: