5-Hadoop架构MapReduce模型
2016-06-18 13:46
597 查看
Mapper/Reducer
map:并行计算
map<K,V>
entry:条目 (key-value)
key:行号,自动产生,以0位基址。
Job 作业:每一次的mapreduce过程就是一个作业
job 作业==map task + reduce task==application
作业: job
任务: task
map task
reduce task
map过程:
原始<k-v>抽取出新的<k-v>
map<k-v>-->map-->map<k-v>
map前
(0,ssss1990---12--sdsd-fsdf)
(5,ssss1990---42--sdsd-fsdf)
(10,ssss1991---90--sdsd-fsdf)
(15,ssss1991---62--sdsd-fsdf)
map后,抽取有用的数据组成新的map
(1990,12)
(1990,42)
(1991,90)
(1991,62)
shuffle:洗牌,对上述的重复key分组,并排序
(1990,[12,42])
(1991,[90,62])
reduce:化简,取出有用的最大值数据,忽略其他数据
(1990,42)
(1990,90)
最后输出
1990,42
1991,90
hadoopdemo的编写
1.引入有用的jar包
解压2.7.1jar包,windows搜索.jar,copy至_lib目录,再以_lib为基目录,搜索source.jar,删除即可,留下的就是有用的hadoop的jar
2.配置文件
3.API
程序的运行
独立模式:
将程序达成jar包,不需打hadoop的jar,需要指定入口类,上传至Linux/windows
讲原始数据也上传至linux。
export HADOOP_CL:ASSPATH=~/demo/hadoopdemo1.jar
hadoop hadoopdemo1.MaxTemp file:///~/demo/*.gz ~/demo/output
集群模式:
先启动hadoop集群,测试一下jps。
创建目录 testdata,hadoop fs -mkdir -p /wpy/testdata
上传测试的原始数据到hdfs系统
hadoop fs -put 19*.gz /wpy/testdata
异常处理;
不能启动数据节点或者无法上传至hdfs系统
version不一致导致的,可以查看master和slaves节点的VERSION文件
/tmp/hadoop-wpy/dfs/name/current
中的namespaceID,namespaceID,发现不一致。
这个问题一般是由于两次或两次以上的格式化NameNode造成的,有两种方法可以解决,第一种方法是删除DataNode的所有资料;
第二种方法是修改每个DataNode的namespaceID(位于/dfs/data/current/VERSION文件中)
或修改NameNode的namespaceID(位于/dfs/name/current/VERSION文件中),使其一致。
干掉master和slave的所有/tmp/hadoop-wpy/dfs/name/current/VERSION文件,简便起见直接
rm -rf /tmp/hadoop-wpy/
再重新格式化
hadoop namenode -format
会发现Live Nodes的节点数为2了,OK。
运行jar程序:
hadoop jar hadoopdemo1.jar /wpy/testdata/19*.gz /wpy/output
运行成功,日志如下:
16/05/27 06:03:59 INFO mapreduce.Job: Counters: 35
File System Counters
FILE: Number of bytes read=302337
FILE: Number of bytes written=1329243
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=370049
HDFS: Number of bytes written=18
HDFS: Number of read operations=25
HDFS: Number of large read operations=0
HDFS: Number of write operations=5
Map-Reduce Framework
Map input records=13130
Map output records=13129
Map output bytes=118161
Map output materialized bytes=144431
Input split bytes=190
Combine input records=0
Combine output records=0
Reduce input groups=2
Reduce shuffle bytes=144431
Reduce input records=13129
Reduce output records=2
Spilled Records=26258
Shuffled Maps =2
Failed Shuffles=0
Merged Map outputs=2
GC time elapsed (ms)=184
Total committed heap usage (bytes)=457912320
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=147972
File Output Format Counters
Bytes Written=18
wpy的第一个hadoop程序计算最高最低气温结果:true
成功会多出一个标记文件:
/wpy/output/_SUCCESS
还有一个输出结果的文件:
/wpy/output/part-r-00000
hadoop fs -cat /wpy/output/part-r-00000
1901 317
1902 244
集群调整与shell脚本:
前期技术准备:
shell脚本:
新建一个文件,并修改文件执行权限
chmod a+x line_bash.sh
编辑内容,注意文件的第一行的声明:
#!/bin/bash
echo "hello world!"
执行文件 ./line_bash.sh
复制一个文件b.txt到s1,s2,s3
#!/bin/bash
for((i=1;i<4;i++)){
scp ~/wpy/b.txt root@s$i:/home/wpy/wpy/
}
修改副本数和块block的大小:
需要批量修改的配置文件
core-site.xml
hdfs-site.xml
mapred-site.xml
yarn-site.xml
配置参考:/share/XXXX/XXXX.jar
core-site.xml可以配置的元素参考
hadoop-common-2.7.1.jar/core-default.xml文件
hdfs-site.xml可以配置的元素参考
hadoop-hdfs-2.7.1.jar/hdfs-default.xml
mapred-site.xml可以配置的元素参考
hadoop-mapreduce-client-core-2.7.1.jar/mapred-default.xml
yarn-site.xml可以配置的元素参考
hadoop-yarn-common-2.7.1.jar/yarn-default.xml
配置文件的加载:
先加载系统默认的default.xml文件,再加载对应的用户自定义文件
如:先 core-default.xml 后 core-site.xml
在程序中可以修改这些配置信息:
eg;
//获取配置信息,设置最大切割块大小为20k
Configuration conf= job.getconfiguration();
conf.setLong(FileInputFormat.Split_Maxsize,1024*20);
修改副本数:
搜索“dfs.replication”
修改块大小:
搜素"dfs.blocksize"
这里块大小可以配置为更人性化的20k这种形式
注意块大小不能比hadoop默认的块最小值还小,如果要配置更小的值,那么需要修改块最小值的配置
dfs.namenode.fs-limits.min-block-size
1048576==1M
修改这里的最小值不能设置为10k这种形式,应该用数字,可以看说明
在s0节点修改后,发送覆盖所有节点,重启节点前,删除所有临时namenode节点目录/tmp/hadoop-wpy/
可以使用shell脚本来发送覆盖配置文件
scp $HADOOP_INSTALL/etc/hadoop_mode3/hdfs-site.xml wpy@s$i:$HADOOP_INSTALL/etc/hadoop_mode3/
ssh s$i rm -rf /tmp/hadoop-wpy/
格式化文件系统,(只格式化s0???) 需要重新在建/wpy/testdata/
修改hdfs-site.xml文件不需删除tmp文件
可以在数据节点的/wpy/hadoop-wpy/ data/下找到块文件
也可以通过网页来查看
数据切割与运行:
修改代码,运行查看数据的分布
可以从上下文获取获取ip,hostname等来查看,参考42讲
map方法执行:
run(){
//任务开始前执行一次
setup();
//循环执行
while(条件){
map(,,,,);
}
//任务执行结束后执行一次
cleanup()
}
一个切分的块split blocksize对应一个map任务,可能对应一个node
gz文件是不被切分的,会视作一个文件整体提交给map. blocksize依赖切割split算法,可以将gz文件解压成普通的文本文件。
可以通过InputSplit来打印切割的长度,
在setup和clientup中打印的日志,应该到hadoop_install/.../logs/userlogs/..stdout查看
同一块数据有多个备份,只执行其中的一个备份数据节点。
例如:块1:有2个备份的话,分别在s1,s4上
以备份数为2,共6个数据节点为例:
b0: s4 s6
b1: s2 s3
b2: s1 s4
b3: s2 s4
运行结果:
s1: no log
s2: no log
s3: no log
s4: log
s5: log
s6: log
上述结果说明:
数据节点的备份和程序运行的节点没有直接关系。
程序运行的节点是hadoop会找比较空闲的节点来运行,读取的数据来源于备份的数据节点。
s4,s5,s6上有程序运行,他们读取的数据节点来源于上述的数据备份点,多个备份只读取其中的一个备份节点数据。
hdfs:
namenode: ip+ : 50070
datanode: ip+ : 50075
datanode: ip+ : 50090
mapreduce(yarn):
resourcemanager(资源统一调度,和s0同一节点): ip + : 8088
nodemanager(数据节点管理,和数据节点在一起): ip + :8042,里面有打印的用户日志,可以通过logs查看
job: input dir | input file
gz;不能切割,要切割换成纯文本文件
blocksize: 20k min:10240
split切割准则
1.不能超过max blocksize,
否则实际运行=maxblocksize
2.可以小于minblocksize
实际运行=自定义
d3bf
blocksize
3.介于max 和min之间也是ok的
win7下修改hosts文件,便于浏览器直接访问节点数据,对节点就能直接转换
C:\Windows\System32\drivers\etc\hosts文件,增加
192.168.198.130 s0
192.168.198.132 s1
192.168.198.133 s2
192.168.198.139 s3
s0:8080 <==> 192.168.198.130:8080
mapreduce工作原理:
下载eclipse的Linux 64位版本tar.gz
下载HDT
进入到符号链接/a/b目录下
aaaa-->/a/b/
cd -P .. 表示退回到物理的上层目录,即/a。
./意义:.表示当前目录,/表示某个目录下,所以./表示当前目录下。
通常用来执行以下脚本。 ./bash 执行当前目录下的bash脚本,直接写bash是不行的,找不到这个脚本。
解压eclipse,建一个工作区/home/wpy/eclipse(最好在自己根目录下),注意将该客户机的内存调大至2G.
在eclipse目录下 启动程序./eclipse
为了能在运行eclipse时 脚本窗口也能用,放到后台运行,可以 ./eclipse &
为了能在任何目录下直接运行eclipse,可以这样 ln -s /usr/wpysoft/eclipse eclipse
快捷键的设定:
比如System.out.println();找keys,解除绑定,重新设置就行,再排除一个有没有已经有设为alt+/的快捷键,如果有解除改为其他的。保存就行了。
如果启动时忘记
前台程序隐藏到后天运行: ctrl +z
恢复到前台运行: bg +%1 1是作业号(jps查询)
下载eclipse的HTD插件:
hadoop2x-eclipse-plugin.zip插件
找到里面的relase目录下的问jar包解压出来,放到eclipse下的插件目录plugin目录
ungzip hadoop2x-eclipse-plugin.zip
重启eclpise,在preferrence->下发现 hadoop map/reduce条目就说明插件安装成功,然后制定hadoop的安装目录即可
打开hadoop的视图,配置端口号8020等,可以通过左侧的菜单的文件目录来查看hadoop的跟目录的文件
mapreduce的工作原理
1.客户端client: 提交作业Job(mr). 可以是集群中也可以是集群外
2.资源管理器ResourceManager(RM): 寻找执行该作业的负责人
3.应用程序主管ApplicationManager(AM): 执行作业Job任务。
流程图: 见截图
ant: apache的打包工具
过程: .java--编译-->.class--打包-->.jar
示例:
<project name="hadoopdeomo1" basedir="." default="package">
<path id="path1">
<fileset dir="E:\linux_down\hadoop-2.7.1\hadoop-2.7.1\_lib">
<include name="*.jar"/>
</fileset>
</path>
<target name="prepare">
<delete dir="${basedir}/build/classes"></delete>
<mkdir dir="${basedir}/build/classes"/>
</target>
<target name="compile" depends="prepare">
<javac srcdir="${basedir}/src" destdir="${basedir}/build/classes" classpathref="path1"
includeantruntime="true" encoding="UTF-8" debug="true" debuglevel="lines,source">
</javac>
</target>
<target name="package" depends="compile">
<jar destfile="${basedir}/my.jar" basedir="${basedir}/build/classes"></jar>
</target>
</project>
hadooop:
hdfs:
namenode:
datanode:
secondarynamenode:
mapreduce:
maper:
reducer:
yarn框架: mapreduce的执行框架,在集群上调度执行的。
ResourceManager:
NodeManager:
hadoop核心类/进程,这些都是单独的进程,都有main()方法入口。
hdfs: 数据的存储
NameNode : org.apache.hadoop.hdfs.server.namenode.NameNode
DataNode : org.apache.hadoop.hdfs.server.datanode.DataNode
SecondaryNameNode : org.apache.hadoop.hdfs.server.namenode.SecondaryNameNode
yarn: mapreduce作业的调度
ResourceManager : org.apache.hadoop.yarn.server.resourcemanager.ResourceManager
NodeManager : org.apache.hadoop.yarn.server.nodemanager.NodeManager
hadoop的调试:
yarn调度框架:
调度MR作业的,是基于事件的;
作业Job提交给yarn的资源管理器,作业Job在这里称之为应用Application.
打开8088端口就可以看到,application是有状态的:运行,完成,失败,被杀死等状态。
application是基于状态机的,而状态之间的变换是通过事件触发的。
# Applications
* NEW
* NEW_SAVING
* SUBMITTED
* ACCEPTED
* RUNNING
* FINISHED
* FAILED
* KILLED
异步事件分发器(总管道):
AsyncDispatcher,两次分发,可查看流程图"yarn事件调度机制分析.png"
ResourceManager中的分发器:
真正的事件处理实体 处理的事件 事件分发器 事件的状态类型
RMAppAttemptImpl --> RMAppAttemptEvent --> ApplicationAttemptEventDispatcher --> enum RMAppAttemptEventType
RMAppImpl --> RMAppEvent --> ApplicationEventDispatcher --> enum RMAppEventType
RMNodeImpl --> RMNodeEvent --> NodeEventDispatcher --> enum RMNodeEventType
RMContainerImpl --> ContainerPreemptEvent --> RMContainerPreemptEventDispatcher --> enum ContainerPreemptEventType
exit() --> RMFatalEvent --> RMFatalEventDispatcher --> enum RMFatalEventType
ResourceScheduler --> SchedulerEvent --> SchedulerEventDispatcher --> enum SchedulerEventType
yarn的ResourceManager远程调试:
----------------------------
1.设置ResourceManager的JAM,开启远程调试:
设置YARN_RESOURCEMANAGER_OPTS变量开启调试,可以编辑一个shell脚本
开启脚本,enable_yarn_remotedebug.sh:
export YARN_RESOURCEMANAGER_OPTS ="-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=8000"
禁用脚本,disable_yarn_remotedebug.sh:
export YARN_RESOURCEMANAGER_OPTS =
2.执行脚本
./enable_yarn_remotedebug.sh,并让立刻生效 source enable_yarn_remotedebug.sh
3.启动yarn脚本:
然后start-yarn.sh
可以看到lisening for ....8000,开始监听8000端口
4.在window7下配置eclispe,连接到资源管理器的8000端口
可以在源代码上添加断点
在ResourceManager的main(),debug confirguration,新弹出的窗口中找到remote java application,配置参数
连接类型;选standard
host:s0
port:8000
然后点击调试
Yarn底层事件分发调试RMApp事件处理调试+状态机变换
可以在资源管理器中找到ApplicationEventDispatcher分发器的真正的事件处理实体RMAppImpl的handel处理方法,handel方法是一个不断被
调用来执行对应的事件状态RMAppState的状态值变化的方法。打断点就就可以单步跟踪看到期状态的变换。
注意必须在每次执行this.writeLock.unlock();后才能在8088端口上查看到正在运行的作业applicaiton的状态值的变化,变化值对应
RMAppState中的从NEW-->FINISHED.
Hadoop 底层IPC和RPC通信原理解析
底层都不是传统的同步socket/serversocket套接字,而是在此基础上发展的异步通信NIO机制,提高效率。
IPC:进程间通信
RPC(remote prodedure call):远程过程调用
clinet--message:protocol(byete[])-->server
org.apache.hadoop.ipc.Client
org.apache.hadoop.ipc.Server
Client与Server之间通过进行一系列的封装成字节数组协议进行通信。
PB(protoBuf):协议缓存区,protobuf-java-2.5.0.jar包中就是用的谷歌的一些东西。
p
map:并行计算
map<K,V>
entry:条目 (key-value)
key:行号,自动产生,以0位基址。
Job 作业:每一次的mapreduce过程就是一个作业
job 作业==map task + reduce task==application
作业: job
任务: task
map task
reduce task
map过程:
原始<k-v>抽取出新的<k-v>
map<k-v>-->map-->map<k-v>
map前
(0,ssss1990---12--sdsd-fsdf)
(5,ssss1990---42--sdsd-fsdf)
(10,ssss1991---90--sdsd-fsdf)
(15,ssss1991---62--sdsd-fsdf)
map后,抽取有用的数据组成新的map
(1990,12)
(1990,42)
(1991,90)
(1991,62)
shuffle:洗牌,对上述的重复key分组,并排序
(1990,[12,42])
(1991,[90,62])
reduce:化简,取出有用的最大值数据,忽略其他数据
(1990,42)
(1990,90)
最后输出
1990,42
1991,90
hadoopdemo的编写
1.引入有用的jar包
解压2.7.1jar包,windows搜索.jar,copy至_lib目录,再以_lib为基目录,搜索source.jar,删除即可,留下的就是有用的hadoop的jar
2.配置文件
3.API
程序的运行
独立模式:
将程序达成jar包,不需打hadoop的jar,需要指定入口类,上传至Linux/windows
讲原始数据也上传至linux。
export HADOOP_CL:ASSPATH=~/demo/hadoopdemo1.jar
hadoop hadoopdemo1.MaxTemp file:///~/demo/*.gz ~/demo/output
集群模式:
先启动hadoop集群,测试一下jps。
创建目录 testdata,hadoop fs -mkdir -p /wpy/testdata
上传测试的原始数据到hdfs系统
hadoop fs -put 19*.gz /wpy/testdata
异常处理;
不能启动数据节点或者无法上传至hdfs系统
version不一致导致的,可以查看master和slaves节点的VERSION文件
/tmp/hadoop-wpy/dfs/name/current
中的namespaceID,namespaceID,发现不一致。
这个问题一般是由于两次或两次以上的格式化NameNode造成的,有两种方法可以解决,第一种方法是删除DataNode的所有资料;
第二种方法是修改每个DataNode的namespaceID(位于/dfs/data/current/VERSION文件中)
或修改NameNode的namespaceID(位于/dfs/name/current/VERSION文件中),使其一致。
干掉master和slave的所有/tmp/hadoop-wpy/dfs/name/current/VERSION文件,简便起见直接
rm -rf /tmp/hadoop-wpy/
再重新格式化
hadoop namenode -format
会发现Live Nodes的节点数为2了,OK。
运行jar程序:
hadoop jar hadoopdemo1.jar /wpy/testdata/19*.gz /wpy/output
运行成功,日志如下:
16/05/27 06:03:59 INFO mapreduce.Job: Counters: 35
File System Counters
FILE: Number of bytes read=302337
FILE: Number of bytes written=1329243
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=370049
HDFS: Number of bytes written=18
HDFS: Number of read operations=25
HDFS: Number of large read operations=0
HDFS: Number of write operations=5
Map-Reduce Framework
Map input records=13130
Map output records=13129
Map output bytes=118161
Map output materialized bytes=144431
Input split bytes=190
Combine input records=0
Combine output records=0
Reduce input groups=2
Reduce shuffle bytes=144431
Reduce input records=13129
Reduce output records=2
Spilled Records=26258
Shuffled Maps =2
Failed Shuffles=0
Merged Map outputs=2
GC time elapsed (ms)=184
Total committed heap usage (bytes)=457912320
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=147972
File Output Format Counters
Bytes Written=18
wpy的第一个hadoop程序计算最高最低气温结果:true
成功会多出一个标记文件:
/wpy/output/_SUCCESS
还有一个输出结果的文件:
/wpy/output/part-r-00000
hadoop fs -cat /wpy/output/part-r-00000
1901 317
1902 244
集群调整与shell脚本:
前期技术准备:
shell脚本:
新建一个文件,并修改文件执行权限
chmod a+x line_bash.sh
编辑内容,注意文件的第一行的声明:
#!/bin/bash
echo "hello world!"
执行文件 ./line_bash.sh
复制一个文件b.txt到s1,s2,s3
#!/bin/bash
for((i=1;i<4;i++)){
scp ~/wpy/b.txt root@s$i:/home/wpy/wpy/
}
修改副本数和块block的大小:
需要批量修改的配置文件
core-site.xml
hdfs-site.xml
mapred-site.xml
yarn-site.xml
配置参考:/share/XXXX/XXXX.jar
core-site.xml可以配置的元素参考
hadoop-common-2.7.1.jar/core-default.xml文件
hdfs-site.xml可以配置的元素参考
hadoop-hdfs-2.7.1.jar/hdfs-default.xml
mapred-site.xml可以配置的元素参考
hadoop-mapreduce-client-core-2.7.1.jar/mapred-default.xml
yarn-site.xml可以配置的元素参考
hadoop-yarn-common-2.7.1.jar/yarn-default.xml
配置文件的加载:
先加载系统默认的default.xml文件,再加载对应的用户自定义文件
如:先 core-default.xml 后 core-site.xml
在程序中可以修改这些配置信息:
eg;
//获取配置信息,设置最大切割块大小为20k
Configuration conf= job.getconfiguration();
conf.setLong(FileInputFormat.Split_Maxsize,1024*20);
修改副本数:
搜索“dfs.replication”
修改块大小:
搜素"dfs.blocksize"
这里块大小可以配置为更人性化的20k这种形式
注意块大小不能比hadoop默认的块最小值还小,如果要配置更小的值,那么需要修改块最小值的配置
dfs.namenode.fs-limits.min-block-size
1048576==1M
修改这里的最小值不能设置为10k这种形式,应该用数字,可以看说明
在s0节点修改后,发送覆盖所有节点,重启节点前,删除所有临时namenode节点目录/tmp/hadoop-wpy/
可以使用shell脚本来发送覆盖配置文件
scp $HADOOP_INSTALL/etc/hadoop_mode3/hdfs-site.xml wpy@s$i:$HADOOP_INSTALL/etc/hadoop_mode3/
ssh s$i rm -rf /tmp/hadoop-wpy/
格式化文件系统,(只格式化s0???) 需要重新在建/wpy/testdata/
修改hdfs-site.xml文件不需删除tmp文件
可以在数据节点的/wpy/hadoop-wpy/ data/下找到块文件
也可以通过网页来查看
数据切割与运行:
修改代码,运行查看数据的分布
可以从上下文获取获取ip,hostname等来查看,参考42讲
map方法执行:
run(){
//任务开始前执行一次
setup();
//循环执行
while(条件){
map(,,,,);
}
//任务执行结束后执行一次
cleanup()
}
一个切分的块split blocksize对应一个map任务,可能对应一个node
gz文件是不被切分的,会视作一个文件整体提交给map. blocksize依赖切割split算法,可以将gz文件解压成普通的文本文件。
可以通过InputSplit来打印切割的长度,
在setup和clientup中打印的日志,应该到hadoop_install/.../logs/userlogs/..stdout查看
同一块数据有多个备份,只执行其中的一个备份数据节点。
例如:块1:有2个备份的话,分别在s1,s4上
以备份数为2,共6个数据节点为例:
b0: s4 s6
b1: s2 s3
b2: s1 s4
b3: s2 s4
运行结果:
s1: no log
s2: no log
s3: no log
s4: log
s5: log
s6: log
上述结果说明:
数据节点的备份和程序运行的节点没有直接关系。
程序运行的节点是hadoop会找比较空闲的节点来运行,读取的数据来源于备份的数据节点。
s4,s5,s6上有程序运行,他们读取的数据节点来源于上述的数据备份点,多个备份只读取其中的一个备份节点数据。
hdfs:
namenode: ip+ : 50070
datanode: ip+ : 50075
datanode: ip+ : 50090
mapreduce(yarn):
resourcemanager(资源统一调度,和s0同一节点): ip + : 8088
nodemanager(数据节点管理,和数据节点在一起): ip + :8042,里面有打印的用户日志,可以通过logs查看
job: input dir | input file
gz;不能切割,要切割换成纯文本文件
blocksize: 20k min:10240
split切割准则
1.不能超过max blocksize,
否则实际运行=maxblocksize
2.可以小于minblocksize
实际运行=自定义
d3bf
blocksize
3.介于max 和min之间也是ok的
win7下修改hosts文件,便于浏览器直接访问节点数据,对节点就能直接转换
C:\Windows\System32\drivers\etc\hosts文件,增加
192.168.198.130 s0
192.168.198.132 s1
192.168.198.133 s2
192.168.198.139 s3
s0:8080 <==> 192.168.198.130:8080
mapreduce工作原理:
下载eclipse的Linux 64位版本tar.gz
下载HDT
进入到符号链接/a/b目录下
aaaa-->/a/b/
cd -P .. 表示退回到物理的上层目录,即/a。
./意义:.表示当前目录,/表示某个目录下,所以./表示当前目录下。
通常用来执行以下脚本。 ./bash 执行当前目录下的bash脚本,直接写bash是不行的,找不到这个脚本。
解压eclipse,建一个工作区/home/wpy/eclipse(最好在自己根目录下),注意将该客户机的内存调大至2G.
在eclipse目录下 启动程序./eclipse
为了能在运行eclipse时 脚本窗口也能用,放到后台运行,可以 ./eclipse &
为了能在任何目录下直接运行eclipse,可以这样 ln -s /usr/wpysoft/eclipse eclipse
快捷键的设定:
比如System.out.println();找keys,解除绑定,重新设置就行,再排除一个有没有已经有设为alt+/的快捷键,如果有解除改为其他的。保存就行了。
如果启动时忘记
前台程序隐藏到后天运行: ctrl +z
恢复到前台运行: bg +%1 1是作业号(jps查询)
下载eclipse的HTD插件:
hadoop2x-eclipse-plugin.zip插件
找到里面的relase目录下的问jar包解压出来,放到eclipse下的插件目录plugin目录
ungzip hadoop2x-eclipse-plugin.zip
重启eclpise,在preferrence->下发现 hadoop map/reduce条目就说明插件安装成功,然后制定hadoop的安装目录即可
打开hadoop的视图,配置端口号8020等,可以通过左侧的菜单的文件目录来查看hadoop的跟目录的文件
mapreduce的工作原理
1.客户端client: 提交作业Job(mr). 可以是集群中也可以是集群外
2.资源管理器ResourceManager(RM): 寻找执行该作业的负责人
3.应用程序主管ApplicationManager(AM): 执行作业Job任务。
流程图: 见截图
ant: apache的打包工具
过程: .java--编译-->.class--打包-->.jar
示例:
<project name="hadoopdeomo1" basedir="." default="package">
<path id="path1">
<fileset dir="E:\linux_down\hadoop-2.7.1\hadoop-2.7.1\_lib">
<include name="*.jar"/>
</fileset>
</path>
<target name="prepare">
<delete dir="${basedir}/build/classes"></delete>
<mkdir dir="${basedir}/build/classes"/>
</target>
<target name="compile" depends="prepare">
<javac srcdir="${basedir}/src" destdir="${basedir}/build/classes" classpathref="path1"
includeantruntime="true" encoding="UTF-8" debug="true" debuglevel="lines,source">
</javac>
</target>
<target name="package" depends="compile">
<jar destfile="${basedir}/my.jar" basedir="${basedir}/build/classes"></jar>
</target>
</project>
hadooop:
hdfs:
namenode:
datanode:
secondarynamenode:
mapreduce:
maper:
reducer:
yarn框架: mapreduce的执行框架,在集群上调度执行的。
ResourceManager:
NodeManager:
hadoop核心类/进程,这些都是单独的进程,都有main()方法入口。
hdfs: 数据的存储
NameNode : org.apache.hadoop.hdfs.server.namenode.NameNode
DataNode : org.apache.hadoop.hdfs.server.datanode.DataNode
SecondaryNameNode : org.apache.hadoop.hdfs.server.namenode.SecondaryNameNode
yarn: mapreduce作业的调度
ResourceManager : org.apache.hadoop.yarn.server.resourcemanager.ResourceManager
NodeManager : org.apache.hadoop.yarn.server.nodemanager.NodeManager
hadoop的调试:
yarn调度框架:
调度MR作业的,是基于事件的;
作业Job提交给yarn的资源管理器,作业Job在这里称之为应用Application.
打开8088端口就可以看到,application是有状态的:运行,完成,失败,被杀死等状态。
application是基于状态机的,而状态之间的变换是通过事件触发的。
# Applications
* NEW
* NEW_SAVING
* SUBMITTED
* ACCEPTED
* RUNNING
* FINISHED
* FAILED
* KILLED
异步事件分发器(总管道):
AsyncDispatcher,两次分发,可查看流程图"yarn事件调度机制分析.png"
ResourceManager中的分发器:
真正的事件处理实体 处理的事件 事件分发器 事件的状态类型
RMAppAttemptImpl --> RMAppAttemptEvent --> ApplicationAttemptEventDispatcher --> enum RMAppAttemptEventType
RMAppImpl --> RMAppEvent --> ApplicationEventDispatcher --> enum RMAppEventType
RMNodeImpl --> RMNodeEvent --> NodeEventDispatcher --> enum RMNodeEventType
RMContainerImpl --> ContainerPreemptEvent --> RMContainerPreemptEventDispatcher --> enum ContainerPreemptEventType
exit() --> RMFatalEvent --> RMFatalEventDispatcher --> enum RMFatalEventType
ResourceScheduler --> SchedulerEvent --> SchedulerEventDispatcher --> enum SchedulerEventType
yarn的ResourceManager远程调试:
----------------------------
1.设置ResourceManager的JAM,开启远程调试:
设置YARN_RESOURCEMANAGER_OPTS变量开启调试,可以编辑一个shell脚本
开启脚本,enable_yarn_remotedebug.sh:
export YARN_RESOURCEMANAGER_OPTS ="-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=8000"
禁用脚本,disable_yarn_remotedebug.sh:
export YARN_RESOURCEMANAGER_OPTS =
2.执行脚本
./enable_yarn_remotedebug.sh,并让立刻生效 source enable_yarn_remotedebug.sh
3.启动yarn脚本:
然后start-yarn.sh
可以看到lisening for ....8000,开始监听8000端口
4.在window7下配置eclispe,连接到资源管理器的8000端口
可以在源代码上添加断点
在ResourceManager的main(),debug confirguration,新弹出的窗口中找到remote java application,配置参数
连接类型;选standard
host:s0
port:8000
然后点击调试
Yarn底层事件分发调试RMApp事件处理调试+状态机变换
可以在资源管理器中找到ApplicationEventDispatcher分发器的真正的事件处理实体RMAppImpl的handel处理方法,handel方法是一个不断被
调用来执行对应的事件状态RMAppState的状态值变化的方法。打断点就就可以单步跟踪看到期状态的变换。
注意必须在每次执行this.writeLock.unlock();后才能在8088端口上查看到正在运行的作业applicaiton的状态值的变化,变化值对应
RMAppState中的从NEW-->FINISHED.
Hadoop 底层IPC和RPC通信原理解析
底层都不是传统的同步socket/serversocket套接字,而是在此基础上发展的异步通信NIO机制,提高效率。
IPC:进程间通信
RPC(remote prodedure call):远程过程调用
clinet--message:protocol(byete[])-->server
org.apache.hadoop.ipc.Client
org.apache.hadoop.ipc.Server
Client与Server之间通过进行一系列的封装成字节数组协议进行通信。
PB(protoBuf):协议缓存区,protobuf-java-2.5.0.jar包中就是用的谷歌的一些东西。
p
相关文章推荐
- 详解HDFS Short Circuit Local Reads
- Hadoop_2.1.0 MapReduce序列图
- 使用Hadoop搭建现代电信企业架构
- 单机版搭建Hadoop环境图文教程详解
- hadoop常见错误以及处理方法详解
- hadoop 单机安装配置教程
- hadoop的hdfs文件操作实现上传文件到hdfs
- hadoop实现grep示例分享
- Apache Hadoop版本详解
- linux下搭建hadoop环境步骤分享
- hadoop client与datanode的通信协议分析
- hadoop中一些常用的命令介绍
- Hadoop单机版和全分布式(集群)安装
- 用PHP和Shell写Hadoop的MapReduce程序
- hadoop map-reduce中的文件并发操作
- Hadoop1.2中配置伪分布式的实例
- hadoop上传文件功能实例代码
- java结合HADOOP集群文件上传下载
- Hadoop 2.x伪分布式环境搭建详细步骤
- Java访问Hadoop分布式文件系统HDFS的配置说明