您的位置:首页 > 其它

Flink on Yarn(HA配置)

2016-12-21 17:47 357 查看
根据部署方式不同,Flink Jobmanager HA配置分为2种:

1、standalone cluster HA
2、Yarn cluster HA


其中,standalone cluster HA可参考我之前的一篇文章

简单回顾下,standalone模式的HA需要多个“活着的”Jobmanager,其中1个作为leader,其他作为standby,leader选举依赖于Zookeeper。可以用下面的一张图来形象的表述standalone HA:



本文专门讨论Yarn下Flink HA的搭建与配置。

一、Flink On Yarn 简介

Flink部署在Yarn上,仅作为yarn上“多租户”的一个service而存在。Flink在yarn中容器的概念分为2种:

用于启动JobManager(AM)的容器
用于启动TaskManager的容器


我们可以通过yarn-session.sh –help来看下启动Flink On Yarn的参数信息:



其中-n代表taskmanager的容器数量,而不是taskmanager+jobmanager的容器数量。

在配置HA前,先通过-q看一下我的yarn集群的资源情况:



从图中可以看出,我配置的每个NodeManager的内存是8192MB(yarn-site.xml),每个NodeManager的vcores数量是8。所以,当前yarn集群中可用内存总量为32768,总cores是32.

二、Flink on Yarn HA 配置

1、配置准备

在配置Flink On Yarn之前,必须保证hdfs和yarn都已经开启,可以通过$HADOOP_HOME/sbin/start-all.sh启动hdfs和yarn。

2、配置AM在尝试重启的最大次数(yarn-site.xml)

此配置需要在
$HADOOP_CONF_DIR
的yarn-site.xml添加。

添加如下配置:



此配置代表application master在重启时,尝试的最大次数。

3、配置Application Attempts(flink-conf.yaml)

此参数需要在
$FLINK_HOME/conf
的flink-conf.yaml中配置。

添加如下配置:



此参数代表Flink Job(yarn中称为application)在Jobmanager(或者叫Application Master)恢复时,允许重启的最大次数。

注意,Flink On Yarn环境中,当Jobmanager(Application Master)失败时,yarn会尝试重启JobManager(AM),重启后,会重新启动Flink的Job(application)。因此,yarn.application-attempts的设置不应该超过yarn.resourcemanager.am.max-attemps.


4、配置zookeeper信息

虽然flink-on-yarn cluster HA依赖于Yarn自己的集群机制,但是Flink Job在恢复时,需要依赖检查点产生的快照,而这些快照虽然配置在hdfs,但是其元数据信息保存在zookeeper中,所以我们还要配置zookeeper的HA信息:



其中,recovery.zookeeper.path.namespace也可以在启动Flink on Yarn时通过-z参数覆盖。

在yarn模式下,jobmanager.rpc.address不需要指定,因为哪一个容器作为jobManager由Yarn决定,而不由Flink配置决定;taskmanager.tmp.dirs也不需要指定,这个参数将被yarn的tmp参数指定,默认就是/tmp目录下,保存一些用于上传到ResourceManager的jar或lib文件。parrallelism.default也不需要指定,因为在启动yarn时,通过-s指定每个taskmanager的slots数量。

完整的Flink配置信息如下:

#==============================================================================
#  Common
#==============================================================================
env.java.home: /home/flink/java/jdk1.8.0_60
jobmanager.rpc.port: 6123
jobmanager.heap.mb: 6192
taskmanager.heap.mb: 8192
taskmanager.numberOfTaskSlots: 8
taskmanager.memory.preallocate: false

#==============================================================================
# Web Frontend
#==============================================================================
jobmanager.web.port: 8081

#==============================================================================
# Streaming state checkpointing
#==============================================================================
state.backend: filesystem
state.backend.fs.checkpointdir: hdfs:///flink/checkpoints

#==============================================================================
# Advanced
#==============================================================================
taskmanager.network.numberOfBuffers: 64000
fs.hdfs.hadoopconf: /home/flink/hadoop/hadoop-2.6.0/etc/hadoop

#==============================================================================
# Master High Availability (required configuration)
#==============================================================================
recovery.mode: zookeeper
recovery.zookeeper.quorum: flink:2181,data0:2181,mf:2181
recovery.zookeeper.storageDir: hdfs:///flink/recovery
recovery.zookeeper.path.root: /flinkOnYarn
recovery.zookeeper.path.namespace: /cluster_yarn

#==============================================================================
# Yarn
#==============================================================================
yarn.application-attempts: 4

#==============================================================================
# Yarn will overwrite following parameters
# 1. jobmanager.rpc.address
# 2. taskmanager.tmp.dirs
# 3. parallelism.default
#==============================================================================


三、启动Flink Yarn Session

启动Flink Yarn Session有2种模式:

分离模式
客户端模式


通过-d指定分离模式,即客户端在启动Flink Yarn Session后,就不再属于Yarn Cluster的一部分。如果想要停止Flink Yarn Application,需要通过
yarn application -kill <Application_ID>
命令来停止。

我们这里采用分离模式来启动Flink Yarn Session:

yarn-session.sh -n 3 -jm 4096 -tm 8192 -s 8 -nm FlinkOnYarnSession -d -st


我们可以通过yarn的webUI查看一下当前启动的Application:



可以看到名字是FlinkOnYarnSession,总内存32GB,运行使用的内存28GB(-jm指定了4GB),当前容器数量为4.我们通过ApplicationMaster tracking一下Flink的WebUI:



四、提交Job

通过CLI方式提交:

flink run -c wikiedits.Test1 toptrade-flink-1.0.jar


我们看下目前Job的JobGraph:



五、HA测试

现在,我们kill掉Jobmanager(AM)进程YarnApplicationMasterRunner,看看Yarn Cluster的HA情况。



我们看到Application Attemp的ID增加了1:



我们再到mf42的
$YARN_CONF_DIR
(如果没设置则在$HADOOP_CONF_DIR)下看看日志情况,当前AM的日志路径在
$HADOOP_CONF_DIR/userlogs/<Application_ID>/
下,可以看出Yarn在重启YarnApplicationMasterRunner进程,并在重启期后重新提交Flink的Job。

再次查看进程:



YarnApplicationMasterRunner进程号变了。

此时,Flink的WebUI又可以访问了,而且Job被cancel掉后重新启动了。

六、未来Flink1.2中的Flink On Yarn

增强了以下几点:

1、不用先启动Yarn Session再提交Job,而是直接提交Job到Yarn集群,因此client可以断开连接
2、用户代码库和配置文件直接在classpath下,而不是在动态类加载器中
3、容器在需要时分配,不需要时释放资源
4、按需分配的容器可以针对不同的operator分配不同的CPU和Core资源,通过配置文件实现


Flink1.2中ResourceManager提出了一个Dispatcher的概念,主要用于统一发布Job并监控实例的运行。但时可以选择是否使用Dispatcher。





参考:

https://ci.apache.org/projects/flink/flink-docs-release-1.1/setup/yarn_setup.html

https://ci.apache.org/projects/flink/flink-docs-release-1.1/setup/jobmanager_high_availability.html#yarn-cluster-high-availability

http://www.ibm.com/developerworks/cn/opensource/os-cn-apache-flink/#ibm-pcon

https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077

https://www.youtube.com/watch?v=L21N8mNtvME

http://www.jianshu.com/p/8a3177095072
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  Flink yarn HA