您的位置:首页 > 大数据 > Hadoop

Hadoop学习之路(十三)MapReduce的初识

2018-03-21 11:34 453 查看

MapReduce是什么

首先让我们来重温一下 hadoop 的四大组件:

HDFS:分布式存储系统

MapReduce:分布式计算系统

YARN:hadoop 的资源调度系统

Common:以上三大组件的底层支撑组件,主要提供基础工具包和 RPC 框架等

MapReduce 是一个分布式运算程序的编程框架,是用户开发“基于 Hadoop 的数据分析应用” 的核心框架

MapReduce 核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布 式运算程序,并发运行在一个 Hadoop 集群上

为什么需要 MapReduce

1、海量数据在单机上处理因为硬件资源限制,无法胜任

2、而一旦将单机版程序扩展到集群来分布式运行,将极大增加程序的复杂度和开发难度

3、引入 MapReduce 框架后,开发人员可以将绝大部分工作集中在业务逻辑的开发上,而将 分布式计算中的复杂性交由框架来处理

设想一个海量数据场景下的数据计算需求:

单机版:磁盘受限,内存受限,计算能力受限

分布式版:

1、 数据存储的问题,hadoop 提供了 hdfs 解决了数据存储这个问题

2、 运算逻辑至少要分为两个阶段,先并发计算(map),然后汇总(reduce)结果

3、 这两个阶段的计算如何启动?如何协调?

4、 运算程序到底怎么执行?数据找程序还是程序找数据?

5、 如何分配两个阶段的多个运算任务?

6、 如何管理任务的执行过程中间状态,如何容错?

7、 如何监控?

8、 出错如何处理?抛异常?重试?

  可见在程序由单机版扩成分布式版时,会引入大量的复杂工作。为了提高开发效率,可以将 分布式程序中的公共功能封装成框架,让开发人员可以将精力集中于业务逻辑。

  Hadoop 当中的 MapReduce 就是这样的一个分布式程序运算框架,它把大量分布式程序都会 涉及的到的内容都封装进了,让用户只用专注自己的业务逻辑代码的开发。它对应以上问题 的整体结构如下:

MRAppMaster:MapReduce Application Master,分配任务,协调任务的运行

MapTask:阶段并发任,负责 mapper 阶段的任务处理 YARNChild

ReduceTask:阶段汇总任务,负责 reducer 阶段的任务处理 YARNChild

MapReduce做什么

package com.ghgj.mapreduce.wc.demo;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
*
* 描述: MapReduce出入门:WordCount例子程序
*/
public class WordCountMR {

/**
* 该main方法是该mapreduce程序运行的入口,其中用一个Job类对象来管理程序运行时所需要的很多参数:
* 比如,指定用哪个组件作为数据读取器、数据结果输出器 指定用哪个类作为map阶段的业务逻辑类,哪个类作为reduce阶段的业务逻辑类
* 指定wordcount job程序的jar包所在路径 .... 以及其他各种需要的参数
*/
public static void main(String[] args) throws Exception {
// 指定hdfs相关的参数
Configuration conf = new Configuration();
//         conf.set("fs.defaultFS", "hdfs://hadoop02:9000");
System.setProperty("HADOOP_USER_NAME", "hadoop");

// 这是高可用的集群的配置文件。如果不是高可用集群,请自行替换配置文件
//        conf.addResource("hdfs_config/core-site.xml");
//        conf.addResource("hdfs_config/hdfs-site.xml");

// conf.set("mapreduce.framework.name", "yarn");
// conf.set("yarn.resourcemanager.hostname", "hadoop04");

// 通过Configuration对象获取Job对象,该job对象会组织所有的该MapReduce程序所有的各种组件
Job job = Job.getInstance(conf);

// 设置jar包所在路径
job.setJarByClass(WordCountMR.class);

// 指定mapper类和reducer类
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);

// Mapper的输入key-value类型,由MapReduce框架决定
// 指定maptask的输出类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 假如 mapTask的输出key-value类型,跟reduceTask的输出key-value类型一致,那么,以上两句代码可以不用设置

// reduceTask的输入key-value类型 就是 mapTask的输出key-value类型。所以不需要指定
// 指定reducetask的输出类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

// 为job指定输入数据的组件和输出数据的组件,以下两个参数是默认的,所以不指定也是OK的
// job.setInputFormatClass(TextInputFormat.class);
// job.setOutputFormatClass(TextOutputFormat.class);

// 为该mapreduce程序制定默认的数据分区组件。默认是 HashPartitioner.class
// job.setPartitionerClass(HashPartitioner.class);

// 如果MapReduce程序在Eclipse中,运行,也可以读取Windows系统本地的文件系统中的数据
Path inputPath = new Path("D:\\bigdata\\wordcount\\input");
Path outputPath = new Path("D:\\bigdata\\wordcount\\output33");

// 设置该MapReduce程序的ReduceTask的个数
// job.setNumReduceTasks(3);

// 指定该mapreduce程序数据的输入和输出路径
//        Path inputPath = new Path("/wordcount/input");
//        Path outputPath = new Path("/wordcount/output");
// 该段代码是用来判断输出路径存在不存在,存在就删除,虽然方便操作,但请谨慎
FileSystem fs = FileSystem.get(conf);
if (fs.exists(outputPath)) {
fs.delete(outputPath, true);
}

// 设置wordcount程序的输入路径
FileInputFormat.setInputPaths(job, inputPath);
// 设置wordcount程序的输出路径
FileOutputFormat.setOutputPath(job, outputPath);

// job.submit();
// 最后提交任务(verbose布尔值 决定要不要将运行进度信息输出给用户)
boolean waitForCompletion = job.waitForCompletion(true);
System.exit(waitForCompletion ? 0 : 1);
}

/**
* Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
*
* KEYIN 是指框架读取到的数据的key的类型,在默认的InputFormat下,读到的key是一行文本的起始偏移量,所以key的类型是Long
* VALUEIN 是指框架读取到的数据的value的类型,在默认的InputFormat下,读到的value是一行文本的内容,所以value的类型是String
* KEYOUT 是指用户自定义逻辑方法返回的数据中key的类型,由用户业务逻辑决定,在此wordcount程序中,我们输出的key是单词,所以是String
* VALUEOUT 是指用户自定义逻辑方法返回的数据中value的类型,由用户业务逻辑决定,在此wordcount程序中,我们输出的value是单词的数量,所以是Integer
*
* 但是,String ,Long等jdk中自带的数据类型,在序列化时,效率比较低,hadoop为了提高序列化效率,自定义了一套序列化框架
* 所以,在hadoop的程序中,如果该数据需要进行序列化(写磁盘,或者网络传输),就一定要用实现了hadoop序列化框架的数据类型
*
* Long ----> LongWritable
* String ----> Text
* Integer ----> IntWritable
* Null ----> NullWritable
*/
static class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

/**
* LongWritable key : 该key就是value该行文本的在文件当中的起始偏移量
* Text value : 就是MapReduce框架默认的数据读取组件TextInputFormat读取文件当中的一行文本
*/
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

// 切分单词
String[] words = value.toString().split(" ");
for (String word : words) {
// 每个单词计数一次,也就是把单词组织成<hello,1>这样的key-value对往外写出
context.write(new Text(word), new IntWritable(1));
}
}
}

/**
* 首先,和前面一样,Reducer类也有输入和输出,输入就是Map阶段的处理结果,输出就是Reduce最后的输出
* reducetask在调我们写的reduce方法,reducetask应该收到了前一阶段(map阶段)中所有maptask输出的数据中的一部分
* (数据的key.hashcode%reducetask数==本reductask号),所以reducetaks的输入类型必须和maptask的输出类型一样
*
* reducetask将这些收到kv数据拿来处理时,是这样调用我们的reduce方法的: 先将自己收到的所有的kv对按照k分组(根据k是否相同)
* 将某一组kv中的第一个kv中的k传给reduce方法的key变量,把这一组kv中所有的v用一个迭代器传给reduce方法的变量values
*/
static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

/**
* Text key : mapTask输出的key值
* Iterable<IntWritable> values : key对应的value的集合(该key只是相同的一个key)
*
* reduce方法接收key值相同的一组key-value进行汇总计算
*/
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

// 结果汇总
int sum = 0;
for (IntWritable v : values) {
sum += v.get();
}
// 汇总的结果往外输出
context.write(key, new IntWritable(sum));
}
}
}
View Code

MapReduce 程序编写规范

1、用户编写的程序分成三个部分:Mapper,Reducer,Driver(提交运行 MR 程序的客户端)

2、Mapper 的输入数据是 KV 对的形式(KV 的类型可自定义)

3、Mapper 的输出数据是 KV 对的形式(KV 的类型可自定义)

4、Mapper 中的业务逻辑写在 map()方法中

5、map()方法(maptask 进程)对每一个<k,v>调用一次

6、Reducer 的输入数据类型对应 Mapper 的输出数据类型,也是 KV 对的形式

7、Reducer 的业务逻辑写在 reduce()方法中

8、Reducetask 进程对每一组相同 k 的<k,v>组调用一次 reduce()方法

9、用户自定义的 Mapper 和 Reducer 都要继承各自的父类

10、整个程序需要一个 Drvier 来进行提交,提交的是一个描述了各种必要信息的 job 对象

WordCount 的业务逻辑

1、 maptask 阶段处理每个数据分块的单词统计分析,思路是每遇到一个单词则把其转换成 一个 key-value 对,比如单词 hello,就转换成<’hello’,1>发送给 reducetask 去汇总

2、 reducetask 阶段将接受 maptask 的结果,来做汇总计数

MapReduce 运行方式及 Debug

集群运行模式

打 jar 包,提交任务到集群运行,适用:生产环境,不适用:测试,调试,开发

要点一:首先要把代码打成 jar 上传到 linux 服务器

要点二:用 hadoop jar 的命令去提交代码到 yarn 集群运行

要点三:处理的数据和输出结果应该位于 hdfs 文件系统

要点四:如果需要在 windows 中的 eclipse 当中直接提交 job 到集群,则需要修改 YarnRunner 类,这个比较复杂,不建议使用

本地运行模式

Eclipse 开发环境下本地运行,好处是方便调试和测试

直接在IDE环境中进行环境 : eclipse

1、直接运行在本地,读取本地数据

2、直接运行在本地,读取远程的文件系统的数据

3、直接在IDE中提交任务给YARN集群运行

 

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: