您的位置:首页 > 运维架构

基于Hadoop的MR开发

2015-06-22 13:55 246 查看
(一)简介

1.MaprReduce分为Map过程与Reduce过程,我们以词频计算的例子来说明;假设需要统计文本“this is a small cat。that is a small dog”的词频:

(1)Map过程:将每个单词从文本中解析出来并生成key-alue对:<"this",1>,<"is",1>,<"a",1>,<"small",1>,<"cat",1>,<"that",1>,<"is",1>,<"a",1>,<"small",1>,<"dog",1>,其中key表示单词,value表示频数,允许相同的键值对多次出现;

(2)Reduce过程:合并同类项,也就是统计单词出现的最终频数:<"this",1>,<"is",2>,<"a",2>,<"small",2>,<"cat",1>,<"that",1>,<"dog",1>;

2.标准形式的MapReduce程序:一个Map的java文件,一个Reduce的java文件,一个负责调用的主程序Java文件;

3.基于Hadoop的WordCount的流程:

(1)调用hdfs命令行工具,将本地文本文件复制到hdfs上;

(2)用java写MapReduce代码,写完后调戏编译,然后打包成jar包;

(3)调用Hadoop命令,将jar包放在Hadoop集群上处理文本文件进行词频统计,然后将结果存放在指定的目录;

(4)调用hdfs命令行工具,查看处理结果;

(二)wordcount的实现

新建目录wordcount_01存放项目,子目录src存放java源码,子目录classes存放编译结果;源码包括TokenizerMapper.java,IntSumReducer.java,WordCount.java:

1.源码编辑

TokenizerMapper.java源码:

package com.zake.hadoop;

import java.io.IOException;                  //导入异常处理类
import java.util.StringTokenizer;            //导入字符串解析类

import org.apache.hadoop.io.IntWritable;     //导入整数类
import org.apache.hadoop.io.Text;            //导入文本类
import org.apache.hadoop.mapreduce.Mapper;   //导入Mapper类

public class TokenizerMapper extends Mapper<Object,Text,Text,IntWritable>
{
IntWritable one = new IntWritable(1);   //定义输出值,始终为1
Text word = new Text();                 //定义输出键

public void map(Object key,Text value,Context context)throws IOException,InterruptedException
{
StringTokenizer itr = new StringTokenizer(value.toString());//StringTokenizer类的构造函数只接受java的String类做输入

while(itr.hasMoreTokens())
{
word.set(itr.nextToken());
context.write(word,one);
}
}
}
(1)package语句表明该类由目录com.zake.hadoop管理;

(2)import语句表示使用java api或者hadoop api的包中的类;

(3)Mapper<Object,Text,Text,IntWritable>语句中:

1)第一个参数为Object类,表示输入键key的参数类型,是hadoop根据默认值生成的,一般是文件块里的一行文字的行偏移数,在处理的时候一般用不上;

2)第二个参数为Text类,表示输入值value的参数类型,也就是要处理的字符串比如“This is a cat”;

3)第三个参数为Text类,表示输出键key的参数类型,也就是经map处理后输出键值对比如<"this",1>中的“this”;

4)第四个参数为IntWritable类,表示输出值value的参数类型,也就是经map处理后输出键值对比如<"this",1>中的1;

(4)Context类的定义在Mapper类的内部,因此在源码开头不需要import;

IntSumReducer.java源码:

package com.zake.hadoop;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable>
{
IntWritable result = new IntWritable();
public void reduce(Text key,Iterable<IntWritable> values,Context context)
throws IOException,InterruptedException
{
int sum = 0;
for(IntWritable val:values) //不断地将values中的IntWritable整数提取出来给val
{
sum+=val.get();
}
result.set(sum);
context.write(key,result);
}
}


(1)Reducer<Text,IntWritable,Text,IntWritable>语句中:

1)第一个参数为Text类,表示输入键key的参数类型;

2)第二个参数为IntWritable类,表示输入值value的参数类型;

3)第三个参数为Text类,表示输出键key的参数类型;

4)第四个参数为IntWritable类,表示输出值value的参数类型;

(2)Iterable<IntWritable> values是一个实现了Iterable接口的变量 ,可以理解伪alues里包含若干个IntWritable整数,比如处理字符串“this is this”,经过map过程之后,到达reduce函数时,依次传递给reduce函数的是:key=“this”,values=<1,1>;key=“is”,values=<1>;

WordCount.java源码:

package com.zake.hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount
{
public static void main(String[]args)throws Exception
{
Configuration conf = new Configuration(); //从hadoop配置文件读取参数
String [] otherArgs = new GenericOptionsParser(conf,args).getRemainingArgs(); //从命令行读取参数
if(otherArgs.length!=2)
{
System.err.println("Usage:wordcount<in><out>");
System.exit(2);
}
Job job = new Job(conf,"wordcount");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job,new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job,new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true)?0:1);
}
}


2.编译

javac -classpath /usr/local/lib/hadoop-1.2.1/hadoop-core-1.2.1.jar:/usr/local/lib/hadoop-1.2.1/lib/commons-cli-1.2.jar -d ./classes/ ./src/*.java

编译后将生成./classes/com/zake/hadoop/TokenizerMapper.class,./classes/com/zake/hadoop/IntSumReducer.class,./classes/com/zake/hadoop/WordCount.class

3.打包

jar -cvf wordcount.jar -C ./classes/ .

4.执行

(1)开启hadoop:start-all.sh

(2)构造输入文本:vim input输入“hello world”

(3)将输入文本从本地文件系统放到hdfs上:hadoop fs -put input input



PS:可以看到hdfs上已经有已经上传的input文件了,由于这里已经存在output所以要删除:hadoop fs -rmr output

(4)调用包:hadoop jar wordcount,jar com.zake.hadoop.WordCount input output



(5)查看运行结果:hadoop fs -cat output/*



(6)将运行结果从hdfs复制到本地:hadoop fs -copyToLocal output .

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: