您的位置:首页 > 运维架构

MapReduce/Hadoop的TopN解决方案之键不唯一的情况

2017-11-02 16:45 176 查看


一、MapReduce/Hadoop的TopN解决方案之键唯一的情况(点击打开链接)

二、针对键不唯一的情况,即同一个关键字可能在文件中出现多次

解决办法:先将不唯一键转换为唯一键,即使用MapReduce合并键相同的项,再使用(一)所述的唯一键TopN方案


package topN_hadoop1;

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Mapper for the "aggregate by key" step: parses each input line of the form
 * {@code <url>,<frequency>} and emits the pair {@code (url, frequency)}.
 *
 * <p>Lines that do not split into exactly two comma-separated fields, or whose
 * second field is not a valid integer, are skipped without emitting anything.
 */
public class AggregateByKeyMapper extends
        Mapper<Object, Text, Text, IntWritable> {

    // Output holders, reused across map() calls instead of allocating per record.
    private final Text outputKey = new Text();
    private final IntWritable outputValue = new IntWritable();

    /**
     * Parses one input line and writes {@code (url, frequency)} to the context.
     *
     * @param key     the input key; not read by this method
     * @param value   one line of input text, expected as {@code <url>,<frequency>}
     * @param context the task context the output pair is written to
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if writing to the context is interrupted
     */
    @Override
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString().trim();
        String[] tokens = line.split(",");
        if (tokens.length != 2) {
            // Malformed record: wrong number of fields.
            return;
        }

        // Trim each field so that "url, 5" and "url ,5" are handled like "url,5".
        String url = tokens[0].trim();
        int frequency;
        try {
            frequency = Integer.parseInt(tokens[1].trim());
        } catch (NumberFormatException e) {
            // Non-numeric frequency: skip the record, as for any other malformed
            // line, rather than failing the whole task on one bad record.
            return;
        }

        outputKey.set(url);
        outputValue.set(frequency);
        context.write(outputKey, outputValue);
    }
}


package topN_hadoop1;

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Reducer for the "aggregate by key" step: sums all integer values received for
 * a key and emits {@code (key, sum)}.
 *
 * <p>Input and output types are identical, and the driver in this article also
 * registers this class as the job's combiner.
 */
public class AggregateByKeyReducer extends
        Reducer<Text, IntWritable, Text, IntWritable> {

    // Output holder, reused across reduce() calls instead of allocating per key.
    private final IntWritable total = new IntWritable();

    /**
     * Sums the values of one key and writes the total to the context.
     *
     * @param key     the key being aggregated
     * @param values  all values grouped under {@code key}
     * @param context the task context the output pair is written to
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if writing to the context is interrupted
     * @throws ArithmeticException  if the total does not fit in an {@code int}
     */
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Accumulate in a long so that an int overflow is detected instead of
        // silently wrapping around to a wrong (possibly negative) total.
        long sum = 0L;
        for (IntWritable value : values) {
            sum += value.get();
        }
        if (sum > Integer.MAX_VALUE || sum < Integer.MIN_VALUE) {
            throw new ArithmeticException(
                    "aggregated value " + sum + " for key '" + key
                            + "' does not fit in an int");
        }
        total.set((int) sum);
        context.write(key, total);
    }
}


package topN_hadoop1;

import org.apache.log4j.Logger;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class AggregateByKeyDriver  extends Configured implements Tool {

private static Logger THE_LOGGER = Logger.getLogger(AggregateByKeyDriver.class);

public int run(String[] args) throws Exception {
Job job = new Job(getConf());
HadoopUtil.addJarsToDistributedCache(job, "/lib/");
job.setJobName("AggregateByKeyDriver");

job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

job.setMapperClass(AggregateByKeyMapper.class);
job.setReducerClass(AggregateByKeyReducer.class);
job.setCombinerClass(AggregateByKeyReducer.class);

// args[0] = input directory
// args[1] = output directory
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

boolean status = job.waitForCompletion(true);
THE_LOGGER.info("run(): status="+status);
return status ? 0 : 1;
}

/**
* The main driver for "Aggregate By Key" program.
* Invoke this method to submit the map/reduce job.
* @throws Exception When there is communication problems with the job tracker.
*/
public static void main(String[] args) throws Exception {
// Make sure there are exactly 2 parameters
if (args.length != 2) {
THE_LOGGER.warn("usage AggregateByKeyDriver <input> <output>");
System.exit(1);
}

THE_LOGGER.info("inputDir="+args[0]);
THE_LOGGER.info("outputDir="+args[1]);
int returnStatus = ToolRunner.run(new AggregateByKeyDriver(), args);
System.exit(returnStatus);
}

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息