ReplicaStateMachine Analysis
2017-05-31 17:25
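ReplicaStateMachine is the state machine that KafkaController uses to manage replicas. The state of a replica is represented by ReplicaState, which has seven possible values: NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionStarted, ReplicaDeletionSuccessful, ReplicaDeletionIneligible, and NonExistentReplica.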
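The seven states form a closed set, which in Scala is naturally expressed as a sealed trait with one case object per state. The following is a minimal sketch, not the Kafka source: the state names are the ones used in the handleStateChange code in this article, while the wrapper object ReplicaStateSketch and the helper all are hypothetical names introduced only for illustration.

```scala
object ReplicaStateSketch {
  // Simplified model of the replica states; the real definition may carry extra fields.
  sealed trait ReplicaState
  case object NewReplica extends ReplicaState
  case object OnlineReplica extends ReplicaState
  case object OfflineReplica extends ReplicaState
  case object ReplicaDeletionStarted extends ReplicaState
  case object ReplicaDeletionSuccessful extends ReplicaState
  case object ReplicaDeletionIneligible extends ReplicaState
  case object NonExistentReplica extends ReplicaState

  // Hypothetical helper: every state listed exactly once.
  val all: Set[ReplicaState] = Set[ReplicaState](
    NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionStarted,
    ReplicaDeletionSuccessful, ReplicaDeletionIneligible, NonExistentReplica)
}
```

Because the trait is sealed, the compiler can warn when a pattern match over ReplicaState misses a case.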
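1 Replica State Transitions
NonExistentReplica -> NewReplica
A new replica may be created when a topic is created or when replicas are reassigned. The replica then moves to NewReplica, and the following happens during the transition:
KafkaController sends a LeaderAndIsrRequest to the broker that hosts the replica;
it sends an UpdateMetadataRequest to the other live brokers in the cluster.
NewReplica -> OnlineReplica
KafkaController adds the replica to the assigned replica (AR) set.
OnlineReplica/OfflineReplica/ReplicaDeletionIneligible -> OnlineReplica
When a replica comes back online, for example from OfflineReplica, it moves to OnlineReplica:
KafkaController sends a LeaderAndIsrRequest to the broker that hosts the replica;
it sends an UpdateMetadataRequest to the other live brokers in the cluster.
NewReplica/OnlineReplica/OfflineReplica/ReplicaDeletionIneligible -> OfflineReplica
KafkaController sends a StopReplicaRequest (deletePartition = false) to the broker that hosts the replica;
it then removes the replica from the ISR;
it sends a LeaderAndIsrRequest to the brokers that host the remaining replicas;
it sends an UpdateMetadataRequest to the other live brokers in the cluster.
OfflineReplica -> ReplicaDeletionStarted
KafkaController sends a StopReplicaRequest (deletePartition = true) to the broker that hosts the replica.
ReplicaDeletionStarted -> ReplicaDeletionSuccessful
Only the state is changed.
ReplicaDeletionStarted -> ReplicaDeletionIneligible
Only the state is changed.
ReplicaDeletionSuccessful -> NonExistentReplica
KafkaController removes the replica from the AR set.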
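The transition rules can be condensed into a lookup table from each target state to its allowed previous states. The sketch below is built from the lists passed to assertValidPreviousStates in the handleStateChange code shown in section 3.2. The object ReplicaTransitions and the function isValidTransition are hypothetical names, and the state types are redefined locally so that the snippet compiles on its own.

```scala
object ReplicaTransitions {
  sealed trait ReplicaState
  case object NewReplica extends ReplicaState
  case object OnlineReplica extends ReplicaState
  case object OfflineReplica extends ReplicaState
  case object ReplicaDeletionStarted extends ReplicaState
  case object ReplicaDeletionSuccessful extends ReplicaState
  case object ReplicaDeletionIneligible extends ReplicaState
  case object NonExistentReplica extends ReplicaState

  // Target state -> states a replica may be in before the transition.
  val validPreviousStates: Map[ReplicaState, List[ReplicaState]] =
    Map[ReplicaState, List[ReplicaState]](
      NewReplica -> List(NonExistentReplica),
      OnlineReplica -> List(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible),
      OfflineReplica -> List(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible),
      ReplicaDeletionStarted -> List(OfflineReplica),
      ReplicaDeletionSuccessful -> List(ReplicaDeletionStarted),
      ReplicaDeletionIneligible -> List(ReplicaDeletionStarted),
      NonExistentReplica -> List(ReplicaDeletionSuccessful)
    )

  // Hypothetical helper: true when moving from `from` to `to` passes the precondition check.
  def isValidTransition(from: ReplicaState, to: ReplicaState): Boolean =
    validPreviousStates.getOrElse(to, Nil).contains(from)
}
```

For example, isValidTransition(OfflineReplica, ReplicaDeletionStarted) is true, while a replica cannot go straight from OnlineReplica to ReplicaDeletionStarted: it has to be taken offline first.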
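2 Core Fields
controllerContext: ControllerContext. Holds the context information maintained by KafkaController.
replicaState: Map[PartitionAndReplica, ReplicaState]. Stores the current state of every replica.
brokerChangeListener: BrokerChangeListener. A ZooKeeper listener that watches for broker changes, such as a broker going down or coming back online.
brokerRequestBatch: ControllerBrokerRequestBatch. Used to send requests to the target brokers in batches.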
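To make the shape of replicaState concrete, here is a small self-contained sketch of a map keyed by PartitionAndReplica. The fields of PartitionAndReplica (topic, partition, replica) follow the way the key is used in the code in section 3; the wrapper object ReplicaStateMapSketch and the query helper replicasInState are assumptions made for this example and are not claimed to match the Kafka implementation.

```scala
import scala.collection.mutable

object ReplicaStateMapSketch {
  sealed trait ReplicaState
  case object OnlineReplica extends ReplicaState
  case object OfflineReplica extends ReplicaState

  // Key of the map: one replica of one partition, identified by the id of its broker.
  final case class PartitionAndReplica(topic: String, partition: Int, replica: Int)

  // One entry per replica, holding its current state.
  val replicaState: mutable.Map[PartitionAndReplica, ReplicaState] = mutable.Map.empty

  // Hypothetical helper: all replicas of a topic that are currently in the given state.
  def replicasInState(topic: String, state: ReplicaState): Set[PartitionAndReplica] =
    replicaState.iterator.collect {
      case (replica, s) if replica.topic == topic && s == state => replica
    }.toSet
}
```

Keying by (topic, partition, replica) means the same broker id can be in different states for different partitions, which is what lets the controller take a single replica offline without touching the others.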
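3 Key Methods
3.1 startup: When ReplicaStateMachine starts, it initializes the replica states and then tries to move the replicas on live brokers to the OnlineReplica state.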
private def initializeReplicaState() {
  // Iterate over the assigned replicas (AR) of every partition
  for ((topicPartition, assignedReplicas) <- controllerContext.partitionReplicaAssignment) {
    val topic = topicPartition.topic
    val partition = topicPartition.partition
    assignedReplicas.foreach { replicaId =>
      val partitionAndReplica = PartitionAndReplica(topic, partition, replicaId)
      // If the broker hosting the replica is alive, initialize the replica as OnlineReplica
      if (controllerContext.liveBrokerIds.contains(replicaId))
        replicaState.put(partitionAndReplica, OnlineReplica)
      else
        // Otherwise initialize it as ReplicaDeletionIneligible
        replicaState.put(partitionAndReplica, ReplicaDeletionIneligible)
    }
  }
}
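The initialization rule can be tried out in isolation. The sketch below restates it as a pure function over plain collections: the assignment map and the set of live broker ids are passed in as parameters instead of being read from ControllerContext. The names InitSketch and initialStates are hypothetical, and the types are simplified stand-ins.

```scala
object InitSketch {
  sealed trait ReplicaState
  case object OnlineReplica extends ReplicaState
  case object ReplicaDeletionIneligible extends ReplicaState

  final case class TopicAndPartition(topic: String, partition: Int)
  final case class PartitionAndReplica(topic: String, partition: Int, replica: Int)

  // Replicas on live brokers start as OnlineReplica, all others as ReplicaDeletionIneligible.
  def initialStates(assignment: Map[TopicAndPartition, Seq[Int]],
                    liveBrokerIds: Set[Int]): Map[PartitionAndReplica, ReplicaState] =
    assignment.toSeq.flatMap { case (tp, replicas) =>
      replicas.map { replicaId =>
        val state: ReplicaState =
          if (liveBrokerIds.contains(replicaId)) OnlineReplica else ReplicaDeletionIneligible
        PartitionAndReplica(tp.topic, tp.partition, replicaId) -> state
      }
    }.toMap
}
```

With an assignment of partition ("t", 0) to brokers 1 and 2 and only broker 1 alive, replica 1 starts as OnlineReplica and replica 2 as ReplicaDeletionIneligible.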
def handleStateChanges(replicas: Set[PartitionAndReplica], targetState: ReplicaState,
                       callbacks: Callbacks = (new CallbackBuilder).build) {
  if (replicas.nonEmpty) {
    info("Invoking state change to %s for replicas %s".format(targetState, replicas.mkString(",")))
    try {
      brokerRequestBatch.newBatch()
      // Move every replica to targetState; requests are only queued in the batch here
      replicas.foreach(r => handleStateChange(r, targetState, callbacks))
      // Send the queued requests to the brokers
      brokerRequestBatch.sendRequestsToBrokers(controller.epoch)
    } catch {
      case e: Throwable => error("Error while moving some replicas to %s state".format(targetState), e)
    }
  }
}
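handleStateChanges follows a collect-then-flush pattern: open a batch, let each per-replica transition queue its requests, then send everything at once. The sketch below imitates that pattern with a toy batch class. RequestBatch, its add method, its sent log, and the string request names are all hypothetical stand-ins for illustration; they are not the ControllerBrokerRequestBatch API.

```scala
import scala.collection.mutable

object BatchSketch {
  // Toy stand-in for a request batch: queues requests per broker, then flushes them together.
  final class RequestBatch {
    private val pending = mutable.Map.empty[Int, mutable.ListBuffer[String]]
    // Record of what was "sent", kept only so the example can be inspected.
    val sent = mutable.ListBuffer.empty[(Int, List[String])]

    def newBatch(): Unit =
      if (pending.nonEmpty) throw new IllegalStateException("previous batch was not sent")

    def add(brokerId: Int, request: String): Unit = {
      pending.getOrElseUpdate(brokerId, mutable.ListBuffer.empty[String]) += request
    }

    def sendRequestsToBrokers(): Unit = {
      pending.foreach { case (brokerId, requests) => sent += (brokerId -> requests.toList) }
      pending.clear()
    }
  }

  // Queue one request per replica, then flush once for the whole set.
  def handleStateChanges(replicaBrokerIds: Seq[Int], batch: RequestBatch): Unit =
    if (replicaBrokerIds.nonEmpty) {
      batch.newBatch()
      replicaBrokerIds.foreach(id => batch.add(id, "LeaderAndIsr"))
      batch.sendRequestsToBrokers()
    }
}
```

Batching this way means a broker that hosts many affected replicas receives its requests in one flush rather than one round trip per replica.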
3.2 handleStateChange: Handles the state transition of a single replica. The previous state is validated before every transition.
def handleStateChange(partitionAndReplica: PartitionAndReplica, targetState: ReplicaState,
                      callbacks: Callbacks) {
  // Topic of the replica
  val topic = partitionAndReplica.topic
  // Partition of the replica
  val partition = partitionAndReplica.partition
  // Id of the replica
  val replicaId = partitionAndReplica.replica
  val topicAndPartition = TopicAndPartition(topic, partition)
  if (!hasStarted.get)
    throw new StateChangeFailedException(("Controller %d epoch %d initiated state change of replica %d for partition %s " +
      "to %s failed because replica state machine has not started")
      .format(controllerId, controller.epoch, replicaId, topicAndPartition, targetState))
  // Current state of the replica
  val currState = replicaState.getOrElseUpdate(partitionAndReplica, NonExistentReplica)
  try {
    // Assigned replicas (AR) of the partition
    val replicaAssignment = controllerContext.partitionReplicaAssignment(topicAndPartition)
    targetState match {
      // Target NewReplica, e.g. onNewPartitionCreation triggers this right after a topic is created
      case NewReplica =>
        // Validate the previous state
        assertValidPreviousStates(partitionAndReplica, List(NonExistentReplica), targetState)
        // Read the partition's current leader, ISR and controller epoch from ZooKeeper
        val leaderIsrAndControllerEpochOpt = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkUtils, topic, partition)
        leaderIsrAndControllerEpochOpt match {
          case Some(leaderIsrAndControllerEpoch) =>
            // A NewReplica cannot be the leader of the partition
            if (leaderIsrAndControllerEpoch.leaderAndIsr.leader == replicaId)
              throw new StateChangeFailedException("Replica %d for partition %s cannot be moved to NewReplica"
                .format(replicaId, topicAndPartition) + "state as it is being requested to become leader")
            // Queue a LeaderAndIsrRequest for the broker hosting the replica
            brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId),
              topic, partition, leaderIsrAndControllerEpoch, replicaAssignment)
          case None => // new leader request will be sent to this replica when one gets elected
        }
        // Update the replica state
        replicaState.put(partitionAndReplica, NewReplica)
        stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
          .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
      // Target ReplicaDeletionStarted
      case ReplicaDeletionStarted =>
        // The previous state must be OfflineReplica
        assertValidPreviousStates(partitionAndReplica, List(OfflineReplica), targetState)
        // Update the replica state
        replicaState.put(partitionAndReplica, ReplicaDeletionStarted)
        // Queue a StopReplicaRequest that also deletes the partition data
        brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = true,
          callbacks.stopReplicaResponseCallback)
        stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
          .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
      // Target ReplicaDeletionIneligible: the deletion failed
      case ReplicaDeletionIneligible =>
        // The previous state must be ReplicaDeletionStarted
        assertValidPreviousStates(partitionAndReplica, List(ReplicaDeletionStarted), targetState)
        // Update the replica state
        replicaState.put(partitionAndReplica, ReplicaDeletionIneligible)
        stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
          .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
      // Target ReplicaDeletionSuccessful: the deletion succeeded
      case ReplicaDeletionSuccessful =>
        // The previous state must be ReplicaDeletionStarted
        assertValidPreviousStates(partitionAndReplica, List(ReplicaDeletionStarted), targetState)
        // Update the replica state
        replicaState.put(partitionAndReplica, ReplicaDeletionSuccessful)
        stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
          .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
      // Target NonExistentReplica: the replica no longer exists
      case NonExistentReplica =>
        // The previous state must be ReplicaDeletionSuccessful
        assertValidPreviousStates(partitionAndReplica, List(ReplicaDeletionSuccessful), targetState)
        // Remove the replica from the AR set stored in ControllerContext
        val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
        controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas.filterNot(_ == replicaId))
        // Remove the replica from the replica state map
        replicaState.remove(partitionAndReplica)
        stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
          .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
      // Target OnlineReplica
      case OnlineReplica =>
        // The previous state must be NewReplica, OnlineReplica, OfflineReplica or ReplicaDeletionIneligible
        assertValidPreviousStates(partitionAndReplica,
          List(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible), targetState)
        replicaState(partitionAndReplica) match {
          // Previous state is NewReplica
          case NewReplica =>
            // Add the replica to the AR set if it is not there yet
            val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
            if (!currentAssignedReplicas.contains(replicaId))
              controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas :+ replicaId)
            stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
              .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
          case _ =>
            // Check whether the partition has leadership info
            controllerContext.partitionLeadershipInfo.get(topicAndPartition) match {
              case Some(leaderIsrAndControllerEpoch) =>
                brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId), topic, partition,
                  leaderIsrAndControllerEpoch, replicaAssignment)
                // Update the replica state
                replicaState.put(partitionAndReplica, OnlineReplica)
                stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
                  .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
              case None =>
                // that means the partition was never in OnlinePartition state, this means the broker never
                // started a log for that partition and does not have a high watermark value for this partition
            }
        }
        // Update the replica state
        replicaState.put(partitionAndReplica, OnlineReplica)
      // Target OfflineReplica
      case OfflineReplica =>
        // Validate the previous state
        assertValidPreviousStates(partitionAndReplica,
          List(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible), targetState)
        // Queue a StopReplicaRequest so that the replica stops fetching from the leader
        brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = false)
        // The controller removes the failed replica from the ISR
        val leaderAndIsrIsEmpty: Boolean =
          controllerContext.partitionLeadershipInfo.get(topicAndPartition) match {
            case Some(currLeaderIsrAndControllerEpoch) =>
              // Remove this replica from the ISR
              controller.removeReplicaFromIsr(topic, partition, replicaId) match {
                case Some(updatedLeaderIsrAndControllerEpoch) =>
                  val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
                  if (!controller.deleteTopicManager.isPartitionToBeDeleted(topicAndPartition)) {
                    brokerRequestBatch.addLeaderAndIsrRequestForBrokers(currentAssignedReplicas.filterNot(_ == replicaId),
                      topic, partition, updatedLeaderIsrAndControllerEpoch, replicaAssignment)
                  }
                  replicaState.put(partitionAndReplica, OfflineReplica)
                  stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
                    .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
                  false
                case None =>
                  true
              }
            case None =>
              true
          }
        if (leaderAndIsrIsEmpty && !controller.deleteTopicManager.isPartitionToBeDeleted(topicAndPartition))
          throw new StateChangeFailedException(
            "Failed to change state of replica %d for partition %s since the leader and isr path in zookeeper is empty"
              .format(replicaId, topicAndPartition))
    }
  } catch {
    case t: Throwable =>
      stateChangeLogger.error("Controller %d epoch %d initiated state change of replica %d for partition [%s,%d] from %s to %s failed"
        .format(controllerId, controller.epoch, replicaId, topic, partition, currState, targetState), t)
  }
}
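Two branches of handleStateChange edit the AR set: the NewReplica to OnlineReplica branch appends the replica if it is missing, and the NonExistentReplica branch filters it out. The sketch below isolates those two operations on a plain Seq[Int] so they can be checked in isolation. The object AssignedReplicasSketch and the function names addToAr and removeFromAr are hypothetical; only the `:+` and `filterNot` logic mirrors the code above.

```scala
object AssignedReplicasSketch {
  // NewReplica -> OnlineReplica: append the replica unless it is already assigned.
  def addToAr(assignedReplicas: Seq[Int], replicaId: Int): Seq[Int] =
    if (assignedReplicas.contains(replicaId)) assignedReplicas
    else assignedReplicas :+ replicaId

  // ReplicaDeletionSuccessful -> NonExistentReplica: drop the replica from the assignment.
  def removeFromAr(assignedReplicas: Seq[Int], replicaId: Int): Seq[Int] =
    assignedReplicas.filterNot(_ == replicaId)
}
```

Both operations are idempotent: adding an already assigned replica or removing an absent one leaves the assignment unchanged, so repeating a transition does not corrupt the AR set.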