Samza的ApplicationMaster
2014-04-26 18:01
155 查看
当Samza ApplicationMaster启动时,它做以下的事情:
通过STREAMING_CONFIG环境变量从YARN获取配置信息(configuration)
在随机端口上 启动一个JMX server
实例化一个metrics registry和reporter来追踪计量信息
将AM向YARN的RM注册
使用每个stream的PartitionManager来获取总共的partition数量
从Samza的job configuration里获取总的container数量
将partition分给container(在Samza AM的dashboard里,称为Task Group)
为每个container向YARN发送一个ResourceRequest
每秒向YARN RM poll一次,检查allocated and released containers
SamzaAppMaster的实现
并不是提交AppMaster,只是向RM注册这个AppMaster。因为此时,AppMaster已经启动了。
1.在SamzaAppMasterLifecycle对象的onInit()方法中,使用amCient.registerApplicationMaster
2 val response = amClient.registerApplicationMaster (host , state.rpcPort, "%s:%d" format (host, state. trackingPort))
amClient对象的类:
org.apache.hadoop.yarn.client.api.async.AMRMClientAsync<T extends org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest>
[/code]
The client's lifecycle should be managed similarly to the following:
通过STREAMING_CONFIG环境变量从YARN获取配置信息(configuration)
在随机端口上 启动一个JMX server
实例化一个metrics registry和reporter来追踪计量信息
将AM向YARN的RM注册
使用每个stream的PartitionManager来获取总共的partition数量
从Samza的job configuration里获取总的container数量
将partition分给container(在Samza AM的dashboard里,称为Task Group)
为每个container向YARN发送一个ResourceRequest
每秒向YARN RM poll一次,检查allocated and released containers
SamzaAppMaster的实现
并不是提交AppMaster,只是向RM注册这个AppMaster。因为此时,AppMaster已经启动了。
1.在SamzaAppMasterLifecycle对象的onInit()方法中,使用amCient.registerApplicationMaster
2 val response = amClient.registerApplicationMaster (host , state.rpcPort, "%s:%d" format (host, state. trackingPort))
amClient对象的类:
org.apache.hadoop.yarn.client.api.async.AMRMClientAsync<T extends org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest>
AMRMClientAsynchandles communication with the ResourceManager and provides asynchronous updates on events such as container allocations and completions. It contains a thread that sends periodic heartbeats to the ResourceManager. It should be used by implementing a CallbackHandler:
[code]class MyCallbackHandler implements AMRMClientAsync.CallbackHandler { public void onContainersAllocated(List<Container> containers) { [run tasks on the containers] } public void onContainersCompleted(List<ContainerStatus> statuses) { [update progress, check whether app is done] } public void onNodesUpdated(List<NodeReport> updated) {} public void onReboot() {} }
[/code]
The client's lifecycle should be managed similarly to the following:
[code]AMRMClientAsync asyncClient = createAMRMClientAsync(appAttId, 1000, new MyCallbackhandler()); asyncClient.init(conf); asyncClient.start(); RegisterApplicationMasterResponse response = asyncClient .registerApplicationMaster(appMasterHostname, appMasterRpcPort, appMasterTrackingUrl); asyncClient.addContainerRequest(containerRequest); [... wait for application to complete] asyncClient.unregisterApplicationMaster(status, appMsg, trackingUrl); asyncClient.stop();
这个类是用来做为一个Client和RM进行通信,并且注册一个用于回调的对象来处理container 的allocation和completion事件。它启动一个线程,周期性地发送hearbeat至ResourceManager
相关文章推荐
- 无法创建Web Application项目的问题
- Android源码分析-资源加载机制
- Android安卓隐藏DatePickerDialog的年和日
- Android 核心分析 之六 -----IPC框架分析 Binder,Service,Service manager
- Android四大组件--Broadcast Receiver详解
- 微信公众平台开发(一) 申请微信公众账号
- android pull解析xml文件实例
- ios 图片圆角或圆形处理
- CocosBuilder学习之一:认识CocosBuilder
- Android核心分析之四 ---手机的软件形态
- Multiple markers at this line
- Android 核心分析 之五 -----基本空间划分
- Android中解析XML
- 02微信公众平台 - 实现【快递查询】功能函数,返回一个文本字符串。
- Android是什么 之三-------手机之硬件形态
- Android核心分析 之二 -------方法论探讨之概念空间篇
- iOS 中定时器的开启和关闭
- Android APK混淆编译出现的问题
- 书讯: CFHipsterRef: Low-Level Programming on iOS & Mac OS X
- Android核心分析 之一--------分析方法论探讨之设计意图