akka cluster sharding source code 学习 (2/5) handle off
2015-08-30 21:38
344 查看
一旦 shard coordinator(相当于分布式系统的 zookeeper) 启动,它就会启动一个定时器,每隔一定的时间尝试平衡一下集群中各个节点的负载,平衡的办法是把那些负载较重的 actor 移动到负载较轻的节点上。在这一点上,我以前的理解有误,我以为 shardRegion 是移动的最小单位。
当 coordinator 收到 ReblanceTick 后,就开始尝试平衡系统负载
上面的逻辑我看懂了,但是 Future 的用法没看明白。按照一般的写法,当 shardsFuture 返回 Failure 以后,应该直接执行 RebalanceResut(Set.empty).pipeTo(self),不知道为什么失败以后还要尝试等待 Future
allocationStrategy 提供了默认的实现,也可以自定义负载均衡策略。rebalance 函数返回的是 Set(ShardId),即那些要被移动的 shards
当 coordinator 收到 RebalanceResult 后,开始 启动 balance 逻辑
rebalanceInProcess 是一个 Set,记录正在被移动的 shard,我想,在新一轮 balance 开始时, rebalanceInProcess 为空的情况只会发生在上次 balance 还没有做完。不知道这个时候,是应该报错还是继续 balance 更好,因为 balanceStrategy 应该不会考虑吧到 上一轮 balance 还没做完这种可能性。
然后, coordinator 启动 rebalanceWorker,也就是上篇提到的替身 actor。
akka 的逻辑是基于消息传递的,这种代码其实是很难去读的。在 rebalanceWorker 运行时,牵扯到很多个 actor。首先是,coordinator,其次是 shardRegion,也就是 host 待迁移 shard actor 的那个 region,然后是 shard actor 本身,最后是系统里所有的 shardRegion,他们也要参与进来。写到这里,我不禁把电脑屏幕竖了起来。
1. RebalanceWorker 首先给所有的 ShardRegion BeginHandOff 消息,告诉大家,hand off 开始,然后等待大家的回复
2. ShardRegion 收到 BeginHandOff 后,开始更新自己的知识库,将 HostShardRegion 和 shardActor 的记忆从自己的知识库中抹去
最后,发送 BeginHandOffAck 消息,告诉 rebalanceWorker 自己准备完毕(这些 shardRegion 以后也没事干了)
3. 继续回到 rebalanceWorker,它发送 HandOff 告诉 Host shard actor 的 ShardRegion,你可以做自己的清理工作了。然后将自己的状态设置成 stoppingShard,等待 ShardStopped 消息,这个消息的来源有两个,一个是 HostShardRegion,另外一个是 shard actor
4. HostShardRegion 收到 HandOff 消息后
如果 HostShardRegion 已经不再含有 shard actor,那么直接返回 ShardStopped,否则 HandOff 这个 Set 加入 shard actor,并将 HandOff 传给 shard actor
5. 又看了一遍代码,发现 shard actor 和 entity actor 又是两种东西,shard actor 存在于 entity actor 和 shard region 之间
目前还不知道 entity actor 和 shard region 之间的关系
从这段代码来看, shard actor 与 entity actor 是一对多的关系。
从这段代码看, shard actor 与 entity actor 的关系是一对一,因为当 entity stop self 了以后, shard actor 也会 stop self。这让我想到 coursera reactive programming 的最后一道作业题,为什么也是类似于 一个 entity 有一个 shard actor 对应。
val rebalanceTask = context.system.scheduler.schedule(rebalanceInterval, rebalanceInterval, self, RebalanceTick)
当 coordinator 收到 ReblanceTick 后,就开始尝试平衡系统负载
case RebalanceTick ⇒ if (persistentState.regions.nonEmpty) { val shardsFuture = allocationStrategy.rebalance(persistentState.regions, rebalanceInProgress) shardsFuture.value match { case Some(Success(shards)) ⇒ continueRebalance(shards) case _ ⇒ // continue when future is completed shardsFuture.map { shards ⇒ RebalanceResult(shards) }.recover { case _ ⇒ RebalanceResult(Set.empty) }.pipeTo(self) } }
上面的逻辑我看懂了,但是 Future 的用法没看明白。按照一般的写法,当 shardsFuture 返回 Failure 以后,应该直接执行 RebalanceResut(Set.empty).pipeTo(self),不知道为什么失败以后还要尝试等待 Future
allocationStrategy 提供了默认的实现,也可以自定义负载均衡策略。rebalance 函数返回的是 Set(ShardId),即那些要被移动的 shards
当 coordinator 收到 RebalanceResult 后,开始 启动 balance 逻辑
def continueRebalance(shards: Set[ShardId]): Unit = shards.foreach { shard ⇒ if (!rebalanceInProgress(shard)) { persistentState.shards.get(shard) match { case Some(rebalanceFromRegion) ⇒ rebalanceInProgress += shard log.debug("Rebalance shard [{}] from [{}]", shard, rebalanceFromRegion) context.actorOf(rebalanceWorkerProps(shard, rebalanceFromRegion, handOffTimeout, persistentState.regions.keySet ++ persistentState.regionProxies) .withDispatcher(context.props.dispatcher)) case None ⇒ log.debug("Rebalance of non-existing shard [{}] is ignored", shard) } } }
rebalanceInProcess 是一个 Set,记录正在被移动的 shard,我想,在新一轮 balance 开始时, rebalanceInProcess 为空的情况只会发生在上次 balance 还没有做完。不知道这个时候,是应该报错还是继续 balance 更好,因为 balanceStrategy 应该不会考虑吧到 上一轮 balance 还没做完这种可能性。
然后, coordinator 启动 rebalanceWorker,也就是上篇提到的替身 actor。
private[akka] class RebalanceWorker(shard: String, from: ActorRef, handOffTimeout: FiniteDuration, regions: Set[ActorRef]) extends Actor { import Internal._ regions.foreach(_ ! BeginHandOff(shard)) var remaining = regions import context.dispatcher context.system.scheduler.scheduleOnce(handOffTimeout, self, ReceiveTimeout) def receive = { case BeginHandOffAck(`shard`) ⇒ remaining -= sender() if (remaining.isEmpty) { from ! HandOff(shard) context.become(stoppingShard, discardOld = true) } case ReceiveTimeout ⇒ done(ok = false) } def stoppingShard: Receive = { case ShardStopped(shard) ⇒ done(ok = true) case ReceiveTimeout ⇒ done(ok = false) } def done(ok: Boolean): Unit = { context.parent ! RebalanceDone(shard, ok) context.stop(self) } }
akka 的逻辑是基于消息传递的,这种代码其实是很难去读的。在 rebalanceWorker 运行时,牵扯到很多个 actor。首先是,coordinator,其次是 shardRegion,也就是 host 待迁移 shard actor 的那个 region,然后是 shard actor 本身,最后是系统里所有的 shardRegion,他们也要参与进来。写到这里,我不禁把电脑屏幕竖了起来。
1. RebalanceWorker 首先给所有的 ShardRegion BeginHandOff 消息,告诉大家,hand off 开始,然后等待大家的回复
2. ShardRegion 收到 BeginHandOff 后,开始更新自己的知识库,将 HostShardRegion 和 shardActor 的记忆从自己的知识库中抹去
case BeginHandOff(shard) ⇒ log.debug("BeginHandOff shard [{}]", shard) if (regionByShard.contains(shard)) { val regionRef = regionByShard(shard) val updatedShards = regions(regionRef) - shard if (updatedShards.isEmpty) regions -= regionRef else regions = regions.updated(regionRef, updatedShards) regionByShard -= shard } sender() ! BeginHandOffAck(shard)
最后,发送 BeginHandOffAck 消息,告诉 rebalanceWorker 自己准备完毕(这些 shardRegion 以后也没事干了)
3. 继续回到 rebalanceWorker,它发送 HandOff 告诉 Host shard actor 的 ShardRegion,你可以做自己的清理工作了。然后将自己的状态设置成 stoppingShard,等待 ShardStopped 消息,这个消息的来源有两个,一个是 HostShardRegion,另外一个是 shard actor
4. HostShardRegion 收到 HandOff 消息后
case msg @ HandOff(shard) ⇒ log.debug("HandOff shard [{}]", shard) // must drop requests that came in between the BeginHandOff and now, // because they might be forwarded from other regions and there // is a risk or message re-ordering otherwise if (shardBuffers.contains(shard)) { shardBuffers -= shard loggedFullBufferWarning = false } if (shards.contains(shard)) { handingOff += shards(shard) shards(shard) forward msg } else sender() ! ShardStopped(shard)
如果 HostShardRegion 已经不再含有 shard actor,那么直接返回 ShardStopped,否则 HandOff 这个 Set 加入 shard actor,并将 HandOff 传给 shard actor
5. 又看了一遍代码,发现 shard actor 和 entity actor 又是两种东西,shard actor 存在于 entity actor 和 shard region 之间
目前还不知道 entity actor 和 shard region 之间的关系
def getEntity(id: EntityId): ActorRef = { val name = URLEncoder.encode(id, "utf-8") context.child(name).getOrElse { log.debug("Starting entity [{}] in shard [{}]", id, shardId) val a = context.watch(context.actorOf(entityProps, name)) idByRef = idByRef.updated(a, id) refById = refById.updated(id, a) state = state.copy(state.entities + id) a } }
从这段代码来看, shard actor 与 entity actor 是一对多的关系。
def receiveCoordinatorMessage(msg: CoordinatorMessage): Unit = msg match { case HandOff(`shardId`) ⇒ handOff(sender()) case HandOff(shard) ⇒ log.warning("Shard [{}] can not hand off for another Shard [{}]", shardId, shard) case _ ⇒ unhandled(msg) } def handOff(replyTo: ActorRef): Unit = handOffStopper match { case Some(_) ⇒ log.warning("HandOff shard [{}] received during existing handOff", shardId) case None ⇒ log.debug("HandOff shard [{}]", shardId) if (state.entities.nonEmpty) { handOffStopper = Some(context.watch(context.actorOf( handOffStopperProps(shardId, replyTo, idByRef.keySet, handOffStopMessage)))) //During hand off we only care about watching for termination of the hand off stopper context become { case Terminated(ref) ⇒ receiveTerminated(ref) } } else { replyTo ! ShardStopped(shardId) context stop self } }
def receiveTerminated(ref: ActorRef): Unit = { if (handOffStopper.exists(_ == ref)) context stop self else if (idByRef.contains(ref) && handOffStopper.isEmpty) entityTerminated(ref) }
从这段代码看, shard actor 与 entity actor 的关系是一对一,因为当 entity stop self 了以后, shard actor 也会 stop self。这让我想到 coursera reactive programming 的最后一道作业题,为什么也是类似于 一个 entity 有一个 shard actor 对应。
相关文章推荐
- JQuery的ready函数与JS的onload的区别
- 博客内容纲要
- css3媒体查询Medai Queries小例
- Thread VS Runnable
- 早死早托生
- 打印出的文件目录
- C++虚函数与虚表
- Tokyo2014 There is No Alternative (最小生成树,灵活题)
- mysql下载地址
- java servlet
- C++虚函数与虚表
- loadrunner回放时弹出windows安全警告
- 一小时学会用Python Socket 开发可并发的FTP服务器!!
- POJ1655树的重心 问删除哪个点,使余下的各个子树结点个数的最大值最小.
- OSPF虚链路
- Django之第一个app<18>
- 人无趣的原因
- img src="#"或者src="" 会发送请求的问题
- 关于c++中字符串的输入问题
- Swift2.0不深入只浅出入门教程-01-The Basic