您的位置:首页 > 编程语言 > Java开发

java编写的hadoop wordcount,单MR任务实现按照词频排序输出结果

2016-10-11 11:18 776 查看
由于之前写MR任务都是采用Streamming方式,以python语言编写,因此对于整个MR的过程细节要求不高,也不需要理解。但是java作为hadoop的原生语言,无论是性能效率、规范性、输出工具的易用性和完整性上,都是python无法比拟的,因此学习如何采用java进行编写MR任务。

第一个WordCount任务就遇到了麻烦,单纯的进行词频统计是非常简单的,但是如果要将最后的结果按照频次排序倒序输出,就比较麻烦了。查阅资料发现解决方案是,再写一个MR任务,利用hadoop自带的key-value互换Map(Invert),进行键值对互换,再根据hadoop的map阶段根据键值排序的机制,进行排序,当然,要修改比较器为倒序排序。这样的方法是可行的,但是需要2个MR任务,虽然中间结果可以使用Sequcen方式存储与读取,但还是略微繁杂,因此考虑以将数据全部读入一个reducer,处理时利用hashtable进行存储,在reducer清理阶段,进行排序,拿到FileSystem将结果写入HDFS。

整体上看,这个程序是很简洁的,但是也有缺点,就是在数据量非常大的时候,1个reducer进行排序会对内存和cpu造成很大的压力。

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.util.*;

/**
* Created by yuanye8 on 2016/10/10.
*/
public class WordCountExample {

public static class WordCountMapper implements Mapper<LongWritable, Text, Text, IntWritable> {
private Text key = new Text();
private IntWritable value = new IntWritable();

public void configure(JobConf jobConf) {

}

public void map(LongWritable longWritable, Text text, OutputCollector<Text, IntWritable> outputCollector, Reporter reporter) throws IOException {
String[] words = text.toString().trim().split(" ");
key = new Text();
for (String word : words) {
key.set(word);
value.set(1);
outputCollector.collect(key, value);
}
}

public void close() throws IOException {

}
}

public static class WordCountReducer implements Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable value = new IntWritable();
private Hashtable<String, Integer> hashtable = new Hashtable<String, Integer>();
private String output_path = null;

public void configure(JobConf jobConf) {
this.output_path = jobConf.get("output_path");
}

public void reduce(Text text, Iterator<IntWritable> iterator, OutputCollector<Text, IntWritable> outputCollector, Reporter reporter) throws IOException {
int sum = 0;
while (iterator.hasNext()) {
int tmp = iterator.next().get();
sum += tmp;
}
value.set(sum);
hashtable.put(text.toString(), sum);
outputCollector.collect(text, value);

}

public void close() throws IOException {
outputSortResult();
}

public void outputSortResult() {
FileSystem hdfs = null;
BufferedWriter bw = null;
try {
hdfs = FileSystem.get(new JobConf());
bw = new BufferedWriter(
new OutputStreamWriter(
hdfs.create(new Path(output_path + "_sort"), true)));

Set<Map.Entry<String, Integer>> set = this.hashtable.entrySet();
Map.Entry[] entries = set.toArray(new Map.Entry[set.size()]);
Arrays.sort(entries, new Comparator<Map.Entry>() {
public int compare(Map.Entry o1, Map.Entry o2) {
int v1 = (Integer) o1.getValue();
int v2 = (Integer) o2.getValue();
return v2 - v1;
}
});

for (Map.Entry<String, Integer> entry : entries) {
Strin
4000
g key = entry.getKey();
int value = entry.getValue();
bw.write(key + "\t" + value + "\n");
}
bw.flush();
} catch (IOException e) {
e.printStackTrace();
} finally {
FileTool.close(bw);
}
}
}

public static void main(String[] args) throws IOException {
if (args.length < 2) {
System.out.println("Usage : WordCountExample <input> <output>");
System.exit(-1);
}
Path input_path = new Path(args[0]);
Path output_path = new Path(args[1]);

JobConf conf = new JobConf();
conf.set("output_path", args[1]);
conf.setJobName("WordCount");
conf.setJarByClass(WordCountExample.class);
conf.setMapperClass(WordCountMapper.class);
conf.setReducerClass(WordCountReducer.class);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
TextInputFormat.addInputPath(conf, input_path);
FileOutputFormat.setOutputPath(conf, output_path);
conf.setCombinerClass(WordCountReducer.class);
conf.setMapOutputKeyClass(Text.class);
conf.setMapOutputValueClass(IntWritable.class);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
conf.setNumReduceTasks(1);

RunningJob runningJob = JobClient.runJob(conf);
runningJob.waitForCompletion();
}
}


最后,记录一个未解决的疑问:

在关闭打开文件读取流时 FileTool.close(bw);

原来是FileTool.close(bw,hdfs);

但是会报错,报错内容为map阶段报 FileSystem Closed这个错误。

可我关闭是在reduce的清理阶段,即close()时进行的,不应该报这个错误。

那么,我猜想,是reduce清理阶段关闭文件系统时, 整个任务还未结束, map阶段可能也需要再次使用文件系统进行一些操作。

阅读其他源码, 对于主程序里打开的文件系统 FileSystem , 最后也没有关闭的操作,难道MR任务完成后会自动关闭文件系统么?不需要程序员自动关闭吗?

如有大神清晰理解我的问题,请评论留言指教,谢谢!
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  hadoop java python
相关文章推荐