您的位置:首页 > 运维架构 > Apache

Apache Kafka源码分析 – Replica and Partition

2014-03-03 18:01 246 查看
Replica

对于local replica, 需要记录highWatermarkValue,表示当前已经committed的数据
对于remote replica,需要记录logEndOffsetValue以及更新的时间


def maybeShrinkIsr(replicaMaxLagTimeMs: Long,  replicaMaxLagMessages: Long) {
leaderIsrUpdateLock synchronized {
leaderReplicaIfLocal() match {
case Some(leaderReplica) =>
val outOfSyncReplicas = getOutOfSyncReplicas(leaderReplica, replicaMaxLagTimeMs, replicaMaxLagMessages) // 获取OutofSync的replicas
if(outOfSyncReplicas.size > 0) {
val newInSyncReplicas = inSyncReplicas – outOfSyncReplicas //从ISR中去除outOfSyncReplicas
// update ISR in zk and in cache
updateIsr(newInSyncReplicas)
// we may need to increment high watermark since ISR could be down to 1
maybeIncrementLeaderHW(leaderReplica)
replicaManager.isrShrinkRate.mark()
}
case None => // do nothing if no longer leader
}
}
}

def getOutOfSyncReplicas(leaderReplica: Replica, keepInSyncTimeMs: Long, keepInSyncMessages: Long): Set[Replica] = {
/**
* there are two cases that need to be handled here -
* 1. Stuck followers: If the leo of the replica hasn't been updated for keepInSyncTimeMs ms,
*                     the follower is stuck and should be removed from the ISR
* 2. Slow followers: If the leo of the slowest follower is behind the leo of the leader by keepInSyncMessages, the
*                     follower is not catching up and should be removed from the ISR
**/
val leaderLogEndOffset = leaderReplica.logEndOffset
val candidateReplicas = inSyncReplicas - leaderReplica
// Case 1 above
val stuckReplicas = candidateReplicas.filter(r => (time.milliseconds - r.logEndOffsetUpdateTimeMs) > keepInSyncTimeMs)
if(stuckReplicas.size > 0)
debug("Stuck replicas for partition [%s,%d] are %s".format(topic, partitionId, stuckReplicas.map(_.brokerId).mkString(",")))
// Case 2 above
val slowReplicas = candidateReplicas.filter(r => r.logEndOffset >= 0 && (leaderLogEndOffset - r.logEndOffset) > keepInSyncMessages)
if(slowReplicas.size > 0)
debug("Slow replicas for partition [%s,%d] are %s".format(topic, partitionId, slowReplicas.map(_.brokerId).mkString(",")))
stuckReplicas ++ slowReplicas
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: