Hadoop之——以1.x版本和0.x版本分别实现单词统计功能
2015-05-25 23:45
375 查看
转载请注明出处:http://blog.csdn.net/l1028386804/article/details/45998833
本文提供一个以Hadoop MapReduce方式统计文本中每个单词的数量的例子,包含1.x版本和0.x版本的实现,同时简要说明了两个版本的不同,不多说,直接上代码
一、Hadoop 1.x版本的实现
运行结果
运行结果
Hadoop版本0.x的包一般是mapred
Hadoop版本0.x使用JobConf
Hadoop版本0.x使用JobClient.runJob(JobConf对象)提交作业
Hadoop版本1.x api extends MapReduceBase implements Mapper
Hadoop版本1.x api extends MapReduceBase implements Reducer
本文提供一个以Hadoop MapReduce方式统计文本中每个单词的数量的例子,包含1.x版本和0.x版本的实现,同时简要说明了两个版本的不同,不多说,直接上代码
一、Hadoop 1.x版本的实现
package com.lyz.hadoop.count; import java.net.URI; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner; /** * 利用Hadoop MapReduce统计文本中每个单词的数量 * @author liuyazhuang */ public class WordCount { //要统计的文件位置 static final String INPUT_PATH = "hdfs://liuyazhuang:9000/d1/hello"; //统计结果输出的位置 static final String OUT_PATH = "hdfs://liuyazhuang:9000/out"; public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf); final Path outPath = new Path(OUT_PATH); //如果已经存在输出文件,则先删除已存在的输出文件 if(fileSystem.exists(outPath)){ fileSystem.delete(outPath, true); } final Job job = new Job(conf , WordCount.class.getSimpleName()); //1.1指定读取的文件位于哪里 FileInputFormat.setInputPaths(job, INPUT_PATH); //指定如何对输入文件进行格式化,把输入文件每一行解析成键值对 job.setInputFormatClass(TextInputFormat.class); //1.2 指定自定义的map类 job.setMapperClass(MyMapper.class); //map输出的<k,v>类型。如果<k3,v3>的类型与<k2,v2>类型一致,下面两行代码可以省略 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); //1.3 分区 job.setPartitionerClass(HashPartitioner.class); //有一个reduce任务运行 job.setNumReduceTasks(1); //1.4 TODO 排序、分组 //1.5 TODO 规约 //2.2 指定自定义reduce类 job.setReducerClass(MyReducer.class); //指定reduce的输出类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); //2.3 指定写出到哪里 FileOutputFormat.setOutputPath(job, outPath); //指定输出文件的格式化类 job.setOutputFormatClass(TextOutputFormat.class); //把job提交给JobTracker运行 job.waitForCompletion(true); } /** * KEYIN 即k1 表示行的偏移量 * VALUEIN 即v1 表示行文本内容 * KEYOUT 即k2 表示行中出现的单词 * VALUEOUT 即v2 表示行中出现的单词的次数,固定值1 */ static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable>{ protected void map(LongWritable k1, Text v1, Context context) throws java.io.IOException ,InterruptedException { final String[] splited = v1.toString().split("\t"); for (String word : splited) { context.write(new Text(word), new LongWritable(1)); } }; } /** * KEYIN 即k2 表示行中出现的单词 * VALUEIN 即v2 表示行中出现的单词的次数 * KEYOUT 即k3 表示文本中出现的不同单词 * VALUEOUT 即v3 表示文本中出现的不同单词的总次数 * */ static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable>{ protected void reduce(Text k2, java.lang.Iterable<LongWritable> v2s, Context ctx) throws java.io.IOException ,InterruptedException { long times = 0L; for (LongWritable count : v2s) { times += count.get(); } ctx.write(k2, new LongWritable(times)); }; } }控制台打印信息
运行结果
二、Hadoop 0.x版本的实现
package com.lyz.hadoop.old; import java.io.IOException; import java.net.URI; import java.util.Iterator; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reducer; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.TextInputFormat; import org.apache.hadoop.mapred.TextOutputFormat; import org.apache.hadoop.mapred.lib.HashPartitioner; import com.lyz.hadoop.count.WordCount; /** * Hadoop中的一些老的API用法 * Hadoop版本1.x的包一般是mapreduce * Hadoop版本0.x的包一般是mapred * @author liuyazhuang * */ public class OldApp { //要统计的文件位置 static final String INPUT_PATH = "hdfs://liuyazhuang:9000/d1/hello"; //统计结果输出的位置 static final String OUT_PATH = "hdfs://liuyazhuang:9000/out"; /** * Hadoop老版本与新版本相比,不同点是: * 1:不再使用Job,而是使用JobConf * 2、包名是mapred而不是mapreduce * 3、不使用job.waitForCompletion(true)提交作业,而是使用JobClient.runJob(JobConf对象) */ public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf); final Path outPath = new Path(OUT_PATH); //如果已经存在输出文件,则先删除已存在的输出文件 if(fileSystem.exists(outPath)){ fileSystem.delete(outPath, true); } final JobConf job = new JobConf(conf , WordCount.class); //1.1指定读取的文件位于哪里 FileInputFormat.setInputPaths(job, INPUT_PATH); //指定如何对输入文件进行格式化,把输入文件每一行解析成键值对 job.setInputFormat(TextInputFormat.class); //1.2 指定自定义的map类 job.setMapperClass(MyMapper.class); //map输出的<k,v>类型。如果<k3,v3>的类型与<k2,v2>类型一致,下面两行代码可以省略 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); //1.3 分区 job.setPartitionerClass(HashPartitioner.class); //有一个reduce任务运行 job.setNumReduceTasks(1); //1.4 TODO 排序、分组 //1.5 TODO 规约 //2.2 指定自定义reduce类 job.setReducerClass(MyReducer.class); //指定reduce的输出类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); //2.3 指定写出到哪里 FileOutputFormat.setOutputPath(job, outPath); //指定输出文件的格式化类 job.setOutputFormat(TextOutputFormat.class); //把job提交给JobTracker运行 JobClient.runJob(job); } /** * 新api extends Mapper * 老api extends MapReduceBase implements Mapper * @author liuyazhuang * */ static class MyMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, LongWritable>{ @Override public void map(LongWritable key, Text value, OutputCollector<Text, LongWritable> output, Reporter reporter) throws IOException { String[] splited = value.toString().split("\t"); for (String word : splited) { output.collect(new Text(word), new LongWritable(1)); } } } /** * 新api extends Reducer * 老api extends MapReduceBase implements Reducer * @author liuyazhuang * */ static class MyReducer extends MapReduceBase implements Reducer<Text, LongWritable, Text, LongWritable>{ @Override public void reduce(Text key, Iterator<LongWritable> values, OutputCollector<Text, LongWritable> output, Reporter reporter) throws IOException { long times = 0; while (values.hasNext()) { times += values.next().get(); } output.collect(key, new LongWritable(times)); } } }控制台打印信息
运行结果
总结:
1、包名不同
Hadoop版本1.x的包一般是mapreduceHadoop版本0.x的包一般是mapred
2、作业处理类不同
Hadoop版本1.x用Job,Hadoop版本0.x使用JobConf
3、 作业提交方式不同
Hadoop版本1.x使用job.waitForCompletion(true)提交作业Hadoop版本0.x使用JobClient.runJob(JobConf对象)提交作业
4、Mapper实现不同
Hadoop版本1.x api extends MapperHadoop版本1.x api extends MapReduceBase implements Mapper
5、Reducer实现不同
Hadoop版本1.x api extends ReducerHadoop版本1.x api extends MapReduceBase implements Reducer
相关文章推荐
- Hadoop:使用原生python编写MapReduce来统计文本文件中所有单词出现的频率功能
- (12) Hadoop Java 实现MapReduce HelloWord 单词统计 更新版
- 6.对学生成绩进行统计计算,参加考试的有6名学生,考试成绩分别为94.5,89.0,79.5,64.5,81.5,73.5,显示考试的总分和平均分,之后显示大于考试平均分的成绩信息。请写出实现上述功能
- (13) Hadoop Java 实现MapReduce HelloWord 单词统计 更新版 2
- (11) Hadoop Java 实现MapReduce HelloWord 单词统计
- 用STL实现单词统计功能
- 使用Hadoop实现单词统计
- 使用目前hadoop内置的序列化类(不使用自定义序列化类),实现流量统计的功能
- 使用DFSClient实现 hadoop上传文件功能,采用使用输入输出流实现 Hadoop 版本2.7.0
- python简单实现hadoop map reduce统计功能
- 使用DFSClient实现 hadoop上传文件功能,采用使用输入输出流实现 Hadoop 版本2.7.0
- 每天一个小程序之python实现统计英文单词出现个数
- C语言实现文件单词统计
- 统计输入中所有单词出现的次数(使用二叉查找树实现:递归和非递归)
- 统计文章单词[JAVA实现]-经典笔试题
- 单词计数程序在hadoop上的实现
- java实现输入一行字符,分别统计出其中英文字母、空格、数字和其它字符的个数。
- Java实现的词频统计——功能改进
- 使用jquery 实现 仿google 的智能提示输入框功能 (改进版本)