
Samza's ApplicationMaster

2014-04-26 18:01
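When the Samza ApplicationMaster (AM) starts up, it does the following:

Receives its configuration from YARN via the STREAMING_CONFIG environment variable

Starts a JMX server on a random port

Instantiates a metrics registry and reporter to keep track of metrics

Registers the AM with YARN's ResourceManager (RM)

Gets the total number of partitions using each stream's PartitionManager

Reads the total number of containers from the Samza job configuration

Assigns the partitions to containers (each such group is called a Task Group in the Samza AM dashboard)

Sends one ResourceRequest to YARN for each container

Polls the YARN RM once per second to check for allocated and released containers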
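The partition-to-container step in the list above can be sketched as follows. This is only an illustration, not Samza's actual code: the class name `TaskGroupSketch`, the method `assign`, and the choice of a modulo-based assignment are all assumptions made for this example.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class TaskGroupSketch {
    // Hypothetical helper: spreads partition ids 0..partitionCount-1 over
    // containerCount containers using (partition id % container count).
    static Map<Integer, List<Integer>> assign(int partitionCount, int containerCount) {
        if (containerCount <= 0) {
            throw new IllegalArgumentException("containerCount must be positive");
        }
        Map<Integer, List<Integer>> groups = new LinkedHashMap<>();
        for (int c = 0; c < containerCount; c++) {
            groups.put(c, new ArrayList<>());
        }
        for (int p = 0; p < partitionCount; p++) {
            groups.get(p % containerCount).add(p);
        }
        return groups;
    }

    public static void main(String[] args) {
        // 5 partitions spread over 2 containers
        System.out.println(assign(5, 2)); // prints {0=[0, 2, 4], 1=[1, 3]}
    }
}
```

Each map entry corresponds to one container and the partitions it would own; the AM would then request one YARN container per entry.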

SamzaAppMaster implementation

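This step does not submit the AppMaster; it only registers the AppMaster with the RM, because by this point the AppMaster is already running.
The registration happens in the onInit() method of the SamzaAppMasterLifecycle object, which calls amClient.registerApplicationMaster:
val response = amClient.registerApplicationMaster(host, state.rpcPort, "%s:%d" format (host, state.trackingPort))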
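The third argument of the call above is the tracking URL, built with Scala's `format`. A Java equivalent of that one expression might look like the sketch below; the class name, method name, and sample host and port are made up for illustration.

```java
class TrackingUrlSketch {
    // Java counterpart of the Scala expression: "%s:%d" format (host, port)
    static String trackingUrl(String host, int port) {
        return String.format("%s:%d", host, port);
    }

    public static void main(String[] args) {
        // Hypothetical host and port values
        System.out.println(trackingUrl("am-host.example.com", 8088));
    }
}
```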

The class of the amClient object:

org.apache.hadoop.yarn.client.api.async.AMRMClientAsync<T extends org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest>

AMRMClientAsync handles communication with the ResourceManager and provides asynchronous updates on events such as container allocations and completions. It contains a thread that sends periodic heartbeats to the ResourceManager. It should be used by implementing a CallbackHandler:

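[code]import java.util.List;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;

// Callback set as shown in the class Javadoc. Released Hadoop 2.x versions of
// CallbackHandler declare onShutdownRequest() in place of onReboot(), plus
// getProgress() and onError(Throwable), so add those for your Hadoop version.
class MyCallbackHandler implements AMRMClientAsync.CallbackHandler {
    public void onContainersAllocated(List<Container> containers) {
        // run tasks on the containers
    }

    public void onContainersCompleted(List<ContainerStatus> statuses) {
        // update progress, check whether app is done
    }

    public void onNodesUpdated(List<NodeReport> updated) {}

    public void onReboot() {}
}
[/code]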
The client's lifecycle should be managed similarly to the following:
[code]// In Hadoop 2.2+ the factory takes (intervalMs, callbackHandler); the appAttId
// argument from the older Javadoc example is no longer passed.
AMRMClientAsync<AMRMClient.ContainerRequest> asyncClient =
    AMRMClientAsync.createAMRMClientAsync(1000, new MyCallbackHandler());
asyncClient.init(conf);
asyncClient.start();
RegisterApplicationMasterResponse response = asyncClient
    .registerApplicationMaster(appMasterHostname, appMasterRpcPort,
        appMasterTrackingUrl);
asyncClient.addContainerRequest(containerRequest);
// ... wait for application to complete
asyncClient.unregisterApplicationMaster(status, appMsg, trackingUrl);
asyncClient.stop();
[/code]

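This class acts as a client for communicating with the RM and registers a callback object that handles container allocation and completion events. It starts a thread that periodically sends heartbeats to the ResourceManager.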
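To make the heartbeat-plus-callback pattern concrete without depending on Hadoop, here is a small self-contained sketch. It is not how AMRMClientAsync is implemented: `HeartbeatSketch`, `Callback`, and `FakeResourceManager` are hypothetical stand-ins, and the fake RM simply grants one container per heartbeat.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class HeartbeatSketch {
    // Hypothetical stand-in for AMRMClientAsync.CallbackHandler.
    interface Callback {
        void onContainersAllocated(List<String> containerIds);
    }

    // Hypothetical stand-in for the RM: one call is one heartbeat, and the
    // result is whatever was allocated since the previous heartbeat.
    interface FakeResourceManager {
        List<String> heartbeat();
    }

    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    private final FakeResourceManager rm;
    private final Callback callback;
    private final long intervalMs;

    HeartbeatSketch(FakeResourceManager rm, Callback callback, long intervalMs) {
        this.rm = rm;
        this.callback = callback;
        this.intervalMs = intervalMs;
    }

    // Starts the background thread that heartbeats and dispatches callbacks.
    void start() {
        scheduler.scheduleAtFixedRate(() -> {
            List<String> allocated = rm.heartbeat();
            if (!allocated.isEmpty()) {
                callback.onContainersAllocated(allocated);
            }
        }, 0, intervalMs, TimeUnit.MILLISECONDS);
    }

    void stop() {
        scheduler.shutdownNow();
    }

    // Runs the client against a fake RM until `wanted` containers have arrived.
    static List<String> demo(int wanted) {
        List<String> received = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(wanted);
        int[] next = {0}; // only touched by the single scheduler thread
        FakeResourceManager rm = () -> {
            List<String> out = new ArrayList<>();
            if (next[0] < wanted) {
                out.add("container_" + next[0]++);
            }
            return out;
        };
        HeartbeatSketch client = new HeartbeatSketch(rm, ids -> {
            received.addAll(ids);
            ids.forEach(id -> done.countDown());
        }, 10);
        client.start();
        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for containers");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            client.stop();
        }
        return new ArrayList<>(received);
    }

    public static void main(String[] args) {
        System.out.println(demo(3));
    }
}
```

The caller never polls directly: it only supplies a callback, and the background thread decides when to invoke it, which is the same division of labor the Javadoc describes for AMRMClientAsync.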