
Changes Between the Old and New MapReduce APIs in Hadoop

2013-12-22

The old-to-new MapReduce API changes discussed here refer to Hadoop versions prior to Apache Hadoop 2.0.


Release 0.20.0 of Hadoop's MapReduce included a brand-new Java MapReduce API, sometimes referred to as the "context objects" API. The new API is not type-compatible with the old one, so existing applications must be rewritten to take advantage of it. The new and old APIs differ in several notable ways, described below.

The new API favors abstract classes over interfaces, because abstract classes are easier to evolve. For example, you can add a method (with a default implementation) to an abstract class without breaking existing implementations of the class. In the new API, Mapper and Reducer are abstract classes.
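
Because the new Mapper is an abstract class whose setup(), map(), cleanup(), and run() methods all have default implementations, a subclass only overrides what it needs. Below is a minimal sketch of this; LineLengthMapper is an invented example class, not part of Hadoop.

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Only map() is overridden; the default setup(), cleanup(), and run()
// inherited from the abstract Mapper class are used as-is.
public class LineLengthMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		// Emit the length (in bytes) of each input line under a single key.
		context.write(new Text("line-length"), new LongWritable(value.getLength()));
	}
}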


The new API is in the org.apache.hadoop.mapreduce package (and subpackages). The old API lives in org.apache.hadoop.mapred.


The new API makes extensive use of context objects, which allow user code to communicate with the MapReduce system. For example, MapContext essentially unifies the roles of JobConf, OutputCollector, and Reporter.
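
To make this concrete, here is a short sketch (ContextDemoMapper and the counter group/name strings are made up for illustration) showing the single context parameter doing the work of both the old OutputCollector and the old Reporter.

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class ContextDemoMapper extends Mapper<Object, Text, Text, IntWritable> {

	@Override
	protected void map(Object key, Text value, Context context)
			throws IOException, InterruptedException {
		// Emitting output: previously done via OutputCollector.collect(...)
		context.write(new Text(value.toString().trim()), new IntWritable(1));

		// Reporting: previously done via Reporter.setStatus(...) and Reporter.incrCounter(...)
		context.setStatus("processing records");
		context.getCounter("Demo", "RecordsSeen").increment(1);
	}
}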


The new API supports both "push" and "pull" styles of iteration. In both APIs, key-value record pairs are pushed to the mapper, but in addition, the new API allows a mapper to pull records from within the map() method. The same goes for the reducer. One useful application of the "pull" style is processing records in batches rather than one at a time.
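
One way to get the "pull" style in the new API is to override the mapper's run() method and drive the iteration yourself through the context. The sketch below batches records before processing them; BatchingMapper, BATCH_SIZE, and processBatch() are invented names for illustration.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class BatchingMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

	private static final int BATCH_SIZE = 100;

	@Override
	public void run(Context context) throws IOException, InterruptedException {
		setup(context);
		List<String> batch = new ArrayList<String>();
		// Pull records from the framework instead of having them pushed into map().
		while (context.nextKeyValue()) {
			batch.add(context.getCurrentValue().toString());
			if (batch.size() >= BATCH_SIZE) {
				processBatch(batch, context);
				batch.clear();
			}
		}
		processBatch(batch, context); // flush the last, partially filled batch
		cleanup(context);
	}

	// Hypothetical batch handler: here it simply emits one count per batch.
	private void processBatch(List<String> batch, Context context)
			throws IOException, InterruptedException {
		if (!batch.isEmpty()) {
			context.write(new Text("batch"), new IntWritable(batch.size()));
		}
	}
}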


The new API unifies configuration. The old API has a special JobConf object for job configuration, which is an extension of Hadoop's generic Configuration object. In the new API this distinction is gone, so job configuration is done through a Configuration. Job control is performed through the Job class rather than JobClient, which no longer exists in the new API.
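
As a small sketch of what unified configuration looks like in practice (the property wordcount.case.sensitive and the class UnifiedConfigDriver are invented for illustration, and the driver reuses the TokenizerMapper and IntSumReducer from the new WordCount listing below): application settings go into the same Configuration object the job is created from, and a task can read them back through its context.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class UnifiedConfigDriver {

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		// Application settings and Hadoop settings share one Configuration object.
		conf.setBoolean("wordcount.case.sensitive", false);

		// Job control goes through Job, not JobClient.
		Job job = new Job(conf, "unified config demo");
		job.setJarByClass(UnifiedConfigDriver.class);
		job.setMapperClass(WordCount.TokenizerMapper.class);   // reuse the mapper shown below
		job.setReducerClass(WordCount.IntSumReducer.class);    // reuse the reducer shown below
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

// Inside a mapper or reducer, the value can be read back through the context:
//   boolean caseSensitive =
//       context.getConfiguration().getBoolean("wordcount.case.sensitive", true);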



New WordCount

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {
	
	public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {

		private final static IntWritable one = new IntWritable(1);
		private Text word = new Text();

		public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
			StringTokenizer itr = new StringTokenizer(value.toString());
			while (itr.hasMoreTokens()) {
				word.set(itr.nextToken());
				context.write(word, one);
			}
		}
	}
	public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

		private IntWritable result = new IntWritable();

		public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
			int sum = 0;
			for (IntWritable val : values) {
				sum += val.get();
			}
			result.set(sum);
			context.write(key, result);
		}
	}
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
		
		if (otherArgs.length != 2) {
			System.err.println("Usage: wordcount <in> <out>");
			System.exit(2);
		}
		
		Job job = new Job(conf, "word count");
		job.setJarByClass(WordCount.class);
		job.setMapperClass(TokenizerMapper.class);
		job.setCombinerClass(IntSumReducer.class);
		job.setReducerClass(IntSumReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
		FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}


Old WordCount

package org.apache.hadoop.examples;
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
public class WordCount {
	public static class Map extends MapReduceBase implements
			Mapper<LongWritable, Text, Text, IntWritable> {

		private final static IntWritable one = new IntWritable(1);
		private Text word = new Text();

		public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output,
				Reporter reporter) throws IOException {
			String line = value.toString();
			StringTokenizer tokenizer = new StringTokenizer(line);
			while (tokenizer.hasMoreTokens())
			{
				word.set(tokenizer.nextToken());
				output.collect(word, one);
			}
		}
	}

	public static class Reduce extends MapReduceBase implements
			Reducer<Text, IntWritable, Text, IntWritable> {

		public void reduce(Text key, Iterator<IntWritable> values,
				OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
			int sum = 0;
			while (values.hasNext()) {
				sum += values.next().get();
			}
			output.collect(key, new IntWritable(sum));
		}
	}
	
	public static void main(String[] args) throws Exception {
		JobConf conf = new JobConf(WordCount.class);
		conf.setJobName("wordcount");
		conf.setOutputKeyClass(Text.class);
		conf.setOutputValueClass(IntWritable.class);
		conf.setMapperClass(Map.class);
		conf.setCombinerClass(Reduce.class);
		conf.setReducerClass(Reduce.class);
		conf.setInputFormat(TextInputFormat.class);
		conf.setOutputFormat(TextOutputFormat.class);
		FileInputFormat.setInputPaths(conf, new Path(args[0]));
		FileOutputFormat.setOutputPath(conf, new Path(args[1]));
		JobClient.runJob(conf);
	}
}