Using custom sorting and custom data types in Hadoop (setSortComparatorClass and setGroupingComparatorClass)

The example below implements a secondary sort. A composite key type, TextInt, carries a text key together with an int value; a custom sort comparator orders the map output by the text key and then by the value, while a custom grouping comparator groups records by the text key alone, so each reduce() call receives one key's values already in sorted order.

1    Mapper

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// With KeyValueTextInputFormat, the input key is the text before the first tab
// on each line and the input value is the text after it.
public class SortMapper extends Mapper<Object, Text, TextInt, IntWritable> {

    private final TextInt textInt = new TextInt();
    private final IntWritable intWritable = new IntWritable(0);

    @Override
    protected void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        int i = Integer.parseInt(value.toString());

        // Composite key: the original text key plus the numeric value,
        // so the sort comparator can order on both fields.
        textInt.setStr(key.toString());
        textInt.setValue(i);
        intWritable.set(i);
        context.write(textInt, intWritable);
    }
}

2    Partitioner

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

// Partition on the text part of the key only, so every record with the same
// text key goes to the same reducer regardless of its numeric value.
public class SortPartitioner extends Partitioner<TextInt, IntWritable> {

    @Override
    public int getPartition(TextInt textInt, IntWritable value, int numReducers) {
        // The parentheses are required: % binds tighter than &.
        return (textInt.getStr().hashCode() & Integer.MAX_VALUE) % numReducers;
    }
}

3    Reducer

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SortReducer extends Reducer<TextInt, IntWritable, Text, Text> {

    @Override
    protected void reduce(TextInt textInt, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {

        // Thanks to the grouping comparator, one reduce call receives every value
        // for a given text key, already ordered by the sort comparator.
        StringBuilder stringCombine = new StringBuilder();
        for (IntWritable value : values) {
            stringCombine.append(value.get()).append(",");
        }
        int length = stringCombine.length();
        if (length > 0) {
            stringCombine.deleteCharAt(length - 1);   // drop the trailing comma
        }
        context.write(new Text(textInt.getStr()), new Text(stringCombine.toString()));
    }
}

4    Custom data type TextInt

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

// Composite key: a text key plus an int value, so records can be sorted on
// both fields while being grouped on the text key alone.
public class TextInt implements WritableComparable<TextInt> {

    private String str;
    private int value;

    public String getStr() {
        return str;
    }
    public void setStr(String str) {
        this.str = str;
    }
    public int getValue() {
        return value;
    }
    public void setValue(int value) {
        this.value = value;
    }
    @Override
    public void readFields(DataInput in) throws IOException {
        str = in.readUTF();
        value = in.readInt();
    }
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(str);
        out.writeInt(value);
    }
    @Override
    public int compareTo(TextInt o) {
        // Fallback ordering (descending by str); the job replaces it with
        // TextIntComparator via setSortComparatorClass.
        return o.getStr().compareTo(this.getStr());
    }
}
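
Because TextInt is used as a map output key, it is also common practice to override equals and hashCode on Writable key types. That is not strictly required in this example, since SortPartitioner partitions on getStr() rather than on the key's own hashCode, but as a minimal sketch (an addition for illustration, not part of the original post), the two methods below could be added inside the TextInt class shown above:

// Would go inside TextInt; keeps equals/hashCode consistent with both fields.
@Override
public boolean equals(Object obj) {
    if (!(obj instanceof TextInt)) {
        return false;
    }
    TextInt other = (TextInt) obj;
    return value == other.value
            && (str == null ? other.str == null : str.equals(other.str));
}

@Override
public int hashCode() {
    return (str == null ? 0 : str.hashCode()) * 31 + value;
}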

5    Custom sort comparator (map side)

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// Sort comparator: orders map output by the text key first and, within the
// same key, by the numeric value in ascending order.
public class TextIntComparator extends WritableComparator {

    public TextIntComparator() {
        super(TextInt.class, true);   // true: instantiate keys for comparison
    }

    @Override
    @SuppressWarnings("all")
    public int compare(WritableComparable a, WritableComparable b) {
        TextInt o1 = (TextInt) a;
        TextInt o2 = (TextInt) b;

        if (!o1.getStr().equals(o2.getStr())) {
            return o1.getStr().compareTo(o2.getStr());
        }
        // Assumes the values are small enough that the subtraction cannot overflow.
        return o1.getValue() - o2.getValue();
    }
}

6    Custom grouping comparator (reduce side)

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// Grouping comparator: two keys with the same text part are treated as equal,
// so all of their values are passed to a single reduce() call.
public class TextComparator extends WritableComparator {

    public TextComparator() {
        super(TextInt.class, true);
    }

    @Override
    @SuppressWarnings("all")
    public int compare(WritableComparable a, WritableComparable b) {
        TextInt o1 = (TextInt) a;
        TextInt o2 = (TextInt) b;
        return o1.getStr().compareTo(o2.getStr());
    }
}


7    Driver

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class SortMain {
    public static void main(String[] args) throws IOException {

        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: sort <in> <out>");
            System.exit(2);
        }
        Job job = new Job(conf, "sort");
        job.setJarByClass(SortMain.class);
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        job.setMapperClass(SortMapper.class);
        job.setPartitionerClass(SortPartitioner.class);
        job.setMapOutputKeyClass(TextInt.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setReducerClass(SortReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // Sort comparator: orders map output by (str, value).
        job.setSortComparatorClass(TextIntComparator.class);
        // Grouping comparator: groups reduce input by str only.
        job.setGroupingComparatorClass(TextComparator.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        try {
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }
}

8    Running the job
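
The screenshots from the original post are not preserved. As a sketch of what a run looks like (the jar name sort.jar, the input and output paths, and the sample data below are all hypothetical; each input line is assumed to be a key and an integer separated by a tab, which is what KeyValueTextInputFormat expects by default):

hadoop jar sort.jar SortMain /sort/input /sort/output

Sample input (key, tab, value):

a	3
a	1
b	2
a	2
b	5

Expected output (part-r-00000), with each key's values joined in ascending order:

a	1,2,3
b	2,5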
Note: the driver must call setMapOutputKeyClass and setMapOutputValueClass. Without them, the framework assumes the mapper emits Text keys and Text values (the job's output types here), which does not match TextInt and IntWritable.

 
Tags: hadoop