A TopN Solution in MapReduce/Hadoop for Non-Unique Keys
2017-11-02 16:45
I. The MapReduce/Hadoop TopN solution for unique keys (see the earlier post)
II. For non-unique keys, that is, when the same key may appear multiple times in the input file, the approach is: first convert the non-unique keys into unique keys by running a MapReduce job that merges all entries sharing the same key, then apply the unique-key TopN solution from part I. The aggregation job looks like this:
package topN_hadoop1;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Emits (url, frequency) pairs; duplicate urls are left for the
 * combiner/reducer to merge.
 */
public class AggregateByKeyMapper extends Mapper<Object, Text, Text, IntWritable> {

    private Text K2 = new Text();
    private IntWritable V2 = new IntWritable();

    @Override
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        String valueAsString = value.toString().trim();
        String[] tokens = valueAsString.split(",");
        if (tokens.length != 2) {
            // skip malformed lines
            return;
        }
        String url = tokens[0];
        int frequency = Integer.parseInt(tokens[1]);
        K2.set(url);
        V2.set(frequency);
        context.write(K2, V2);
    }
}
package topN_hadoop1;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Sums the frequencies for each key, so every url appears exactly once
 * in the output; summing is associative, so this class can also serve
 * as the combiner.
 */
public class AggregateByKeyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        context.write(key, new IntWritable(sum));
    }
}
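The combined effect of this Mapper and Reducer can be checked with a small local sketch, with no Hadoop involved. The class name `AggregateSketch` and the sample lines are made up for illustration; only the parse-then-sum logic mirrors the job above:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class AggregateSketch {

    // Sum frequencies per key, mirroring map()'s parsing and reduce()'s summing.
    public static Map<String, Integer> aggregate(String[] lines) {
        Map<String, Integer> sums = new LinkedHashMap<>();
        for (String line : lines) {
            String[] tokens = line.trim().split(",");
            if (tokens.length != 2) {
                continue; // same malformed-line guard as the Mapper
            }
            String url = tokens[0];
            int frequency = Integer.parseInt(tokens[1]);
            sums.merge(url, frequency, Integer::sum); // same sum as the Reducer
        }
        return sums;
    }

    public static void main(String[] args) {
        String[] lines = { "a.com,2", "b.com,5", "a.com,3" };
        // a.com appears twice and is merged into a single key
        System.out.println(aggregate(lines)); // prints {a.com=5, b.com=5}
    }
}
```

After this stage every url is a unique key, which is exactly the precondition of the part I solution.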
package topN_hadoop1;

import org.apache.log4j.Logger;

import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class AggregateByKeyDriver extends Configured implements Tool {

    private static Logger THE_LOGGER = Logger.getLogger(AggregateByKeyDriver.class);

    public int run(String[] args) throws Exception {
        Job job = new Job(getConf());
        HadoopUtil.addJarsToDistributedCache(job, "/lib/");
        job.setJobName("AggregateByKeyDriver");
        job.setInputFormatClass(TextInputFormat.class);
        // SequenceFile output can be fed directly into the second (TopN) job
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setMapperClass(AggregateByKeyMapper.class);
        job.setReducerClass(AggregateByKeyReducer.class);
        // the reducer doubles as a combiner because summing is associative
        job.setCombinerClass(AggregateByKeyReducer.class);

        // args[0] = input directory
        // args[1] = output directory
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean status = job.waitForCompletion(true);
        THE_LOGGER.info("run(): status=" + status);
        return status ? 0 : 1;
    }

    /**
     * The main driver for the "Aggregate By Key" program.
     * Invoke this method to submit the map/reduce job.
     * @throws Exception when there are communication problems with the job tracker.
     */
    public static void main(String[] args) throws Exception {
        // Make sure there are exactly 2 parameters
        if (args.length != 2) {
            THE_LOGGER.warn("usage: AggregateByKeyDriver <input> <output>");
            System.exit(1);
        }
        THE_LOGGER.info("inputDir=" + args[0]);
        THE_LOGGER.info("outputDir=" + args[1]);
        int returnStatus = ToolRunner.run(new AggregateByKeyDriver(), args);
        System.exit(returnStatus);
    }
}
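Once the keys are unique, the second job applies the part I selection: keep a bounded sorted structure of the N largest (frequency, url) pairs. A rough local sketch of that selection step follows (plain Java, not the Hadoop job; `TopNSketch` and the sample data are invented for illustration, and note that a TreeMap keyed by frequency silently overwrites ties, a limitation the full solution must handle):

```java
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

public class TopNSketch {

    // Keep only the N entries with the largest frequencies.
    public static SortedMap<Integer, String> topN(Map<String, Integer> counts, int n) {
        TreeMap<Integer, String> top = new TreeMap<>();
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            top.put(e.getValue(), e.getKey());
            if (top.size() > n) {
                top.remove(top.firstKey()); // evict the smallest frequency
            }
        }
        return top;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = Map.of("a.com", 5, "b.com", 9, "c.com", 1);
        System.out.println(topN(counts, 2)); // prints {5=a.com, 9=b.com}
    }
}
```

In the distributed version each mapper emits its local top N, and a single reducer merges those partial lists the same way to produce the final top N.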