您的位置:首页 > 大数据

【大数据笔记03】MapReduce

2019-03-21 16:38 211 查看

框架结构

JavaAPI规范(很少用,一般都是用HiveSQL)

(1) 用户编写的程序分成三个部分:Mapper,Reducer,Driver(提交运行 mr 程序的客户端)
(2)Mapper 的输入数据是 KV 对的形式(KV 的类型可自定义)
(3)Mapper 的输出数据是 KV 对的形式(KV 的类型可自定义)
(4)Mapper 中的业务逻辑写在 map()方法中
(5)map()方法(maptask 进程)对每一个<K,V>调用一次
(6)Reducer 的输入数据类型对应 Mapper 的输出数据类型,也是 KV
(7)Reducer 的业务逻辑写在 reduce()方法中
(8)Reducetask 进程对每一组相同 k 的<k,v>组调用一次 reduce()方法
(9)用户自定义的 Mapper 和 Reducer 都要继承各自的父类
(10)整个程序需要一个 Drvier 来进行提交,提交的是一个描述了各种必要信息的 job 对象

运行模式(了解)

本地运行模式
(1)mapreduce 程序是被提交给 LocalJobRunner 在本地以单进程的形式运行
(2)而处理的数据及输出结果可以在本地文件系统,也可以在 hdfs 上
(3)怎样实现本地运行?写一个程序,不要带集群的配置文件本质是程序的 conf 中是否有 mapreduce.framework.name=local 以及yarn.resourcemanager.hostname 参数
(4)本地模式非常便于进行业务逻辑的 debug,只要在 eclipse 中打断点即可
集群运行模式
(1)将 mapreduce 程序提交给 yarn 集群,分发到很多的节点上并发执行
(2)处理的数据和输出结果应该位于 hdfs 文件系统
(3)提交集群的实现步骤:将程序打成 JAR 包,然后在集群的任意一个节点上用 hadoop 命令启动hadoop jar wordcount.jar cn.itcast.bigdata.mrsimple.WordCountDriver args

MapReduce的序列化

1 . 概述
序列化(Serialization)是指把结构化对象转化为字节流。
反序列化(Deserialization)是序列化的逆过程。把字节流转为结构化对象。
当要在进程间传递对象或持久化对象的时候,就需要序列化对象成字节流,反之当要将接收到或从磁盘读取的字节流转换为对象,就要进行反序列化。
Java 的序列化(Serializable)是一个重量级序列化框架,一个对象被序列化后,会附带很多额外的信息(各种校验信息,header,继承体系…),不便于在网络中高效传输;所以,hadoop 自己开发了一套序列化机制( Writable),精简,高效。不用像 java 对象类一样传输多层的父子关系,需要哪个属性就传输哪个属性值,大大的减少网络传输的开销。

Writable是Hadoop的序列化格式,hadoop定义了这样一个Writable接口。
一个类要支持可序列化只需实现这个接口即可。

public interface Writable {
void write(DataOutput out) throws IOException;
void readFields(DataInput in) throws IOException;
}

2 . Writable 序列化接口
如需要将自定义的 bean 放在 key 中传输,则还需要实现 comparable 接口,因为 mapreduce 框中的 shuffle 过程一定会对 key 进行排序,此时,自定义的bean 实现的接口应该是:

public class FlowBean implements WritableComparable<FlowBean>

需要自己实现的方法是:

/**
* 反序列化的方法,反序列化时,从流中读取到的各个字段的顺序应该与序列化时写出去的顺序保持一致
*/
@Override
public void readFields(DataInput in) throws IOException {
upflow = in.readLong();
dflow = in.readLong();
sumflow = in.readLong();
}
/**
* 序列化的方法
*/
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(upflow);
out.writeLong(dflow);
out.writeLong(sumflow);
}
@Override
public int compareTo(FlowBean o) {
//实现按照 sumflow 的大小倒序排序
return sumflow>o.getSumflow()?-1:1;
}

compareTo 方法用于将当前对象与方法的参数进行比较。
如果指定的数与参数相等返回 0。
如果指定的数小于参数返回 -1。
如果指定的数大于参数返回 1。
例如:

o1.compareTo(o2);

返回正数的话,当前对象(调用 compareTo 方法的对象 o1)要排在比较对象(compareTo 传参对象 o2)后面,返回负数的话,放在前面。

MapReduce的Combiner

每一个 map 都可能会产生大量的本地输出,Combiner 的作用就是对 map 端的输出先做一次合并,以减少在 map 和 reduce 节点之间的数据传输量,以提高网络 IO 性能,是 MapReduce 的一种优化手段之一。

  • combiner 是 MR 程序中 Mapper 和 Reducer 之外的一种组件
  • combiner 组件的父类就是 Reducer
  • combiner 和 reducer 的区别在于运行的位置:
    Combiner 是在每一个 maptask 所在的节点运行
    Reducer 是接收全局所有 Mapper 的输出结果;
  • combiner 的意义就是对每一个 maptask 的输出进行局部汇总,以减小网络传输量
  • 具体实现步骤:
    1、自定义一个 combiner 继承 Reducer,重写 reduce 方法
    2、在 job 中设置: job.setCombinerClass(CustomCombiner.class)
  • combiner 能够应用的前提是不能影响最终的业务逻辑,而且,combiner 的输出 kv 应该跟 reducer 的输入 kv 类型要对应起来
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: