您的位置:首页 > 运维架构

Hadoop一些比较杂的知识点

2016-03-12 13:20 274 查看

MapReduce(分布式计算模型)

MapReduce是一种分布式计算模型,由Google提出,主要用于搜索领域,解决海量数据的计算问题。

MR由两个阶段组成:Map和Reduce,用户只需要实现map()和reduce()两个函数,即可实现分布式计算,非常简单。

这两个函数的形参是key、value键值对,表示函数的输入信息。

在MapReduce1.0种,是jobTracker和taskTracker,到了2.0后,jobTracker变身为ResourceManager,taskTracker变身为NodeManager,支持了多平台的适用性。

Map:

分割成小的MAP


Reduce:

合并MAP计算结果


Hdfs -> MapperTask(逻辑切分) -> shutffle(根据key进行分组) -> ReducerTask -> hdfs

执行步骤:

1. map任务处理

1) 读取输入文件内容,解析成key,value对。对输入文件的每一行,解析成key,value对。每一个键值对调用一次map函数。

2) 写自己的逻辑,对输入的key,value处理,转换成新的key,value输出。

3) 对输出的key,value进行分区。

4) 对不同分区的数据,按照key进行排序、分组。相同key的value放到一个集合中。

5) (可选)分组后的数据进行归并.

2. reduce任务处理

1) 对多个map任务的输出,按照不同的分区,通过网络copy到不同的reduce节点。

2) 对多个map任务的输出进行合并、排序。写reduce函数自己的逻辑,对输入的key,value进行处理,转换成新的key,value输出.

3) 把reduce的输出保存到文件中.

Hadoop序列化的特点

使用的是writable序列化

紧凑:高效使用存储空间.

快速:读写数据的额外开销小.

互操作:支持多种语言的交互.

可扩展:支持读取老格式的数据.

JPDA

Sun Microsystem的Java Platform Debugger Architecture(JPDA)技术是一个多层架构,使您能够在各种环境中轻松调试 Java 应用程序。JPDA 由两个接口(分别是 JVM Tool Interface 和 JDI)、一个协议(Java Debug Wire Protocol)和两个用于合并它们的软件组件(后端和前端)组成。它的设计目的是让调试人员在任何环境中都可以进行调试。

JDWP设置

jvm本身就支持远程debug,eclipse也支持JDWP,置需要在各模块的JVM启动时加载下面的参数:

dt_socket表示使用套接字传输。

address=8000

JVM在8000端口上监听请求,这个设定为一个不冲突的端口即可。

server=y

y表示启动的JVM是被调试者。如果为n,则表示启动的JVM是调试器。

suspend=y

y表示启动的JVM会暂停等待,直到调试器连接上才继续执行。suspend=n,则JVM不会暂停等待。

hadoop远程debug配置

在$HADOOP_HOME/etc/hadoop/hadoop-env.sh的最后面添加以下参数:

#远程调试namenode
export HADOOP_NAMENODE_OPTS="-agentlib:jdwp=transport=dt_socket,address=8888,server=y,suspend=y"
#远程调试datanode
export HADOOP_DATANODE_OPTS="-agentlib:jdwp=transport=dt_socket,address=9888,server=y,suspend=y"

#远程调试RM
export YARN_RESOURCEMANAGER_OPTS="-agentlib:jdwp=transport=dt_socket,address=10888,server=y,suspend=y"

#远程调试NM
export YARN_NODEMANAGER_OPTS="-agentlib:jdwp=transport=dt_socket,address=10888,server=y,suspend=y"


Shuffle过程(包括以下):

* Paritioner编程

* 自定义排序编程

* Combiner编程

* 常见的MapReduce算法

partitioner编程(其实就是根据key进行分区,分配几个reducer)

Partitioner是partitioner的基类,如果需要定制partitioner也需要集成该类。

HashPartitioner是mapreduce的默认partitioner。计算方法是which reducer=(key.hashcode() & Integer.MAX_VALUE)%numReduceTasks,得到目前的reducer。

(例子以jar形式运行)

public int getPartition(Text key, DataBean value, int numPartitions) {
// 返回的就是分区数量
String account = key.toString();
String sub_acc = account.substring(0,3);
Integer code =providerMap.get(sub_acc);
if(code == null){
code = 0;
}
return code;
}


Combiners编程

每一个map都可能产生大量的输出,combiner的作用就是在map端对输出先做一次合并,以减少传输到reducer的数据量。

combiner最基本是实现本地key的归并,combiner具有类似本地的reducer功能。

如果不用combiner,那么,所有的结果都是reduce完成,效率会相对低下。使用combiner,先完成的map会在本地聚合,提升速度。

注意:combiner的输出是reducer的输入,如果combiner是可插拔的,添加combiner绝不能改变最终的计算结果。所以combiner只应用于那种Reduce的输入KEY/VALUE与输出key/value类型完全一致,且不影响最终结果的场景。比如累加,最大值。

决定Maaper数量的因素

一个Mapper对应一个文件切片,切片是根据参数进行定义的。

1.long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
//其中getFormatMinSplitSize函数在hadoop中直接return 1.
//getMinSplitSize(Job)是获取对应的配置文件数据mapreduce.input.fileinputformat.split.minsize。
//此参数在hadoop-mapreduce-client-core-2.4.1.jar中mared-reduce.xml中默认定义为0
//所以返回的是1
2.long maxSize = getMaxSplitSize(job);
//此处调用return context.getConfiguration().getLong(SPLIT_MAXSIZE,Long.MAX_VALUE)
//SPLIT_MAXSIZE是配置文件mapreduce.input.fileinputformat.split.maxsize,默认没有定义,所以返回Long的最大值
3. long splitSize = computeSplitSize(blockSize, minSize, maxSize);
// computeSplitSize的内部是return Math.max(minSize, Math.min(maxSize, blockSize));
//先得到blocksize和maxsize的最小值,然后返回minSize和结果的最大值。


结论:

默认情况下,返回的splitSize是block大小,mapper和splitSize的对应关系是一对一,然后和block也是1对1.

但如果定义maxsize,那么返回的则有不同的情况,当splitSize大于block大小时,一个mapper对应多个block;当splitSize小于block时,一个block需要建立多个mapper.
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: