
ReplicaStateMachine Analysis

2017-05-31 17:25
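ReplicaStateMachine is the state machine that KafkaController uses to manage replicas. A replica's state is represented by ReplicaState, and there are seven states: NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionStarted, ReplicaDeletionSuccessful, ReplicaDeletionIneligible and NonExistentReplica.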



1. Replica state transitions



NonExistentReplica -> NewReplica

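A new replica may be created when a topic is created or when replicas are reassigned; the replica then moves to NewReplica. During this transition:

KafkaController sends a LeaderAndIsrRequest to the broker hosting the replica;

it also sends an UpdateMetadataRequest to the other live brokers in the cluster.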

 

NewReplica -> OnlineReplica

KafkaController adds the replica to the assigned replica (AR) set of its partition.

 

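OnlineReplica/OfflineReplica/ReplicaDeletionIneligible -> OnlineReplica

When a replica comes back online, for example from OfflineReplica, it moves to OnlineReplica:

KafkaController sends a LeaderAndIsrRequest to the broker hosting the replica;

it also sends an UpdateMetadataRequest to the other live brokers in the cluster.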

 

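NewReplica/OnlineReplica/OfflineReplica/ReplicaDeletionIneligible -> OfflineReplica

KafkaController sends a StopReplicaRequest to the broker hosting the replica;

it then removes the replica from the ISR;

it sends a LeaderAndIsrRequest to the brokers hosting the partition's other replicas;

it also sends an UpdateMetadataRequest to the other live brokers in the cluster.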

 

OfflineReplica -> ReplicaDeletionStarted

KafkaController sends a StopReplicaRequest (with deletePartition = true) to the broker hosting the replica.

 

ReplicaDeletionStarted -> ReplicaDeletionSuccessful

Only the recorded state changes; no request is sent.

ReplicaDeletionStarted -> ReplicaDeletionIneligible

Only the recorded state changes; no request is sent.

 

ReplicaDeletionSuccessful -> NonExistentReplica

KafkaController removes the replica from the AR set and drops it from the replica state map.
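The transitions listed above can be summarised as a lookup from target state to allowed previous states. The following is a minimal sketch, not the Kafka source: the object name `ReplicaTransitions` and the helper `isValidTransition` are made up for illustration, and the allowed-state lists are copied from the `assertValidPreviousStates` calls quoted in section 3.2.

```scala
object ReplicaTransitions {
  // Simplified stand-in for Kafka's ReplicaState hierarchy (illustrative only).
  sealed trait ReplicaState
  case object NewReplica extends ReplicaState
  case object OnlineReplica extends ReplicaState
  case object OfflineReplica extends ReplicaState
  case object ReplicaDeletionStarted extends ReplicaState
  case object ReplicaDeletionSuccessful extends ReplicaState
  case object ReplicaDeletionIneligible extends ReplicaState
  case object NonExistentReplica extends ReplicaState

  // For each target state, the states it may be entered from.
  def validPreviousStates(target: ReplicaState): Set[ReplicaState] = target match {
    case NewReplica                => Set(NonExistentReplica)
    case OnlineReplica             => Set(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible)
    case OfflineReplica            => Set(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible)
    case ReplicaDeletionStarted    => Set(OfflineReplica)
    case ReplicaDeletionSuccessful => Set(ReplicaDeletionStarted)
    case ReplicaDeletionIneligible => Set(ReplicaDeletionStarted)
    case NonExistentReplica        => Set(ReplicaDeletionSuccessful)
  }

  // Hypothetical helper: true when `from -> to` is one of the transitions above.
  def isValidTransition(from: ReplicaState, to: ReplicaState): Boolean =
    validPreviousStates(to).contains(from)
}
```

For example, `ReplicaTransitions.isValidTransition(ReplicaTransitions.OnlineReplica, ReplicaTransitions.ReplicaDeletionStarted)` is false: a replica has to pass through OfflineReplica before its deletion can start.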

 

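2. Core fields

controllerContext: ControllerContext holds the context information of the KafkaController

replicaState: Map[PartitionAndReplica, ReplicaState] stores the current state of every replica

brokerChangeListener: BrokerChangeListener a ZooKeeper listener that watches for broker changes, such as a broker going down or coming back online

brokerRequestBatch: ControllerBrokerRequestBatch used to send requests to the target brokers in batches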

 

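3. Important methods

3.1 startup When ReplicaStateMachine starts, it initializes the replica states (initializeReplicaState) and then tries to move the replicas on live brokers to the online state (handleStateChanges)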

private def initializeReplicaState() {
  // Iterate over the assigned replicas of every partition
  for ((topicPartition, assignedReplicas) <- controllerContext.partitionReplicaAssignment) {
    val topic = topicPartition.topic
    val partition = topicPartition.partition
    assignedReplicas.foreach { replicaId =>
      val partitionAndReplica = PartitionAndReplica(topic, partition, replicaId)
      // Replicas hosted on a live broker start out as OnlineReplica
      if (controllerContext.liveBrokerIds.contains(replicaId))
        replicaState.put(partitionAndReplica, OnlineReplica)
      else
        // All other replicas start out as ReplicaDeletionIneligible
        replicaState.put(partitionAndReplica, ReplicaDeletionIneligible)
    }
  }
}
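The initialization logic can be tried outside a broker with simplified types. This is a minimal sketch under stated assumptions: `InitSketch`, `initialStates` and the two case classes are stand-ins written for this example (the real method reads its input from `controllerContext`), and only the two states used during initialization are modelled.

```scala
import scala.collection.mutable

object InitSketch {
  // Only the two states that initialization can produce (illustrative stand-ins).
  sealed trait ReplicaState
  case object OnlineReplica extends ReplicaState
  case object ReplicaDeletionIneligible extends ReplicaState

  final case class TopicAndPartition(topic: String, partition: Int)
  final case class PartitionAndReplica(topic: String, partition: Int, replica: Int)

  // Hypothetical pure version of initializeReplicaState: the assignment and the
  // live broker ids are passed in instead of being read from a controller context.
  def initialStates(
      assignment: Map[TopicAndPartition, Seq[Int]],
      liveBrokerIds: Set[Int]): mutable.Map[PartitionAndReplica, ReplicaState] = {
    val replicaState = mutable.Map.empty[PartitionAndReplica, ReplicaState]
    for ((tp, replicas) <- assignment; replicaId <- replicas) {
      // A replica is online only if its broker is currently alive.
      val state: ReplicaState =
        if (liveBrokerIds.contains(replicaId)) OnlineReplica else ReplicaDeletionIneligible
      replicaState.put(PartitionAndReplica(tp.topic, tp.partition, replicaId), state)
    }
    replicaState
  }
}
```

With partition `t-0` assigned to brokers 1 and 2 and only broker 1 alive, replica 1 starts as OnlineReplica and replica 2 as ReplicaDeletionIneligible.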
 

def handleStateChanges(replicas: Set[PartitionAndReplica], targetState: ReplicaState,
                       callbacks: Callbacks = (new CallbackBuilder).build) {
  if(replicas.nonEmpty) {
    info("Invoking state change to %s for replicas %s".format(targetState, replicas.mkString(",")))
    try {
      brokerRequestBatch.newBatch()
      // Move every replica to the target state
      replicas.foreach(r => handleStateChange(r, targetState, callbacks))
      // Send the batched requests to the brokers
      brokerRequestBatch.sendRequestsToBrokers(controller.epoch)
    }catch {
      case e: Throwable => error("Error while moving some replicas to %s state".format(targetState), e)
    }
  }
}
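handleStateChanges follows a collect-then-send pattern: open a batch, let each per-replica transition queue its requests, then flush once. The sketch below illustrates only that pattern. `BatchSketch.RequestBatch` is a hypothetical stand-in for ControllerBrokerRequestBatch, requests are plain strings, and the check in `newBatch` is an assumption about how an unsent batch would be handled.

```scala
import scala.collection.mutable

object BatchSketch {
  // Hypothetical stand-in for ControllerBrokerRequestBatch: requests are grouped per broker.
  final class RequestBatch {
    private val pending = mutable.Map.empty[Int, mutable.ListBuffer[String]]
    // Records what was "sent", so the behaviour can be inspected.
    val sent = mutable.ListBuffer.empty[(Int, List[String])]

    // Assumption: starting a new batch while an old one is unsent is an error.
    def newBatch(): Unit =
      if (pending.nonEmpty)
        throw new IllegalStateException("previous batch was not sent: " + pending)

    // Queue one request for one broker.
    def add(brokerId: Int, request: String): Unit = {
      pending.getOrElseUpdate(brokerId, mutable.ListBuffer.empty[String]) += request
      ()
    }

    // Flush: one combined send per broker (in broker id order), then clear the batch.
    def sendRequestsToBrokers(): Unit = {
      pending.toSeq.sortBy(_._1).foreach { case (broker, reqs) =>
        sent += ((broker, reqs.toList))
      }
      pending.clear()
    }
  }
}
```

Grouping by broker means that a state change touching many replicas costs one send per broker rather than one per replica.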

 

3.2 handleStateChange Handles a single replica state transition; the previous state is validated before every transition

def handleStateChange(partitionAndReplica: PartitionAndReplica, targetState: ReplicaState, callbacks: Callbacks) {
  // Topic of the replica
  val topic = partitionAndReplica.topic
  // Partition of the replica
  val partition = partitionAndReplica.partition
  // Broker id of the replica
  val replicaId = partitionAndReplica.replica
  val topicAndPartition = TopicAndPartition(topic, partition)
  if (!hasStarted.get)
    throw new StateChangeFailedException(("Controller %d epoch %d initiated state change of replica %d for partition %s " +
                                          "to %s failed because replica state machine has not started")
                                            .format(controllerId, controller.epoch, replicaId, topicAndPartition, targetState))
  // Current state of the replica (defaults to NonExistentReplica if unknown)
  val currState = replicaState.getOrElseUpdate(partitionAndReplica, NonExistentReplica)
  try {
    // Assigned replicas (AR) of the partition
    val replicaAssignment = controllerContext.partitionReplicaAssignment(topicAndPartition)
    targetState match {
      // Moving to NewReplica, e.g. triggered by onNewPartitionCreation right after a topic is created
      case NewReplica =>
        // Check the previous state
        assertValidPreviousStates(partitionAndReplica, List(NonExistentReplica), targetState)
        // Read the partition's current leader, ISR and controller epoch from ZooKeeper
        val leaderIsrAndControllerEpochOpt = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkUtils, topic, partition)
        leaderIsrAndControllerEpochOpt match {
          case Some(leaderIsrAndControllerEpoch) =>
            // A replica in NewReplica state cannot be the partition leader; only an online replica can lead
            if(leaderIsrAndControllerEpoch.leaderAndIsr.leader == replicaId)
              throw new StateChangeFailedException("Replica %d for partition %s cannot be moved to NewReplica"
                .format(replicaId, topicAndPartition) + "state as it is being requested to become leader")
            // Queue a LeaderAndIsrRequest for the broker hosting this replica
            brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId), topic, partition, leaderIsrAndControllerEpoch, replicaAssignment)
          case None => // new leader request will be sent to this replica when one gets elected
        }
        // Update the replica state
        replicaState.put(partitionAndReplica, NewReplica)
        stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
                                  .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState,
                                          targetState))
      // Moving to ReplicaDeletionStarted
      case ReplicaDeletionStarted =>
        // The previous state must be OfflineReplica
        assertValidPreviousStates(partitionAndReplica, List(OfflineReplica), targetState)
        // Update the replica state
        replicaState.put(partitionAndReplica, ReplicaDeletionStarted)
        // Queue a StopReplicaRequest that also deletes the replica's data
        brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = true, callbacks.stopReplicaResponseCallback)
        stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
          .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
      // Moving to ReplicaDeletionIneligible, meaning the deletion failed
      case ReplicaDeletionIneligible =>
        // The previous state must be ReplicaDeletionStarted
        assertValidPreviousStates(partitionAndReplica, List(ReplicaDeletionStarted), targetState)
        // Update the replica state
        replicaState.put(partitionAndReplica, ReplicaDeletionIneligible)
        stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
          .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
      // Moving to ReplicaDeletionSuccessful, meaning the deletion succeeded
      case ReplicaDeletionSuccessful =>
        // The previous state must be ReplicaDeletionStarted
        assertValidPreviousStates(partitionAndReplica, List(ReplicaDeletionStarted), targetState)
        // Update the replica state
        replicaState.put(partitionAndReplica, ReplicaDeletionSuccessful)
        stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
          .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
      // Moving to NonExistentReplica, meaning it is no longer a replica of the partition
      case NonExistentReplica =>
        // The previous state must be ReplicaDeletionSuccessful
        assertValidPreviousStates(partitionAndReplica, List(ReplicaDeletionSuccessful), targetState)
        // Look up the current AR set of the partition
        val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
        // Store the AR set without this replica back into the ControllerContext
        controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas.filterNot(_ == replicaId))
        // Remove the replica from the replica state map
        replicaState.remove(partitionAndReplica)
        stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
          .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
      // Moving to OnlineReplica
      case OnlineReplica =>
        // The previous state must be NewReplica, OnlineReplica, OfflineReplica or ReplicaDeletionIneligible
        assertValidPreviousStates(partitionAndReplica,
          List(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible), targetState)
        replicaState(partitionAndReplica) match {
          // The previous state is NewReplica
          case NewReplica =>
            // Add this replica to the AR set if it is not already there
            val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
            if(!currentAssignedReplicas.contains(replicaId))
              controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas :+ replicaId)
            stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
                                      .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState,
                                              targetState))
          case _ =>
            // Check whether a leader has ever existed for this partition
            controllerContext.partitionLeadershipInfo.get(topicAndPartition) match {
              case Some(leaderIsrAndControllerEpoch) =>
                brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId), topic, partition, leaderIsrAndControllerEpoch,
                  replicaAssignment)
                // Update the replica state
                replicaState.put(partitionAndReplica, OnlineReplica)
                stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
                  .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
              case None => // that means the partition was never in OnlinePartition state, this means the broker never
                // started a log for that partition and does not have a high watermark value for this partition
            }
        }
        // Update the replica state
        replicaState.put(partitionAndReplica, OnlineReplica)
      // Moving to OfflineReplica
      case OfflineReplica =>
        // Check the previous state
        assertValidPreviousStates(partitionAndReplica,
          List(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible), targetState)
        // Queue a StopReplicaRequest (without deletion) so the replica stops fetching from the leader
        brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = false)
        // The controller removes the dead replica from the ISR
        val leaderAndIsrIsEmpty: Boolean =
          controllerContext.partitionLeadershipInfo.get(topicAndPartition) match {
            case Some(currLeaderIsrAndControllerEpoch) =>
              // Remove this replica from the ISR
              controller.removeReplicaFromIsr(topic, partition, replicaId) match {
                case Some(updatedLeaderIsrAndControllerEpoch) =>
                  val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
                  if (!controller.deleteTopicManager.isPartitionToBeDeleted(topicAndPartition)) {
                    brokerRequestBatch.addLeaderAndIsrRequestForBrokers(currentAssignedReplicas.filterNot(_ == replicaId),
                      topic, partition, updatedLeaderIsrAndControllerEpoch, replicaAssignment)
                  }
                  replicaState.put(partitionAndReplica, OfflineReplica)
                  stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
                    .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
                  false
                case None =>
                  true
              }
            case None =>
              true
          }
        if (leaderAndIsrIsEmpty && !controller.deleteTopicManager.isPartitionToBeDeleted(topicAndPartition))
          throw new StateChangeFailedException(
            "Failed to change state of replica %d for partition %s since the leader and isr path in zookeeper is empty"
            .format(replicaId, topicAndPartition))
    }
  }
  catch {
    case t: Throwable =>
      stateChangeLogger.error("Controller %d epoch %d initiated state change of replica %d for partition [%s,%d] from %s to %s failed"
                                .format(controllerId, controller.epoch, replicaId, topic, partition, currState, targetState), t)
  }
}
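Two branches of handleStateChange modify the AR set: NewReplica -> OnlineReplica appends the replica if it is missing, and ReplicaDeletionSuccessful -> NonExistentReplica filters it out. As a small sketch, the two updates can be isolated as pure functions; `AssignmentSketch`, `addReplica` and `removeReplica` are names invented for this example, and the AR set is modelled as a plain `Seq[Int]` of broker ids.

```scala
object AssignmentSketch {
  // NewReplica -> OnlineReplica: append the replica unless it is already assigned.
  def addReplica(assigned: Seq[Int], replicaId: Int): Seq[Int] =
    if (assigned.contains(replicaId)) assigned else assigned :+ replicaId

  // ReplicaDeletionSuccessful -> NonExistentReplica: drop the replica from the assignment.
  def removeReplica(assigned: Seq[Int], replicaId: Int): Seq[Int] =
    assigned.filterNot(_ == replicaId)
}
```

Both operations are idempotent, so repeating a transition for the same replica leaves the assignment unchanged.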