Hadoop 中 MapReduce 新旧改变
2013-12-22 00:00
471 查看
这里所说的 Hadoop 中 MapReduce 新旧改变,是指 Apache Hadoop 2.0 之前的版本。
Hadoop 的 MapReduce Release 0.20.0 的API包括了一个全新的 Mapreduce J***A API,有时候也称为上下文对象。新的API类型上不兼容以前的API,所以,以前的应用程序需要重写才能使新的API发挥其作用。新的API和旧的API之间有下面几个明显的区别。
新的API倾向于使用抽象类,而不是接口,因为这更容易扩展。例如,你可以添加一个方法(用默认的实现)到一个抽象类而不需修改类之前的实现方法。在新的API中,Mapper和Reducer是抽象类。
新的API是在org.apache.hadoop.mapreduce包(和子包)中的。之前版本的API则是放在org.apache.hadoop.mapred中的。
新的API广泛使用context object(上下文对象),并允许用户代码与MapReduce系统进行通信。例如,MapContext基本上充当着JobConf的OutputCollector和Reporter的角色。
新的API同时支持"推"和"拉"式的迭代。在这两个新老API中,键/值记录对被推mapper中,但除此之外,新的API允许把记录从map()方法中拉出,这也适用于reducer。"拉"式的一个有用的例子是分批处理记录,而不是一个接一个。
新的API统一了配置。旧的API有一个特殊的JobConf对象用于作业配置,这是一个对于Hadoop通常的Configuration对象的扩展。在新的API中,这种区别没有了,所以作业配置通过Configuration来完成。作业控制的执行由Job类来负责,而不是JobClient,它在新的API中已经荡然无存。
[b]新的 WordCount
[/b]
java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class WordCount { public static class TokenizerMapper extends Mapper{ private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Context context) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } } public static class IntSumReducer extends Reducer { void reduce(Text key, Iterable values,Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: wordcount "); System.exit(2); } Job job = new Job(conf, "word count"); job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
[b]旧的 Wordcount
[/b]
package org.apache.hadoop.examples; import java.io.IOException; import java.util.Iterator; import java.util.StringTokenizer; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reducer; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.TextInputFormat; import org.apache.hadoop.mapred.TextOutputFormat; public class WordCount{ public static class Map extends MapReduceBase implements Mapper { private final static IntWritable one = new IntWritable( 1 ); private Text word = new Text(); public void map(LongWritable key, Text value,OutputCollector output, Reporter reporter) throws IOException { String line = value.toString(); StringTokenizer tokenizer = new StringTokenizer(line); while (tokenizer.hasMoreTokens()) { word.set(tokenizer.nextToken()); output.collect(word, one); } } } public static class Reduce extends MapReduceBase implements Reducer { public void reduce(Text key, Iterator values, OutputCollector output, Reporter reporter) throws IOException { int sum = 0 ; while (values.hasNext()) { sum += values.next().get(); } output.collect(key, new IntWritable(sum)); } } public static void main(String[] args) throws Exception { JobConf conf = new JobConf(WordCount. class ); conf.setJobName("wordcount" ); conf.setOutputKeyClass(Text.class ); conf.setOutputValueClass(IntWritable.class ); conf.setMapperClass(Map.class ); conf.setCombinerClass(Reduce.class ); conf.setReducerClass(Reduce.class ); conf.setInputFormat(TextInputFormat.class ); conf.setOutputFormat(TextOutputFormat.class ); FileInputFormat.setInputPaths(conf, new Path(args[ 0 ])); FileOutputFormat.setOutputPath(conf, new Path(args[ 1 ])); JobClient.runJob(conf); } }
相关文章推荐
- 编译drill +hadoop 2.2
- Hadoop 2.2.0单机版安装
- [转载]HDFS的'Block'和MapReduce的'Split'之间的关系和区别
- Hadoop生成HFile直接入库HBase心得
- 【转载】经典漫画讲解HDFS原理
- thrift实现HDFS文件操作
- thrift实现HDFS文件操作
- 使用FileSystem以标准输出格式显示HDFS当中的文件
- 将HDFS文件当中部分字节读入本地
- 从本地读入部分字节到HDFS当中
- HDFS概述
- Hadoop
- HDFS的数据通信机制
- Hadoop-2.2.0 Linux 64位系统本地库编译
- hdfs 的存储空间扩展
- HDFS数据存储位置与复制详解
- Hadoop云计算的初步认识
- Hadoop 2.0集群配置详细教程
- hdfs文件系统FileSystem.rename异常
- Hadoop 2.2.0 Symlink的使用