您的位置:首页 > 其它

MapReduce实现排序功能

2014-06-16 13:38 260 查看
期间遇到了无法将value的值转成int型的问题,我采用try catch解决

str2 2

str1 1

str3 3

str1 4

str4 7

str2 5

str3 9

用的\t隔开,得到结果

str1 1,4

str2 2,5

str3 3,9

str4 7

我这里map,reduce都是单独出来的类,用了自己定义的key

package com.kane.mr;

import java.io.DataInput;

import java.io.DataOutput;

import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

import com.j_spaces.obf.fi;

//str2 2

//str1 1

//str3 3

//str1 4

//str4 7

//str2 5

//str3 9

/**
 * Composite map-output key made of a string part and an integer part;
 * for example the input record {@code str1 1} is carried as {@code ("str1", 1)}.
 *
 * <p>The key is serialized as the string (via {@code writeUTF}) followed by the
 * int (via {@code writeInt}) and is read back in the same order.
 *
 * <p>Natural ordering is ascending by {@code firstKey}, then ascending by
 * {@code secondKey}. {@code firstKey} must be set before the key is compared,
 * because {@link #compareTo(IntPair)} dereferences it.
 */
public class IntPair implements WritableComparable<IntPair> {

    private String firstKey; // string part, e.g. "str1"

    private int secondKey; // integer part, e.g. 1

    public String getFirstKey() {
        return firstKey;
    }

    public void setFirstKey(String firstKey) {
        this.firstKey = firstKey;
    }

    public int getSecondKey() {
        return secondKey;
    }

    public void setSecondKey(int secondKey) {
        this.secondKey = secondKey;
    }

    /**
     * Writes the string part followed by the integer part.
     *
     * @param out sink to serialize this key to
     * @throws IOException if the underlying stream fails
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(firstKey);
        out.writeInt(secondKey);
    }

    /**
     * Restores both parts in the order {@link #write(DataOutput)} emitted them.
     *
     * @param in source to deserialize this key from
     * @throws IOException if the underlying stream fails
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        firstKey = in.readUTF();
        secondKey = in.readInt();
    }

    /**
     * Orders keys ascending by string part, breaking ties ascending by integer part.
     *
     * <p>The comparison is made from {@code this} towards {@code o} (the usual
     * {@code compareTo} direction), and the integer tie-break uses
     * {@link Integer#compare(int, int)} instead of subtraction so it cannot overflow.
     *
     * @param o the key to compare against
     * @return negative, zero or positive when this key sorts before, equal to or after {@code o}
     */
    @Override
    public int compareTo(IntPair o) {
        int byFirst = this.getFirstKey().compareTo(o.getFirstKey());
        if (byFirst != 0) {
            return byFirst;
        }
        return Integer.compare(this.getSecondKey(), o.getSecondKey());
    }

    /** Two keys are equal when both the string part and the integer part match. */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof IntPair)) {
            return false;
        }
        IntPair other = (IntPair) obj;
        boolean sameFirst = (firstKey == null)
                ? other.firstKey == null
                : firstKey.equals(other.firstKey);
        return sameFirst && secondKey == other.secondKey;
    }

    /** Hash built from both parts so that equal keys share a hash code. */
    @Override
    public int hashCode() {
        int firstHash = (firstKey == null) ? 0 : firstKey.hashCode();
        return 31 * firstHash + secondKey;
    }

}

package com.kane.mr;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

/**
 * Maps each input record to an {@link IntPair} key holding the record's key text
 * and its integer value, and emits the same integer again as the map output value.
 *
 * <p>For the record {@code str2 2} this emits key {@code ("str2", 2)} with value {@code 2}.
 */
public class SortMapper extends Mapper<Object, Text, IntPair, IntWritable> {

    // Instance-level objects that map() overwrites and emits for every record.
    public IntPair intPair = new IntPair();

    public IntWritable intWritable = new IntWritable(0);

    /**
     * Parses {@code value} as an integer and emits {@code (IntPair(key, n), n)}.
     *
     * <p>Surrounding whitespace (including a trailing {@code '\r'}) is trimmed before
     * parsing. A value that still is not a valid integer is counted under the
     * {@code SortMapper / MALFORMED_RECORDS} counter and skipped, rather than being
     * replaced by a made-up number that would silently corrupt the output.
     *
     * @param key     record key; its {@code toString()} becomes the string part of the output key
     * @param value   record value expected to hold a decimal integer
     * @param context task context used to emit output and report counters
     * @throws IOException          if writing the output fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        int intValue;
        try {
            intValue = Integer.parseInt(value.toString().trim());
        } catch (NumberFormatException e) {
            // Not a number: record the fact and drop the record instead of inventing a value.
            context.getCounter("SortMapper", "MALFORMED_RECORDS").increment(1);
            return;
        }

        intPair.setFirstKey(key.toString());
        intPair.setSecondKey(intValue);
        intWritable.set(intValue);
        context.write(intPair, intWritable); // e.g. key ("str2", 2), value 2
    }

}

package com.kane.mr;

import java.io.IOException;

import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

/**
 * Joins all integer values of one key group into a single comma-separated string
 * and writes it under the group's string key, e.g. {@code str1 -> "1,4"}.
 */
public class SortReducer extends Reducer<IntPair, IntWritable, Text, Text> {

    /**
     * Concatenates {@code values} in iteration order, separated by commas and
     * without a trailing comma, then emits {@code (key.getFirstKey(), joined)}.
     *
     * @param key     composite key of the group; only its string part is written
     * @param values  integer values belonging to the group
     * @param context task context used to emit the output pair
     * @throws IOException          if writing the output fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(IntPair key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        StringBuilder joined = new StringBuilder();
        for (IntWritable current : values) {
            // Put the separator before every element except the first,
            // so the result never ends with a dangling comma.
            if (joined.length() > 0) {
                joined.append(',');
            }
            joined.append(current.get());
        }
        context.write(new Text(key.getFirstKey()), new Text(joined.toString()));
    }

}

package com.kane.mr;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.mapreduce.Partitioner;

/**
 * Partitions map output by the string part of the {@link IntPair} key, so every
 * record with the same {@code firstKey} goes to the same partition.
 */
public class PartionTest extends Partitioner<IntPair, IntWritable> {

    /**
     * Computes the partition index from the hash of {@code key.getFirstKey()}.
     *
     * <p>The hash is first masked with {@link Integer#MAX_VALUE} to clear the sign
     * bit and only then reduced modulo {@code numPartitions}. The parentheses are
     * required: {@code %} binds tighter than {@code &}, so without them the
     * expression would be {@code hash & (MAX_VALUE % numPartitions)}, which leaves
     * some partitions unused.
     *
     * @param key           composite key; only its string part is hashed
     * @param value         map output value (not used)
     * @param numPartitions number of partitions, i.e. number of reduce tasks
     * @return an index in {@code [0, numPartitions)}
     */
    @Override
    public int getPartition(IntPair key, IntWritable value, int numPartitions) {
        return (key.getFirstKey().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

}

package com.kane.mr;

import org.apache.hadoop.io.WritableComparable;

import org.apache.hadoop.io.WritableComparator;

/**
 * Comparator over {@link IntPair} keys that looks only at the string part.
 * Two keys with the same {@code firstKey} compare as equal regardless of their
 * integer part; the driver in this listing installs it as the grouping comparator.
 */
public class TextComparator extends WritableComparator {

    /** Registers {@link IntPair} as the key class handled by this comparator. */
    public TextComparator() {
        super(IntPair.class, true);
    }

    /**
     * Compares two {@link IntPair} keys by {@code firstKey} in natural string order.
     *
     * @param a left key, must be an {@link IntPair}
     * @param b right key, must be an {@link IntPair}
     * @return the result of {@code a.firstKey.compareTo(b.firstKey)}
     */
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        String left = ((IntPair) a).getFirstKey();
        String right = ((IntPair) b).getFirstKey();
        return left.compareTo(right);
    }

}

package com.kane.mr;

import org.apache.hadoop.io.WritableComparable;

import org.apache.hadoop.io.WritableComparator;

/**
 * Comparator over {@link IntPair} keys: ascending by string part, then ascending
 * by integer part. The driver in this listing installs it as the sort comparator.
 */
@SuppressWarnings("rawtypes")
public class TextIntCompartor extends WritableComparator {

    /** Registers {@link IntPair} as the key class handled by this comparator. */
    protected TextIntCompartor() {
        super(IntPair.class, true);
    }

    /**
     * Compares two {@link IntPair} keys by {@code firstKey}, breaking ties by
     * {@code secondKey}.
     *
     * <p>The tie-break uses {@link Integer#compare(int, int)} rather than
     * subtraction: {@code x - y} overflows when the operands have opposite signs
     * and large magnitude, which would report the wrong order.
     *
     * @param a left key, must be an {@link IntPair}
     * @param b right key, must be an {@link IntPair}
     * @return negative, zero or positive when {@code a} sorts before, equal to or after {@code b}
     */
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        IntPair o1 = (IntPair) a;
        IntPair o2 = (IntPair) b;

        int first = o1.getFirstKey().compareTo(o2.getFirstKey());
        if (first != 0) {
            return first;
        }
        return Integer.compare(o1.getSecondKey(), o2.getSecondKey());
    }

}

package com.kane.mr;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.util.GenericOptionsParser;

/**
 * Driver for the "Sort" job: wires the mapper, reducer, partitioner and the two
 * comparators together and runs the job on the given input and output paths.
 */
public class SortMain {

    /**
     * Configures and submits the job, then exits the JVM.
     *
     * <p>Exit status is 2 when the argument count is wrong, 0 when the job
     * completes successfully and 1 otherwise.
     *
     * @param args generic Hadoop options followed by {@code <in> <out>}
     * @throws Exception if job setup or execution fails
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            // The message names this job; it previously said "wordcount" (copy-paste leftover).
            System.err.println("Usage: sort <in> <out>");
            System.exit(2);
        }

        Job job = new Job(conf, "Sort");
        job.setJarByClass(SortMain.class);

        // Input records are "key<TAB>value" lines.
        job.setInputFormatClass(KeyValueTextInputFormat.class);

        job.setMapperClass(SortMapper.class);
        job.setReducerClass(SortReducer.class);

        job.setMapOutputKeyClass(IntPair.class);
        job.setMapOutputValueClass(IntWritable.class);

        // Sort on (firstKey, secondKey) but group on firstKey only.
        job.setSortComparatorClass(TextIntCompartor.class);
        job.setGroupingComparatorClass(TextComparator.class);
        job.setPartitionerClass(PartionTest.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // First remaining command-line argument: input path.
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        // Second remaining command-line argument: output path.
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

}

导出jar包放到hadoop下,然后将sort.txt放入到hdfs中,然后用hadoop jar KaneTest/sort.jar com.kane.mr.SortMain /kane/sort.txt /kane/output命令运行

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: