
MapReduce WordCount: sort by value in descending order, lowercase all characters, and recognize different punctuation marks

2016-05-24 13:24
This is a lab I supervised for undergraduate students; the post is a brief record of the exercise.

Lab requirements:

Tokenize the input file on spaces, commas, periods, double quotes, and similar delimiters

Convert all uppercase letters in the input to lowercase

Sort the output by value in descending order

The WordCount example code shipped with Hadoop, and how it works

My understanding of MapReduce-based word count: the framework divides the input file into splits, and each split is processed by an independent map task. Map reads its input line by line and emits intermediate key-value pairs; a shuffle step then groups the pairs by key, and reduce aggregates each group into the final output, which is sorted by key in ascending order.

Input file    Map output            Shuffle output            Reduce output         Output file
Hello,Two     <Hello,1> <Two,1>     <Hello,(1,1)> <Two,1>     <Hello,2> <Two,1>     <Hello,2> <one,1> <Two,1>
Hello one     <Hello,1> <one,1>     <one,1>                   <one,1>
The source code ships with the Hadoop distribution, which can be downloaded from:

http://mirror.bit.edu.cn/apache/hadoop/common/

Inside the distribution, the example source is in share/hadoop/mapreduce/sources/hadoop-mapreduce-examples-2.7.0-sources.jar, class org.apache.hadoop.examples.WordCount.

The WordCount example code and the improvements

To tokenize the input on spaces, commas, periods, double quotes, colons, tabs, and newlines, change the Map function to: StringTokenizer itr = new StringTokenizer(value.toString(), " ,.\":\t\n");

To convert all uppercase letters to lowercase, change the Map function to: word.set(itr.nextToken().toLowerCase());
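
As a quick illustration, here is a minimal standalone Java sketch (the sample line is my own, outside of MapReduce) showing how this delimiter set and toLowerCase split and normalize a line:

import java.util.StringTokenizer;

public class TokenizeDemo {
    public static void main(String[] args) {
        // Hypothetical sample line; the delimiter set matches the one used in the Mapper.
        String line = "Hello, \"Two\" Hello.one";
        StringTokenizer itr = new StringTokenizer(line, " ,.\":\t\n");
        while (itr.hasMoreTokens()) {
            // Lowercase each token, exactly as the Mapper does.
            System.out.println(itr.nextToken().toLowerCase());
        }
        // Prints: hello, two, hello, one (one token per line)
    }
}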

To sort the output by value in descending order, add a second MapReduce job: its Map step swaps each key and value (this is what InverseMapper does), and since the framework sorts reduce input keys in ascending order by default, the default sort comparator must be replaced with a descending one.
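
To see why negating the comparator gives a descending sort, here is a small sketch using only Hadoop's IntWritable.Comparator (the two values are arbitrary examples):

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableComparator;

public class DecreasingOrderDemo {
    public static void main(String[] args) {
        WritableComparator asc = new IntWritable.Comparator();
        IntWritable two = new IntWritable(2);
        IntWritable one = new IntWritable(1);
        // Default comparator: positive result, so 2 sorts after 1 (ascending order).
        System.out.println(asc.compare(two, one));
        // Negated result: negative, so 2 sorts before 1 (descending order).
        System.out.println(-asc.compare(two, one));
    }
}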

The complete code is as follows:

package org.apache.hadoop.wordcount;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.map.InverseMapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

    public static class TokenizerMapper
            extends Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context
                        ) throws IOException, InterruptedException {
            // Split on spaces, commas, periods, double quotes, colons, tabs and newlines.
            StringTokenizer itr = new StringTokenizer(value.toString(), " ,.\":\t\n");
            while (itr.hasMoreTokens()) {
                // Lowercase every token before counting it.
                word.set(itr.nextToken().toLowerCase());
                context.write(word, one);
            }
        }
    }

    /* For reference, the body of InverseMapper:
    public class InverseMapper<K, V> extends Mapper<K,V,V,K> {

        // The inverse function. Input keys and values are swapped.
        @Override
        public void map(K key, V value, Context context
                        ) throws IOException, InterruptedException {
            context.write(value, key);
        }
    }
    */

    public static class IntSumReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values,
                           Context context
                           ) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    // Negating IntWritable.Comparator turns the default ascending key order
    // of the sort job into descending order.
    private static class IntWritableDecreasingComparator extends IntWritable.Comparator {

        public int compare(WritableComparable a, WritableComparable b) {
            return -super.compare(a, b);
        }

        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            return -super.compare(b1, s1, l1, b2, s2, l2);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        // Intermediate directory holding the word counts before they are re-sorted.
        Path tempDir = new Path("wordcount-temp-output");
        if (otherArgs.length < 2) {
            System.err.println("Usage: wordcount <in> [<in>...] <out>");
            System.exit(2);
        }

        // First job: count words and write <word, count> pairs as a SequenceFile.
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        for (int i = 0; i < otherArgs.length - 1; ++i) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }
        FileOutputFormat.setOutputPath(job, tempDir);

        job.waitForCompletion(true);

        // Second job: swap key and value with InverseMapper, then sort the
        // <count, word> pairs by count in descending order with a single reducer.
        Job sortjob = Job.getInstance(conf, "sort");
        // Ship this jar with the sort job so IntWritableDecreasingComparator is found on the cluster.
        sortjob.setJarByClass(WordCount.class);
        FileInputFormat.addInputPath(sortjob, tempDir);
        sortjob.setInputFormatClass(SequenceFileInputFormat.class);
        sortjob.setMapperClass(InverseMapper.class);
        sortjob.setNumReduceTasks(1);
        FileOutputFormat.setOutputPath(sortjob,
                new Path(otherArgs[otherArgs.length - 1]));
        sortjob.setOutputKeyClass(IntWritable.class);
        sortjob.setOutputValueClass(Text.class);
        sortjob.setSortComparatorClass(IntWritableDecreasingComparator.class);

        sortjob.waitForCompletion(true);

        // Clean up the intermediate directory.
        FileSystem.get(conf).delete(tempDir, true);
        System.exit(0);
    }
}


Further reading

Resources and experiment data for the book Hadoop: The Definitive Guide (《Hadoop权威指南》)

First three chapters as Shiyanlou labs: https://www.shiyanlou.com/courses/222

Code for the book: http://git.shiyanlou.com/shiyanlou/hadoop-book/src/master

Full weather dataset: ftp://ftp.ncdc.noaa.gov/pub/data/noaa/isd-lite/

Hadoop book website: http://hadoopbook.com/