java编写的hadoop wordcount,单MR任务实现按照词频排序输出结果
2016-10-11 11:18
776 查看
由于之前写MR任务都是采用Streamming方式,以python语言编写,因此对于整个MR的过程细节要求不高,也不需要理解。但是java作为hadoop的原生语言,无论是性能效率、规范性、输出工具的易用性和完整性上,都是python无法比拟的,因此学习如何采用java进行编写MR任务。
第一个WordCount任务就遇到了麻烦,单纯的进行词频统计是非常简单的,但是如果要将最后的结果按照频次排序倒序输出,就比较麻烦了。查阅资料发现解决方案是,再写一个MR任务,利用hadoop自带的key-value互换Map(Invert),进行键值对互换,再根据hadoop的map阶段根据键值排序的机制,进行排序,当然,要修改比较器为倒序排序。这样的方法是可行的,但是需要2个MR任务,虽然中间结果可以使用Sequcen方式存储与读取,但还是略微繁杂,因此考虑以将数据全部读入一个reducer,处理时利用hashtable进行存储,在reducer清理阶段,进行排序,拿到FileSystem将结果写入HDFS。
整体上看,这个程序是很简洁的,但是也有缺点,就是在数据量非常大的时候,1个reducer进行排序会对内存和cpu造成很大的压力。
最后,记录一个未解决的疑问:
在关闭打开文件读取流时 FileTool.close(bw);
原来是FileTool.close(bw,hdfs);
但是会报错,报错内容为map阶段报 FileSystem Closed这个错误。
可我关闭是在reduce的清理阶段,即close()时进行的,不应该报这个错误。
那么,我猜想,是reduce清理阶段关闭文件系统时, 整个任务还未结束, map阶段可能也需要再次使用文件系统进行一些操作。
阅读其他源码, 对于主程序里打开的文件系统 FileSystem , 最后也没有关闭的操作,难道MR任务完成后会自动关闭文件系统么?不需要程序员自动关闭吗?
如有大神清晰理解我的问题,请评论留言指教,谢谢!
第一个WordCount任务就遇到了麻烦,单纯的进行词频统计是非常简单的,但是如果要将最后的结果按照频次排序倒序输出,就比较麻烦了。查阅资料发现解决方案是,再写一个MR任务,利用hadoop自带的key-value互换Map(Invert),进行键值对互换,再根据hadoop的map阶段根据键值排序的机制,进行排序,当然,要修改比较器为倒序排序。这样的方法是可行的,但是需要2个MR任务,虽然中间结果可以使用Sequcen方式存储与读取,但还是略微繁杂,因此考虑以将数据全部读入一个reducer,处理时利用hashtable进行存储,在reducer清理阶段,进行排序,拿到FileSystem将结果写入HDFS。
整体上看,这个程序是很简洁的,但是也有缺点,就是在数据量非常大的时候,1个reducer进行排序会对内存和cpu造成很大的压力。
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.*; import java.io.BufferedWriter; import java.io.IOException; import java.io.OutputStreamWriter; import java.util.*; /** * Created by yuanye8 on 2016/10/10. */ public class WordCountExample { public static class WordCountMapper implements Mapper<LongWritable, Text, Text, IntWritable> { private Text key = new Text(); private IntWritable value = new IntWritable(); public void configure(JobConf jobConf) { } public void map(LongWritable longWritable, Text text, OutputCollector<Text, IntWritable> outputCollector, Reporter reporter) throws IOException { String[] words = text.toString().trim().split(" "); key = new Text(); for (String word : words) { key.set(word); value.set(1); outputCollector.collect(key, value); } } public void close() throws IOException { } } public static class WordCountReducer implements Reducer<Text, IntWritable, Text, IntWritable> { private IntWritable value = new IntWritable(); private Hashtable<String, Integer> hashtable = new Hashtable<String, Integer>(); private String output_path = null; public void configure(JobConf jobConf) { this.output_path = jobConf.get("output_path"); } public void reduce(Text text, Iterator<IntWritable> iterator, OutputCollector<Text, IntWritable> outputCollector, Reporter reporter) throws IOException { int sum = 0; while (iterator.hasNext()) { int tmp = iterator.next().get(); sum += tmp; } value.set(sum); hashtable.put(text.toString(), sum); outputCollector.collect(text, value); } public void close() throws IOException { outputSortResult(); } public void outputSortResult() { FileSystem hdfs = null; BufferedWriter bw = null; try { hdfs = FileSystem.get(new JobConf()); bw = new BufferedWriter( new OutputStreamWriter( hdfs.create(new Path(output_path + "_sort"), true))); Set<Map.Entry<String, Integer>> set = this.hashtable.entrySet(); Map.Entry[] entries = set.toArray(new Map.Entry[set.size()]); Arrays.sort(entries, new Comparator<Map.Entry>() { public int compare(Map.Entry o1, Map.Entry o2) { int v1 = (Integer) o1.getValue(); int v2 = (Integer) o2.getValue(); return v2 - v1; } }); for (Map.Entry<String, Integer> entry : entries) { Strin 4000 g key = entry.getKey(); int value = entry.getValue(); bw.write(key + "\t" + value + "\n"); } bw.flush(); } catch (IOException e) { e.printStackTrace(); } finally { FileTool.close(bw); } } } public static void main(String[] args) throws IOException { if (args.length < 2) { System.out.println("Usage : WordCountExample <input> <output>"); System.exit(-1); } Path input_path = new Path(args[0]); Path output_path = new Path(args[1]); JobConf conf = new JobConf(); conf.set("output_path", args[1]); conf.setJobName("WordCount"); conf.setJarByClass(WordCountExample.class); conf.setMapperClass(WordCountMapper.class); conf.setReducerClass(WordCountReducer.class); conf.setInputFormat(TextInputFormat.class); conf.setOutputFormat(TextOutputFormat.class); TextInputFormat.addInputPath(conf, input_path); FileOutputFormat.setOutputPath(conf, output_path); conf.setCombinerClass(WordCountReducer.class); conf.setMapOutputKeyClass(Text.class); conf.setMapOutputValueClass(IntWritable.class); conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(IntWritable.class); conf.setNumReduceTasks(1); RunningJob runningJob = JobClient.runJob(conf); runningJob.waitForCompletion(); } }
最后,记录一个未解决的疑问:
在关闭打开文件读取流时 FileTool.close(bw);
原来是FileTool.close(bw,hdfs);
但是会报错,报错内容为map阶段报 FileSystem Closed这个错误。
可我关闭是在reduce的清理阶段,即close()时进行的,不应该报这个错误。
那么,我猜想,是reduce清理阶段关闭文件系统时, 整个任务还未结束, map阶段可能也需要再次使用文件系统进行一些操作。
阅读其他源码, 对于主程序里打开的文件系统 FileSystem , 最后也没有关闭的操作,难道MR任务完成后会自动关闭文件系统么?不需要程序员自动关闭吗?
如有大神清晰理解我的问题,请评论留言指教,谢谢!
相关文章推荐
- java8实现spark wordcount并且按照value排序输出
- spark helloworld (wordCount实现并按照词频排序)
- spark版WordCount(Java),将输出结果排序,并去除输出文件中的括号。
- MapReduce-WordCount实现按照value降序排序、字符小写、识别不同标点
- hadoop的WordCount按照value降序排序
- 插入排序(Java实现)---从控制台输入不定长数组,并输出排序结果
- 编写一个多线程函数实现对数组排序,要求: 1.至少用两个线程 2.数组的元素值可以事先定义好,或者可以从键盘输入(增加一个线程)。 3.用一个线程对数组排序,用另一个线程输出排序结果。 4.保证先排好序,再输出。
- Hadoop WordCount改进实现正确识别单词以及词频降序排序
- 【Hadoop】编写和运行WordCount.java
- Java实现Hadoop下词配对Wordcount计数
- 编写一个void sort(int*x,int n)实现将x数组中的n个数据从大到小排序。n及数组元素在主函数中输入。将结果显示在屏幕上并输出到文件
- hadoop中文wordcount无结果输出
- Java实现词频统计(Wordcount)-Map或Hashtable的value排序
- java中实现HashMap中的按照key的字典顺序排序输出
- java中实现HashMap中的按照key的字典顺序排序输出
- java中实现HashMap中的按照key的字典顺序排序输出
- Hadoop WordCount改进实现正确识别单词以及词频降序排序
- 编写一个void sort(int*x,int n)实现将x数组中的n个数据从大到小排序。n及数组元素在主函数中输入。将结果显示在屏幕上并输出到文件
- java中给map按照值value排序输出,用Comparator实现
- 解决Eclipse中运行WordCount出现 java.lang.ClassNotFoundException: org.apache.hadoop.examples.WordCount$TokenizerMapper问题【转】