Apache Kafka源码分析 – Replica and Partition
2014-03-03 18:01
Replica
For a local replica, we need to record highWatermarkValue, which marks how far the data has been committed.
For a remote replica, we need to record logEndOffsetValue and the time it was last updated.
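The split described above can be illustrated with a small standalone sketch (this is not the actual Kafka class, only a simplified model; the field names mirror the source, but the class and method shapes are assumptions for illustration):

```scala
// Hypothetical sketch of per-replica state: a local replica maintains the
// high watermark; a remote replica (tracked on the leader) maintains only
// its last reported log end offset (LEO) and when that report arrived.
class ReplicaSketch(val brokerId: Int, val isLocal: Boolean) {
  // Local replica only: highest offset known to be committed.
  private var highWatermarkValue: Long = 0L
  // Remote replica: last reported LEO and its update time on the leader.
  private var logEndOffsetValue: Long = -1L
  var logEndOffsetUpdateTimeMs: Long = -1L

  def highWatermark: Long = {
    require(isLocal, "high watermark is only maintained on the local replica")
    highWatermarkValue
  }
  def highWatermark_=(hw: Long): Unit = {
    require(isLocal, "high watermark is only maintained on the local replica")
    highWatermarkValue = hw
  }

  def logEndOffset: Long = logEndOffsetValue
  def updateLogEndOffset(leo: Long, nowMs: Long): Unit = {
    logEndOffsetValue = leo
    // The update time is what later lets the leader detect "stuck" followers.
    logEndOffsetUpdateTimeMs = nowMs
  }
}
```

The update timestamp recorded alongside the LEO is exactly what the ISR-shrinking logic below consults to decide whether a follower has stopped fetching.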
Partition
On the leader, Partition.maybeShrinkIsr is called periodically to remove replicas that have fallen behind from the ISR:
def maybeShrinkIsr(replicaMaxLagTimeMs: Long, replicaMaxLagMessages: Long) {
  leaderIsrUpdateLock synchronized {
    leaderReplicaIfLocal() match {
      case Some(leaderReplica) =>
        // Collect the replicas that have fallen out of sync
        val outOfSyncReplicas = getOutOfSyncReplicas(leaderReplica, replicaMaxLagTimeMs, replicaMaxLagMessages)
        if(outOfSyncReplicas.size > 0) {
          // Remove the out-of-sync replicas from the ISR
          val newInSyncReplicas = inSyncReplicas -- outOfSyncReplicas
          // update ISR in zk and in cache
          updateIsr(newInSyncReplicas)
          // we may need to increment high watermark since ISR could be down to 1
          maybeIncrementLeaderHW(leaderReplica)
          replicaManager.isrShrinkRate.mark()
        }
      case None => // do nothing if no longer leader
    }
  }
}

def getOutOfSyncReplicas(leaderReplica: Replica, keepInSyncTimeMs: Long, keepInSyncMessages: Long): Set[Replica] = {
  /**
   * there are two cases that need to be handled here -
   * 1. Stuck followers: If the leo of the replica hasn't been updated for keepInSyncTimeMs ms,
   *    the follower is stuck and should be removed from the ISR
   * 2. Slow followers: If the leo of the slowest follower is behind the leo of the leader by keepInSyncMessages, the
   *    follower is not catching up and should be removed from the ISR
   **/
  val leaderLogEndOffset = leaderReplica.logEndOffset
  val candidateReplicas = inSyncReplicas - leaderReplica
  // Case 1 above
  val stuckReplicas = candidateReplicas.filter(r => (time.milliseconds - r.logEndOffsetUpdateTimeMs) > keepInSyncTimeMs)
  if(stuckReplicas.size > 0)
    debug("Stuck replicas for partition [%s,%d] are %s".format(topic, partitionId, stuckReplicas.map(_.brokerId).mkString(",")))
  // Case 2 above
  val slowReplicas = candidateReplicas.filter(r => r.logEndOffset >= 0 && (leaderLogEndOffset - r.logEndOffset) > keepInSyncMessages)
  if(slowReplicas.size > 0)
    debug("Slow replicas for partition [%s,%d] are %s".format(topic, partitionId, slowReplicas.map(_.brokerId).mkString(",")))
  stuckReplicas ++ slowReplicas
}
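The two filter conditions in getOutOfSyncReplicas can be exercised in isolation with a self-contained sketch (the case class R and the outOfSync helper are hypothetical, written only to demonstrate the stuck-vs-slow distinction; they are not part of the Kafka source):

```scala
// Simplified model of a follower replica as seen by the leader.
case class R(brokerId: Int, logEndOffset: Long, logEndOffsetUpdateTimeMs: Long)

// Reproduces the two out-of-sync conditions:
//   stuck: the follower's LEO has not been updated for more than maxLagTimeMs
//   slow:  the follower's LEO trails the leader's LEO by more than maxLagMessages
def outOfSync(leaderLeo: Long, nowMs: Long, followers: Set[R],
              maxLagTimeMs: Long, maxLagMessages: Long): Set[R] = {
  val stuck = followers.filter(r => nowMs - r.logEndOffsetUpdateTimeMs > maxLagTimeMs)
  val slow  = followers.filter(r => r.logEndOffset >= 0 &&
                                    leaderLeo - r.logEndOffset > maxLagMessages)
  stuck ++ slow
}
```

For example, with leaderLeo = 100, nowMs = 100000, maxLagTimeMs = 10000 and maxLagMessages = 50: a follower at offset 100 last updated at 99000 stays in the ISR; one last updated at 50000 is stuck; one at offset 10 is slow. Note a replica can satisfy both conditions, which is why the real code also returns the union of the two sets.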