您的位置:首页 > 运维架构 > 网站架构

5-Hadoop架构MapReduce模型

2016-06-18 13:46 597 查看
Mapper/Reducer

map:并行计算

map<K,V>

entry:条目 (key-value)

key:行号,自动产生,以0位基址。

Job 作业:每一次的mapreduce过程就是一个作业

   job 作业==map task + reduce task==application

作业: job

任务: task

      map task
reduce task

map过程:

原始<k-v>抽取出新的<k-v>

map<k-v>-->map-->map<k-v>

map前

(0,ssss1990---12--sdsd-fsdf)

(5,ssss1990---42--sdsd-fsdf)

(10,ssss1991---90--sdsd-fsdf)

(15,ssss1991---62--sdsd-fsdf)

map后,抽取有用的数据组成新的map

(1990,12)

(1990,42)

(1991,90)

(1991,62)

shuffle:洗牌,对上述的重复key分组,并排序

(1990,[12,42])

(1991,[90,62])

reduce:化简,取出有用的最大值数据,忽略其他数据

(1990,42)

(1990,90)

最后输出

1990,42

1991,90

hadoopdemo的编写

1.引入有用的jar包

  解压2.7.1jar包,windows搜索.jar,copy至_lib目录,再以_lib为基目录,搜索source.jar,删除即可,留下的就是有用的hadoop的jar

2.配置文件

3.API

程序的运行

  

 独立模式:

 将程序达成jar包,不需打hadoop的jar,需要指定入口类,上传至Linux/windows

 讲原始数据也上传至linux。

 export HADOOP_CL:ASSPATH=~/demo/hadoopdemo1.jar 

 hadoop  hadoopdemo1.MaxTemp file:///~/demo/*.gz ~/demo/output

 

 集群模式:

先启动hadoop集群,测试一下jps。

创建目录 testdata,hadoop fs -mkdir -p /wpy/testdata

上传测试的原始数据到hdfs系统

         hadoop fs -put 19*.gz /wpy/testdata

 异常处理;

    不能启动数据节点或者无法上传至hdfs系统
version不一致导致的,可以查看master和slaves节点的VERSION文件

    /tmp/hadoop-wpy/dfs/name/current
中的namespaceID,namespaceID,发现不一致。

   这个问题一般是由于两次或两次以上的格式化NameNode造成的,有两种方法可以解决,第一种方法是删除DataNode的所有资料;
第二种方法是修改每个DataNode的namespaceID(位于/dfs/data/current/VERSION文件中)
或修改NameNode的namespaceID(位于/dfs/name/current/VERSION文件中),使其一致。

    干掉master和slave的所有/tmp/hadoop-wpy/dfs/name/current/VERSION文件,简便起见直接
 rm -rf  /tmp/hadoop-wpy/
再重新格式化
 hadoop namenode -format

   会发现Live Nodes的节点数为2了,OK。

运行jar程序:

hadoop jar hadoopdemo1.jar /wpy/testdata/19*.gz /wpy/output

运行成功,日志如下:

16/05/27 06:03:59 INFO mapreduce.Job: Counters: 35
File System Counters
FILE: Number of bytes read=302337
FILE: Number of bytes written=1329243
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=370049
HDFS: Number of bytes written=18
HDFS: Number of read operations=25
HDFS: Number of large read operations=0
HDFS: Number of write operations=5
Map-Reduce Framework
Map input records=13130
Map output records=13129
Map output bytes=118161
Map output materialized bytes=144431
Input split bytes=190
Combine input records=0
Combine output records=0
Reduce input groups=2
Reduce shuffle bytes=144431
Reduce input records=13129
Reduce output records=2
Spilled Records=26258
Shuffled Maps =2
Failed Shuffles=0
Merged Map outputs=2
GC time elapsed (ms)=184
Total committed heap usage (bytes)=457912320
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters 
Bytes Read=147972
File Output Format Counters 
Bytes Written=18

wpy的第一个hadoop程序计算最高最低气温结果:true

成功会多出一个标记文件:

/wpy/output/_SUCCESS

还有一个输出结果的文件:

/wpy/output/part-r-00000

hadoop fs -cat /wpy/output/part-r-00000

1901 317

1902 244

集群调整与shell脚本:

前期技术准备:

shell脚本:

新建一个文件,并修改文件执行权限

chmod a+x line_bash.sh

编辑内容,注意文件的第一行的声明:

#!/bin/bash

echo "hello world!"

执行文件 ./line_bash.sh

复制一个文件b.txt到s1,s2,s3

#!/bin/bash

for((i=1;i<4;i++)){

scp ~/wpy/b.txt root@s$i:/home/wpy/wpy/

}

修改副本数和块block的大小:

需要批量修改的配置文件

core-site.xml

hdfs-site.xml

mapred-site.xml

yarn-site.xml

配置参考:/share/XXXX/XXXX.jar

core-site.xml可以配置的元素参考

    hadoop-common-2.7.1.jar/core-default.xml文件

hdfs-site.xml可以配置的元素参考

    hadoop-hdfs-2.7.1.jar/hdfs-default.xml

mapred-site.xml可以配置的元素参考

    hadoop-mapreduce-client-core-2.7.1.jar/mapred-default.xml

yarn-site.xml可以配置的元素参考

    hadoop-yarn-common-2.7.1.jar/yarn-default.xml

配置文件的加载:

先加载系统默认的default.xml文件,再加载对应的用户自定义文件

如:先 core-default.xml 后  core-site.xml

在程序中可以修改这些配置信息:

eg;

     //获取配置信息,设置最大切割块大小为20k

     Configuration conf= job.getconfiguration();
conf.setLong(FileInputFormat.Split_Maxsize,1024*20);

修改副本数:

 搜索“dfs.replication”

修改块大小:

 搜素"dfs.blocksize"

这里块大小可以配置为更人性化的20k这种形式

注意块大小不能比hadoop默认的块最小值还小,如果要配置更小的值,那么需要修改块最小值的配置

dfs.namenode.fs-limits.min-block-size  

1048576==1M

修改这里的最小值不能设置为10k这种形式,应该用数字,可以看说明

在s0节点修改后,发送覆盖所有节点,重启节点前,删除所有临时namenode节点目录/tmp/hadoop-wpy/

可以使用shell脚本来发送覆盖配置文件

scp $HADOOP_INSTALL/etc/hadoop_mode3/hdfs-site.xml  wpy@s$i:$HADOOP_INSTALL/etc/hadoop_mode3/

ssh s$i rm -rf /tmp/hadoop-wpy/

格式化文件系统,(只格式化s0???)  需要重新在建/wpy/testdata/

修改hdfs-site.xml文件不需删除tmp文件

 

可以在数据节点的/wpy/hadoop-wpy/ data/下找到块文件

也可以通过网页来查看

数据切割与运行:

修改代码,运行查看数据的分布

可以从上下文获取获取ip,hostname等来查看,参考42讲

map方法执行:

run(){

//任务开始前执行一次

setup();

//循环执行

while(条件){

map(,,,,);

}

//任务执行结束后执行一次

cleanup()

}

一个切分的块split blocksize对应一个map任务,可能对应一个node

gz文件是不被切分的,会视作一个文件整体提交给map. blocksize依赖切割split算法,可以将gz文件解压成普通的文本文件。

可以通过InputSplit来打印切割的长度,

在setup和clientup中打印的日志,应该到hadoop_install/.../logs/userlogs/..stdout查看

同一块数据有多个备份,只执行其中的一个备份数据节点。

  例如:块1:有2个备份的话,分别在s1,s4上

以备份数为2,共6个数据节点为例:

b0: s4 s6

b1: s2 s3

b2: s1 s4

b3: s2 s4

运行结果:

s1: no log

s2: no log

s3: no log

s4: log

s5: log

s6: log

上述结果说明:

数据节点的备份和程序运行的节点没有直接关系。

程序运行的节点是hadoop会找比较空闲的节点来运行,读取的数据来源于备份的数据节点。

s4,s5,s6上有程序运行,他们读取的数据节点来源于上述的数据备份点,多个备份只读取其中的一个备份节点数据。

hdfs:

  namenode: ip+ : 50070
datanode: ip+ : 50075
datanode: ip+ : 50090

mapreduce(yarn):

 resourcemanager(资源统一调度,和s0同一节点): ip + : 8088

     nodemanager(数据节点管理,和数据节点在一起): ip + :8042,里面有打印的用户日志,可以通过logs查看
 

job: input dir | input file

gz;不能切割,要切割换成纯文本文件

blocksize: 20k min:10240

split切割准则
  1.不能超过max blocksize,

             否则实际运行=maxblocksize

     2.可以小于minblocksize
        实际运行=自定义
d3bf
blocksize

     3.介于max 和min之间也是ok的

win7下修改hosts文件,便于浏览器直接访问节点数据,对节点就能直接转换

C:\Windows\System32\drivers\etc\hosts文件,增加

192.168.198.130  s0

192.168.198.132  s1

192.168.198.133  s2

192.168.198.139  s3

s0:8080 <==> 192.168.198.130:8080

mapreduce工作原理:

下载eclipse的Linux 64位版本tar.gz

下载HDT

进入到符号链接/a/b目录下

aaaa-->/a/b/

cd -P .. 表示退回到物理的上层目录,即/a。

./意义:.表示当前目录,/表示某个目录下,所以./表示当前目录下。

通常用来执行以下脚本。 ./bash 执行当前目录下的bash脚本,直接写bash是不行的,找不到这个脚本。

解压eclipse,建一个工作区/home/wpy/eclipse(最好在自己根目录下),注意将该客户机的内存调大至2G. 

  在eclipse目录下  启动程序./eclipse
为了能在运行eclipse时 脚本窗口也能用,放到后台运行,可以 ./eclipse &
为了能在任何目录下直接运行eclipse,可以这样  ln -s /usr/wpysoft/eclipse eclipse

快捷键的设定:

 比如System.out.println();找keys,解除绑定,重新设置就行,再排除一个有没有已经有设为alt+/的快捷键,如果有解除改为其他的。保存就行了。

 如果启动时忘记

 前台程序隐藏到后天运行: ctrl +z

 恢复到前台运行: bg +%1  1是作业号(jps查询)

下载eclipse的HTD插件:

hadoop2x-eclipse-plugin.zip插件

找到里面的relase目录下的问jar包解压出来,放到eclipse下的插件目录plugin目录

  ungzip   hadoop2x-eclipse-plugin.zip

重启eclpise,在preferrence->下发现 hadoop map/reduce条目就说明插件安装成功,然后制定hadoop的安装目录即可

打开hadoop的视图,配置端口号8020等,可以通过左侧的菜单的文件目录来查看hadoop的跟目录的文件

mapreduce的工作原理

1.客户端client: 提交作业Job(mr). 可以是集群中也可以是集群外

2.资源管理器ResourceManager(RM): 寻找执行该作业的负责人

3.应用程序主管ApplicationManager(AM): 执行作业Job任务。

流程图: 见截图

ant: apache的打包工具

过程: .java--编译-->.class--打包-->.jar

示例:

<project name="hadoopdeomo1" basedir="." default="package">
<path id="path1">
<fileset dir="E:\linux_down\hadoop-2.7.1\hadoop-2.7.1\_lib">
 <include name="*.jar"/>
</fileset>
</path>
<target name="prepare">
 <delete dir="${basedir}/build/classes"></delete>
 <mkdir dir="${basedir}/build/classes"/>
</target>

<target name="compile" depends="prepare">
 <javac srcdir="${basedir}/src" destdir="${basedir}/build/classes" classpathref="path1" 
  includeantruntime="true" encoding="UTF-8" debug="true" debuglevel="lines,source">
 </javac>
</target>

<target name="package" depends="compile">
<jar destfile="${basedir}/my.jar" basedir="${basedir}/build/classes"></jar>
</target>

</project>

hadooop:

hdfs:

  namenode:
datanode:
secondarynamenode:

mapreduce:

  maper:
reducer:

yarn框架: mapreduce的执行框架,在集群上调度执行的。

  ResourceManager:
NodeManager:

 

hadoop核心类/进程,这些都是单独的进程,都有main()方法入口。
hdfs: 数据的存储
NameNode           :  org.apache.hadoop.hdfs.server.namenode.NameNode
DataNode           :  org.apache.hadoop.hdfs.server.datanode.DataNode
SecondaryNameNode  :  org.apache.hadoop.hdfs.server.namenode.SecondaryNameNode
yarn: mapreduce作业的调度
ResourceManager    :  org.apache.hadoop.yarn.server.resourcemanager.ResourceManager
NodeManager        :  org.apache.hadoop.yarn.server.nodemanager.NodeManager

hadoop的调试:

yarn调度框架:

 调度MR作业的,是基于事件的;

 作业Job提交给yarn的资源管理器,作业Job在这里称之为应用Application.

 打开8088端口就可以看到,application是有状态的:运行,完成,失败,被杀死等状态。

 application是基于状态机的,而状态之间的变换是通过事件触发的。

 # Applications
* NEW
* NEW_SAVING
* SUBMITTED
* ACCEPTED
* RUNNING
* FINISHED
* FAILED
* KILLED 

异步事件分发器(总管道):

  AsyncDispatcher,两次分发,可查看流程图"yarn事件调度机制分析.png"

  




ResourceManager中的分发器:

真正的事件处理实体          处理的事件                   事件分发器                     事件的状态类型

RMAppAttemptImpl  --> RMAppAttemptEvent     --> ApplicationAttemptEventDispatcher -->  enum RMAppAttemptEventType

RMAppImpl         --> RMAppEvent            --> ApplicationEventDispatcher        -->  enum RMAppEventType

RMNodeImpl        --> RMNodeEvent           --> NodeEventDispatcher               -->  enum RMNodeEventType 

RMContainerImpl   --> ContainerPreemptEvent --> RMContainerPreemptEventDispatcher -->  enum ContainerPreemptEventType

exit()            --> RMFatalEvent          --> RMFatalEventDispatcher            -->  enum RMFatalEventType

ResourceScheduler --> SchedulerEvent        --> SchedulerEventDispatcher          -->  enum SchedulerEventType

yarn的ResourceManager远程调试:

----------------------------

1.设置ResourceManager的JAM,开启远程调试:

    设置YARN_RESOURCEMANAGER_OPTS变量开启调试,可以编辑一个shell脚本
开启脚本,enable_yarn_remotedebug.sh:

         export  YARN_RESOURCEMANAGER_OPTS ="-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=8000"
禁用脚本,disable_yarn_remotedebug.sh:
    export  YARN_RESOURCEMANAGER_OPTS =

2.执行脚本

   ./enable_yarn_remotedebug.sh,并让立刻生效 source enable_yarn_remotedebug.sh

3.启动yarn脚本:
然后start-yarn.sh
可以看到lisening for ....8000,开始监听8000端口

4.在window7下配置eclispe,连接到资源管理器的8000端口

   可以在源代码上添加断点
在ResourceManager的main(),debug confirguration,新弹出的窗口中找到remote java application,配置参数
 连接类型;选standard
host:s0
port:8000
然后点击调试

Yarn底层事件分发调试RMApp事件处理调试+状态机变换

可以在资源管理器中找到ApplicationEventDispatcher分发器的真正的事件处理实体RMAppImpl的handel处理方法,handel方法是一个不断被

调用来执行对应的事件状态RMAppState的状态值变化的方法。打断点就就可以单步跟踪看到期状态的变换。

注意必须在每次执行this.writeLock.unlock();后才能在8088端口上查看到正在运行的作业applicaiton的状态值的变化,变化值对应

RMAppState中的从NEW-->FINISHED.

Hadoop 底层IPC和RPC通信原理解析

底层都不是传统的同步socket/serversocket套接字,而是在此基础上发展的异步通信NIO机制,提高效率。

IPC:进程间通信

RPC(remote prodedure call):远程过程调用

clinet--message:protocol(byete[])-->server

org.apache.hadoop.ipc.Client

org.apache.hadoop.ipc.Server

Client与Server之间通过进行一系列的封装成字节数组协议进行通信。

PB(protoBuf):协议缓存区,protobuf-java-2.5.0.jar包中就是用的谷歌的一些东西。

p
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  hadoop