您的位置:首页 > 大数据 > Hadoop

一看就理解的hadoop中MapReduce的核心API原理及编程实例

2020-07-13 05:52 295 查看

一 、MapReduce

使用Mapreduce框架可以很方便的编写分布式应用程序。主要介绍多个MapReduce程序实例
(1)MapReduce的原理
Mapreduce是一个分布式计算框架,将大型数据操作作业分解为可以跨服务器集群并行执行的单个任务(即可以把任务分组,在多个集群上同时执行这个任务的不同部分),每个节点存储自己上的数据。
(2)每个任务(job)
包括Map和Reduce两部分
执行过程包括:
Map
Combiner
Partitioner
Shuffle and sort
Reduce
一个job的执行过程为:
客户端发出请求给namenode namenode中的job tracker 接收客户端的job请求 然后提交给Task tracker
task tracker (map task,一个map task对应一个文本切片)接收job给的请求 开始执行 map到reduce 一系列操作 完成后 回应job tracker


**其中map的主要任务是
(1)map通过TextInputFormat接口(还有其他接口)读取文件 使用read()方法一次读一行 生成一个 数值对
其中 key是行的起始偏移量 也就是每行字符量的累加
value是指每行的内容
(2)上一步中获得的 键值对 再进入 Mapper类的map()方法生成新的键值对,此时map方法是需要我们根据业务逻辑自己编写的,一个map方法对应上一步中读到的一行数据。
如统计一行数据如 a b c b ww we 的中单词个数 此时map要做的就是实现我们的业务逻辑 即 分割行获得每个单词 并把其作为key的新值 value的值为其单词的个数 1
结果为:(a,1)(b,1) (c,1) (b,1) (ww ,1)(we ,1)
根据业务逻辑不同,其key和value代表的含义也不同 输出的数据类型也不同
(3)在上一步获得的新的对 通过context.write(key value) 输出到一个OutPutCollector收集器中
(4)把收集器中的数据放到环形缓冲区中,其默认大小为100M ,只写80% 满了后就会溢写 把分区好 排序好的键值对写到小文件中

分区是根据每个键值对的Hash值与设置的reduce的数量做取余运算,得到 partition值相等的分到一个区

排序是根据Hash值和key值排序 hash值相同时 使用key排序
整体来说就是 在环形缓冲区中 对数据进行分区的同时 对每个区里的数据进行排序
这样溢写时 才能有序溢写
结果变成:(a,1)(b,1) (b,1) (c 1)(we ,1) (ww ,1)
当缓冲区达到限定值后 数据溢写到本地磁盘文件中 生成了一个一个小文件,
(可以直接使用下文件 进行下一步操作,但由于小文件数量过多,会浪费大量的资源)
故增加如下步骤:
(5)把小文件通过merge合并成一个大的文件,合并采用归并排序 合成一个大文件 故此合并成的大文件也是分区且区内有序的 ,节省资源。
(6)进入到reduce task中
reduce task根据自己的分区号(可以自己设置reduce分区数量),然后去各个节点上拷贝和自己分区号相同的在(4)中获得partiton值 的数据,拷到自己的本地磁盘目录下
(7)reduce 会把来自不同map task中的但是partiton值相同的结果数据 再整合为一个大文件 也是通过归并排序 ,文件中的键值对再根据key重新排序
(8)以上是shuffle的过程,接下来进入reduce task的逻辑运算过程。
(9)先调用groupingComparetor 对上一步中获得的大数据文件的内容进行分组,从文件中每次读取一组键值对。
(10)接着调用自己的自定义的reduce方法 进行运算
(11)最后通过OutPutFormat方法 把获得的结果写到part-r-00000中(设置一个reduce task时,多个 会写到多个里面 )

下面结合实例讲解:
1、统计一个文本各个的单词数量
这是一个wordDriver类 ,里面有主函数 ,连接本地hadoop的各种配置都在主函数中写,以及整个job的提交 。具体如下所示:

public class wordDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//创建配置文件
Configuration conf=new Configuration();
//创建一个job tracker 名字叫wordcount
Job job=Job.getInstance(conf,"wordcount" );
//设置jar的位置
job.setJarByClass(wordDriver.class);
//设置map和reduce的位置
job.setMapperClass(wordMapper.class);
job.setReducerClass(wordReduce.class);
//设置map输出的key value类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//设置reduce输出的key value类型
job.setOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//设置reduce tack的个数
job.setNumReduceTasks(2);
//设置输入输出的路径
FileInputFormat.setInputPaths(job,new Path("/input"));
FileOutputFormat.setOutputPath(job,new Path("/output"));
//提交程序运行
boolean result=job.waitForCompletion(true);
System.exit(result?0:1);
}
}

接下来的代码是完成map task 的 对应上面的(2)(3)(4)(5) 这里因为文件较小 没有使用combiner 具体代码如下:
分割字符串 获得每一个单词
//统计单词数量
其中泛型中的<LongWritable, Text,Text, IntWritable>
分别指 keyin的数据类型 valuein的数据类型 keyout的数据类型 valueout的数据类型

public class wordMapper extends Mapper<LongWritable, Text,Text, IntWritable> {
//构造输出的key 和value对象
Text k=new Text();//key的数据类型
IntWritable v=new IntWritable(1);//value的数据类型.值都为1
//重写map方法
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//key是字符偏移量  value是一行文本数据
//把一行文本数据转换成字符串
String line=value.toString();
//分割字符串获得单词 以空格作为分割符
String[] words=line.split("\\s+");
//将每一个单词循环写出
for (String word : words) {
//把每一个word作为键值对的key
k.set(word);
//把每一个键值对
context.write(k,v);
}
}
}

接下来完成 reduce task 接收map传来的数据 (reduce输入的键值对的key数据类型和value数据类型 要和map输出的键值对的key数据类型和value数据类型 一致)这里主要对每个单词出现的次数做累加操作 ,然后输出。具体代码如下:

public class wordReduce extends Reducer<Text, IntWritable, Text,IntWritable >{
//进行单词的统计 reduce接受的数据类型为 (wish,(1,1,1 1,1,1))
//重写reduce方法
int sum=0;
IntWritable v=new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
//遍历values迭代器
for (IntWritable value : values) {
sum+=value.get();
}
//将key和value写出
v.set(sum);
context.write(key,v);
}
}

mapReduce的主要编程框架:
1、输入(inputFormat)
2、map和reduce结果 自己创建类 分别继承Mapper类和Reduce类 ,根据自己的业务逻辑重写map()和reduce()方法
3、输出(outputformat)提交

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: