您的位置:首页 > 其它

kafka源码分析之kafka的consumer的负载均衡管理

2016-07-27 10:35 441 查看

GroupCoordinator

说明,主要是消费者的连接建立,offset的更新操作。管理所有的consumer与对应的group的信息。Group的metadata的信息,consumer对应的offset的更新操作。

实例创建与启动

consumerCoordinator = GroupCoordinator.create(config, zkUtils, replicaManager)

consumerCoordinator.startup()

创建实例,
def create(config: KafkaConfig,

zkUtils: ZkUtils,

replicaManager: ReplicaManager): GroupCoordinator = {
读取与记录group的offset相关的配置信息:
1,配置项offset.metadata.max.bytes,默认值4096.用于配置offset的请求的最大请求的消息大小。
2,配置项offsets.load.buffer.size,默认值5MB,用于在读取offset信息到内存cache时,用于读取缓冲区的大小。
3,配置项offsets.retention.minutes,默认值24小时,针对一个offset的消费记录的最长保留时间。
4,配置项offsets.retention.check.interval.ms,默认值600秒,用于定期检查offset过期数据的检查周期。
5,配置项offsets.topic.num.partitions,默认值50,offset记录的topic的partition个数。
6,配置项offsets.topic.replication.factor,默认3,用于配置offset记录的topic的partition的副本个数。
7,配置项offsets.commit.timeout.ms,默认值5秒,用于配置提交offset的最长等待时间。
8,配置项offsets.commit.required.acks,默认值-1,用于配置提交offset的请求的ack的值。
9,配置项group.min.session.timeout.ms,默认值6秒,
10,配置项group.max.session.timeout.ms,默认值30秒,用于配置session的超时时间。

val offsetConfig = OffsetConfig(maxMetadataSize = config.offsetMetadataMaxSize,

loadBufferSize = config.offsetsLoadBufferSize,

offsetsRetentionMs = config.offsetsRetentionMinutes * 60 * 1000L,

offsetsRetentionCheckIntervalMs = config.offsetsRetentionCheckIntervalMs,

offsetsTopicNumPartitions = config.offsetsTopicPartitions,

offsetsTopicReplicationFactor = config.offsetsTopicReplicationFactor,

offsetCommitTimeoutMs = config.offsetCommitTimeoutMs,

offsetCommitRequiredAcks = config.offsetCommitRequiredAcks)

val groupConfig = GroupConfig(
groupMinSessionTimeoutMs = config.groupMinSessionTimeoutMs,

groupMaxSessionTimeoutMs = config.groupMaxSessionTimeoutMs)

new GroupCoordinator(config.brokerId, groupConfig, offsetConfig, replicaManager,
zkUtils)

}

更新此topic对应的配置文件,主要修改日志清理部分的配置。
修改这个topic的segment的大小为100MB每一个。默认的非内置的topic的segment的大小为1GB.
def offsetsTopicConfigs: Properties = {

val props = new Properties

props.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)

props.put(LogConfig.SegmentBytesProp,
offsetConfig.offsetsTopicSegmentBytes.toString)

props.put(LogConfig.CompressionTypeProp, UncompressedCodec.name)

props

}

生成GroupCoordinator中用于对offset进行操作的组件,GroupMetadataManager实例。
---------------------------
用于存储每个group消费的partition对应的offset

private val offsetsCache = new Pool[GroupTopicPartition, OffsetAndMetadata]

用于存储当前所有的消费者的信息,每个消费者中包含有多少个client进行消费等

private val groupsCache = new Pool[String, GroupMetadata]

如果正在对topic中的内容进行加载时,还没有加载到cache中,这个集合中存储有每个group与partition的名称。

private val loadingPartitions: mutable.Set[Int] = mutable.Set()

这个集合中存储有当前所有的group中已经cache到内存的partition的消费者信息,表示这个group的offse可以被读取。

private val ownedPartitions: mutable.Set[Int] = mutable.Set()

从zk中对应的这个记录消费者信息的topic中读取这个topic的partition信息与副本信息。

/* number of partitions for the consumer metadata topic */

private val groupMetadataTopicPartitionCount = getOffsetsTopicPartitionCount

/* Single-thread scheduler to handling offset/group metadata cache loading and unloading */

private val scheduler = new KafkaScheduler(threads = 1, threadNamePrefix = "group-metadata-manager-")
根据定时检查offset过期的时间周期,执行过期offset删除的操作,deleteExpiredOffsets函数。

scheduler.startup()

scheduler.schedule(name = "delete-expired-consumer-offsets",

fun = deleteExpiredOffsets,

period = config.offsetsRetentionCheckIntervalMs,

unit = TimeUnit.MILLISECONDS)

启动GroupCoordinator实例时,生成的相关信息:

def startup() {

info("Starting up.")

定义用于处理client与group心跳超时的控制单元。

heartbeatPurgatory = new DelayedOperationPurgatory[DelayedHeartbeat]
("Heartbeat", brokerId)
定义用于处理group加入的超时控制单元。

joinPurgatory = new DelayedOperationPurgatory[DelayedJoin]
("Rebalance", brokerId)

设置当前的coordinator的实例为活动状态。

isActive.set(true)

info("Startup complete.")

}

Group元数据partition的leader上线操作

这个操作在对应元数据管理的partition的leader发生变化后,被选择成为新的leader的节点上会进行触发,或者一个broker启动时,也会触发这个动作。

这个onGroupLoaded函数用于处理在group的加载后执行的动作,这个回调函数主要完成对当前的所有的member进行心跳超时的监听动作,生成一个DelayedHeartbeat实例用于监听对member的心跳超时。
private def onGroupLoaded(group: GroupMetadata) {

group synchronized {

info(s"Loading group metadata for ${group.groupId} with generation
${group.generationId}")

assert(group.is(Stable))

group.allMemberMetadata.foreach(
completeAndScheduleNextHeartbeatExpiration(group, _)
)

}

}

当group的消费的topic的partition在当前的broker中被选举成leader时,触发的函数。

def handleGroupImmigration(offsetTopicPartitionId: Int) {
这里直接通过groupManager中的loadGroupsForPartition对partition进行加载。

groupManager.loadGroupsForPartition(offsetTopicPartitionId, onGroupLoaded)

}

接下来看看这个loadGroupsForPartition函数的处理流程:

/**

* Asynchronously read the partition from the offsets topic and populate the cache

*/

def loadGroupsForPartition(offsetsPartition: Int,

onGroupLoaded: GroupMetadata => Unit) {

val topicPartition = TopicAndPartition(GroupCoordinator.GroupMetadataTopicName,
offsetsPartition)
执行这个loadGroupsForPartition函数内的内部函数loadGroupsAndOffsets函数,来对这个partition的数据进行加载。

scheduler.schedule(topicPartition.toString, loadGroupsAndOffsets)

接下来看看这个加载partition数据的函数的处理逻辑:

def loadGroupsAndOffsets() {

info("Loading offsets and group metadata from " + topicPartition)

首先,如果要加载的partition已经在loadingPartitions集合中存在了,表示这个partition已经在执行加载操作,直接return回去,不进行处理,否则把这个partition加入到loadingPartitions中,这个表示是正在执行加载操作的partition的集合。
这里的offsetsPartition表示的是存储元数据与offset的内置topic的partition.

loadingPartitions synchronized {

if (loadingPartitions.contains(offsetsPartition)) {

info("Offset load from %s already in progress.".format(topicPartition))

return

} else {

loadingPartitions.add(offsetsPartition)

}

}

val startMs = SystemTime.milliseconds

try {
从LogManager中得到这个partition对应的Log实例,

replicaManager.logManager.getLog(topicPartition) match {

case Some(log) =>
如果在当前的机器上有这个partition的副本,那么这个Log实例就一定存在,得到这个Log中最小的segment的最小的offset.

var currOffset = log.logSegments.head.baseOffset
根据每次加载的数据量,生成一个加载数据的buffer.

val buffer = ByteBuffer.allocate(config.loadBufferSize)

inWriteLock(offsetExpireLock) {

val loadedGroups = mutable.Map[String, GroupMetadata]()

val removedGroups = mutable.Set[String]()

开始进行迭代读取这个partition的log中的消息,直到读取到offset等于当前partition的最大的offset为迭代结束 。这里加载到的highWatermark的offset是当前副本同步到的最新的大小。
这个highWatermark根据对应的partition的follower的副本的同步,每次同步会更新这个副本的logEndOffset的值,而这个highWatermark的值是所有的副本中logEndOffset最小的一个值。

while (currOffset < getHighWatermark(offsetsPartition)
&& !shuttingDown.get()) {

buffer.clear()
读取指定大小的数据,并把消息存储到生成的buffer中。

val messages = log.read(currOffset,
config.loadBufferSize).messageSet.asInstanceOf[FileMessageSet]

messages.readInto(buffer, 0)
根据这个buffer生成用于消息读取的message的集合。

val messageSet = new ByteBufferMessageSet(buffer)

根据读取出来的消息集合进行迭代,处理每一条读取到的消息。这里调用的是messageSet的iterator的函数。

messageSet.foreach { msgAndOffset =>

require(msgAndOffset.message.key != null,
"Offset entry key should not be null")

解析出这一条消息的key值,并根据key值的类型做对应的处理流程。

val baseKey = GroupMetadataManager.readMessageKey(
msgAndOffset.message.key)

如果读取到的消息是一个consumer记录的offset的消费信息的记录,

if (baseKey.isInstanceOf[OffsetKey]) {

// load offset

val key = baseKey.key.asInstanceOf[GroupTopicPartition]
这里检查下读取到的offset的metadata的记录的value部分是否为null,如果为null,表示这条offset已经过期被清理掉,从offsetsCache中移出这条offset的记录。

if (msgAndOffset.message.payload == null) {

if (offsetsCache.remove(key) != null)

trace("Removed offset for %s due to tombstone entry.".format(key))

else

trace("Ignoring redundant tombstone for %s.".format(key))

}
下面的else部分表示offset读取到的消息是一条正常的消息,把这条存储consumer offset记录的消息写入到offsetsCache集合中,如果这条offset的commit时,指定了过期时间时,那么这个消息直接使用这个过期时间,否则使用这个offset commit时的时间加上配置的过期延时来设置这个offset的过期时间。
else {

val value = GroupMetadataManager.readOffsetMessageValue(
msgAndOffset.message.payload)

putOffset(key, value.copy (

expireTimestamp = {

if (value.expireTimestamp == org.apache.kafka.common
.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP)

value.commitTimestamp + config.offsetsRetentionMs

else

value.expireTimestamp

}

))

trace("Loaded offset %s for %s.".format(value, key))

}

}
如果读取到的消息是一个对group的sync的操作后存储的group的元数据的消息
else {

// load group metadata

val groupId = baseKey.key.asInstanceOf[String]
这个部分表示是一条存储group的metadata的消息,检查这个group的value是否为null,为null表示这个group已经被删除,把这个group添加到removedGroups集合中,否则把这个group与对应的group的metadata信息添加到loadedGroups集合中。

val groupMetadata = GroupMetadataManager.readGroupMessageValue(
groupId, msgAndOffset.message.payload)

if (groupMetadata != null) {

trace(s"Loaded group metadata for group
${groupMetadata.groupId} with generation
${groupMetadata.generationId}")

removedGroups.remove(groupId)

loadedGroups.put(groupId, groupMetadata)

} else {

loadedGroups.remove(groupId)

removedGroups.add(groupId)

}

}

currOffset = msgAndOffset.nextOffset

}

}

如果读取完成这个partition的所有的日志后,有需要添加到cache的group的元数据信息时,迭代这个集合,取出每一个group的元数据并添加到groupsCache的cache集合中。同时设置每一个group的所有的member的心跳超时监听。

loadedGroups.values.foreach { group =>

val currentGroup = addGroup(group)

if (group != currentGroup)

debug(s"Attempt to load group
${group.groupId} from log with generation
${group.generationId} failed " +

s"because there is already a cached group with generation
${currentGroup.generationId}")

else

onGroupLoaded(group)

}

如果读取完成对这个partition的所有的日志后,有已经被删除掉的group时,迭代这个已经删除的group的集合,检查每一个已经删除的group是否还在groupsCache的cache,

removedGroups.foreach { groupId =>

val group = groupsCache.get(groupId)

if (group != null)

throw new IllegalStateException(s"Unexpected unload of acitve group
${group.groupId} while " +

s"loading partition ${topicPartition}")

}

}

if (!shuttingDown.get())

info("Finished loading offsets from %s in %d milliseconds."

.format(topicPartition, SystemTime.milliseconds - startMs))

case None =>

warn("No log found for " + topicPartition)

}

}

catch {

case t: Throwable =>

error("Error in loading offsets from " + topicPartition, t)

}

finally {
当对用于存储group的元数据与consumer对partition的消费记录的offset的加载完成后,把这个partition添加到ownedPartitions集合中表示完成partition的数据加载,
并从loadingPartitions集合中移出,loadingPartitions中如果存在partition表示这个parition中存储的group信息暂时是无法被访问的。

loadingPartitions synchronized {

ownedPartitions.add(offsetsPartition)

loadingPartitions.remove(offsetsPartition)

}

}

}

}

Group元数据partition的leader下线操作

当一个group对应的元数据的partition的leader对应的broker节点下线,或者leader发生切换时,对原来的leader的partition需要执行下线的操作。

下线操作后的回调函数:

在下线一个group对应的partition的leader时,会得到这个partition中所有的group,并根据这个group的元数据调用当前的回调函数。

处理流程:

1,更新这个group的状态为Dead的状态,表示group在当前的节点已经死亡。

2,根据group更新状态前原来的状态,进行回调处理:

2,1,如果group下线前的状态是PreparingRebalance,调用这个group中所有的consumer的joinCallback的回调函数,向对应的consumer写入一个NOT_COORDINATOR_FOR_GROUP错误。

2,2,如果group下线前的状态是Stable | AwaitingSync,调用这个group中所有的consumer的syncCallback的回调函数,向对应的consumer写入一个NOT_COORDINATOR_FOR_GROUP错误。

private def onGroupUnloaded(group: GroupMetadata) {

group synchronized {

info(s"Unloading group metadata for ${group.groupId}

with generation ${group.generationId}")

val previousState = group.currentState

group.transitionTo(Dead)

previousState match {

case Dead =>

case PreparingRebalance =>

for (member <- group.allMemberMetadata) {

if (member.awaitingJoinCallback != null) {

member.awaitingJoinCallback(joinError(member.memberId,
Errors.NOT_COORDINATOR_FOR_GROUP.code))

member.awaitingJoinCallback = null

}

}

joinPurgatory.checkAndComplete(GroupKey(group.groupId))

case Stable | AwaitingSync =>

for (member <- group.allMemberMetadata) {

if (member.awaitingSyncCallback != null) {

member.awaitingSyncCallback(Array.empty[Byte],
Errors.NOT_COORDINATOR_FOR_GROUP.code)

member.awaitingSyncCallback = null

}

heartbeatPurgatory.checkAndComplete(MemberKey(member.groupId,
member.memberId))

}

}

}

}

下线leader时的入口函数:这个函数直接调用了groupManager中的removeGroupsForPartition函数。

def handleGroupEmigration(offsetTopicPartitionId: Int) {

groupManager.removeGroupsForPartition(offsetTopicPartitionId, onGroupUnloaded)

}

接下来看看GroupMetadataManager中的removeGroupsForPartition函数流程:

这个函数中根据要执行下线操作的partition的下标与对应用于group下线的回调处理函数进行处理。

*/

def removeGroupsForPartition(offsetsPartition: Int,

onGroupUnloaded: GroupMetadata => Unit) {

首先得到要下线的partition的TopicPartition的实例。

val topicPartition = TopicAndPartition(GroupCoordinator.GroupMetadataTopicName,
offsetsPartition)
发起对下线group与offset的元数据的下线处理,这里直接调用当前的内部函数removeGroupsAndOffsets的函数。

scheduler.schedule(topicPartition.toString, removeGroupsAndOffsets)

下面定义了具体下线操作的操作流程的函数removeGroupsAndOffsets。

def removeGroupsAndOffsets() {

var numOffsetsRemoved = 0

var numGroupsRemoved = 0

loadingPartitions synchronized {
首先从ownedPartitions集合中移出这个partition.表示这个partition已经下线,所有的对应这个partition的group来请求当前的节点时,都将无法访问。

// we need to guard the group removal in cache in the loading partition lock

// to prevent coordinator's check-and-get-group race condition

ownedPartitions.remove(offsetsPartition)

迭代offsetsCache的cache集合,找到这个offset的元数据中group对应的partition与下线的partition是相同的partition的所有的记录并从cache中移出这些记录,并记录移出的offset的条数(用于打印日志)。

offsetsCache.keys.foreach { key =>

if (partitionFor(key.group) == offsetsPartition) {

offsetsCache.remove(key)

numOffsetsRemoved += 1

}

}

从groupsCache的cache的集合中找到group元数据对应的存储partition与下线的partition相同的所有的group的元数据记录,从cache中移出这些个group并执行下线group的回调函数,记录下线group的元数据的个数(用于打印日志)。

// clear the groups for this partition in the cache

for (group <- groupsCache.values) {

if (partitionFor(group.groupId) == offsetsPartition) {

onGroupUnloaded(group)

groupsCache.remove(group.groupId, group)

numGroupsRemoved += 1

}

}

}

if (numOffsetsRemoved > 0) info("Removed %d cached offsets for %s on follower
transition."

.format(numOffsetsRemoved, TopicAndPartition(
GroupCoordinator.GroupMetadataTopicName, offsetsPartition)))

if (numGroupsRemoved > 0) info("Removed %d cached groups for %s on follower
transition."

.format(numGroupsRemoved, TopicAndPartition(
GroupCoordinator.GroupMetadataTopicName, offsetsPartition)))

}

}

定期删除过期的offset记录

根据配置项offsets.retention.check.interval.ms,默认值600秒,用于定期检查offset过期数据的检查周期,通过GroupMetadataManager中的deleteExpiredOffsets函数来进行处理。

private def deleteExpiredOffsets() {

debug("Collecting expired offsets.")

val startMs = SystemTime.milliseconds

val numExpiredOffsetsRemoved = inWriteLock(offsetExpireLock) {
这里得到过期时间小于当前的时间的所有的offset的cache的集合。

val expiredOffsets = offsetsCache.filter {
case (groupTopicPartition, offsetAndMetadata) =>

offsetAndMetadata.expireTimestamp < startMs

}

debug("Found %d expired offsets.".format(expiredOffsets.size))

把过期的offset cache的集合从offsetsCache集合中移出,同时生成一个新的消息集合,这个消息的value都是null值,表示这是一个可以被删除的点,并按partition的id值进行分组。

val tombstonesForPartition = expiredOffsets.map {
case (groupTopicAndPartition, offsetAndMetadata) =>
这里找到这个groupid对应的存储offset的partition的partition的id值。

val offsetsPartition = partitionFor(groupTopicAndPartition.group)

trace("Removing expired offset and metadata for %s: %s"
.format(groupTopicAndPartition, offsetAndMetadata))

offsetsCache.remove(groupTopicAndPartition)

val commitKey = GroupMetadataManager.offsetCommitKey(
groupTopicAndPartition.group,

groupTopicAndPartition.topicPartition.topic,
groupTopicAndPartition.topicPartition.partition)

(offsetsPartition, new Message(bytes = null, key = commitKey))

}.groupBy { case (partition, tombstone) => partition }

tombstonesForPartition.flatMap { case (offsetsPartition, tombstones) =>
从副本的allPartitions集合中找到对应group的partition的实例。

val partitionOpt = replicaManager.getPartition(
GroupCoordinator.GroupMetadataTopicName, offsetsPartition)

如果这个partition存在,生成向这个partition写入的消息,并向topic中写入这个offset记录的消息,这个消息的offset部分的存储是一个null值。

partitionOpt.map { partition =>

val appendPartition = TopicAndPartition(
GroupCoordinator.GroupMetadataTopicName, offsetsPartition)

val messages = tombstones.map(_._2).toSeq

trace("Marked %d offsets in %s for deletion."
.format(messages.size, appendPartition))

try {

// do not need to require acks since even if the tombsone is lost,

// it will be appended again in the next purge cycle

partition.appendMessagesToLeader(
new ByteBufferMessageSet(config.offsetsTopicCompressionCodec,
messages: _*))

tombstones.size

}

catch {

case t: Throwable =>

error("Failed to mark %d expired offsets for deletion in %s."
.format(messages.size, appendPartition), t)

// ignore and continue

0

}

}

}.sum

}

info("Removed %d expired offsets in %d milliseconds."
.format(numExpiredOffsetsRemoved, SystemTime.milliseconds - startMs))

}

处理group的加入

接收并处理请求

当一个consumer的实例生成时,第一次执行消费操作时,会向group对应的broker发起一个joinGroup的请求,这个请求由GroupCoordinator实例中的如下函数来进行处理。

def handleJoinGroup(groupId: String,

memberId: String,

clientId: String,

clientHost: String,

sessionTimeoutMs: Int,

protocolType: String,

protocols: List[(String, Array[Byte])],

responseCallback: JoinCallback) {

首先判断coordinator实例是否被启动,如果没有启动,
直接向client端响应GROUP_COORDINATOR_NOT_AVAILABLE消息。

if (!isActive.get) {

responseCallback(joinError(memberId,
Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code))

} else if (!validGroupId(groupId)) {
这里检查groupId是否为空字符串或者是一个null值,如果是,直接向client端响应INVALID_GROUP_ID代码。

responseCallback(joinError(memberId, Errors.INVALID_GROUP_ID.code))

} else if (!isCoordinatorForGroup(groupId)) {
这里检查groupId对应的topic存储的partition是否在当前的ownerPartitions的集合中,如果不在,表示这个joinGroup的请求对应的leader发生了变化 ,需要重新连接,直接向client端响应NOT_COORDINATOR_FOR_GROUP代码。

responseCallback(joinError(memberId, Errors.NOT_COORDINATOR_FOR_GROUP.code))

} else if (isCoordinatorLoadingInProgress(groupId)) {
这里检查groupId对应的partition是否还处于加载的状态(loadingPartitions集合中存在),如果这个partition还没有加载完成,直接向client端响应GROUP_LOAD_IN_PROGRESS代码。

responseCallback(joinError(memberId, Errors.GROUP_LOAD_IN_PROGRESS.code))

} else if (sessionTimeoutMs < groupConfig.groupMinSessionTimeoutMs ||

sessionTimeoutMs > groupConfig.groupMaxSessionTimeoutMs) {
如果超时时间不在配置的范围内时,直接向client端响应INVALID_SESSION_TIMEOUT代码。

responseCallback(joinError(memberId, Errors.INVALID_SESSION_TIMEOUT.code))

} else {
如果流程执行到这个操作时,表示开始进行group的join处理。
首先先通过groupManager从groups集合中得到对应的groupId的定义信息,如果这个信息不存在,表示这个group是第一次加入,否则表示这是一个已经存在的group

// only try to create the group if the group is not unknown AND

// the member id is UNKNOWN, if member is specified but group does not

// exist we should reject the request

var group = groupManager.getGroup(groupId)

if (group == null) {

if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID) {
如果要进行的join的groupId是一个新加入的group,也就是还没有这个groupId对应的元数据时,传入的memberId的值必须是一个UNKNOWN_MEMBER_ID值,否则直接向client端响应UNKNOWN_MEMBER_ID代码。

responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID.code))

} else {
这种情况是groupId第一次加入,先生成一个GroupMetadata信息,一个GroupMetadata包含有这个groupId的名称与group对应的协议类型(consumer)

group = groupManager.addGroup(new GroupMetadata(groupId, protocolType))

执行doJoinGroup操作,来添加这个group.

doJoinGroup(group, memberId, clientId, clientHost, sessionTimeoutMs,
protocolType, protocols, responseCallback)

}

} else {
这种情况下,groupId是一个已经存在的group信息,把这个groupId对应的元数据直接传入到doJoinGroup函数中进行处理。

doJoinGroup(group, memberId, clientId, clientHost, sessionTimeoutMs,
protocolType, protocols, responseCallback)

}

}

}

接下来看看doJoinGroup函数的流程:

private def doJoinGroup(group: GroupMetadata,

memberId: String,

clientId: String,

clientHost: String,

sessionTimeoutMs: Int,

protocolType: String,

protocols: List[(String, Array[Byte])],

responseCallback: JoinCallback) {

group synchronized {

if (group.protocolType != protocolType ||
!group.supportsProtocols(protocols.map(_._1).toSet)) {
如果group是一个已经存在的group时,这个检查用于检查上一次group的协议于当前consumer传入此group的join的协议是否相同,如果不相同,直接向client端响应INCONSISTENT_GROUP_PROTOCOL代码。

// if the new member does not support the group protocol, reject it

responseCallback(joinError(memberId,
Errors.INCONSISTENT_GROUP_PROTOCOL.code))

}
这里检查下memberId的值是否为UNKNOWN_MEMBER_ID,如果不是时,同时这个memberId不在group中存在时,直接向client端响应UNKNOWN_MEMBER_ID代码。这个处理主要也是用在重新请求时(leader切换后的重新请求)。
else if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID
&& !group.has(memberId)) {

responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID.code))

} else {

如果流程执行到这里时,开始根据当前group的状态执行相应的处理。

group.currentState match {

case Dead =>
如果group已经是一个Dead的状态的group表示这个group已经死亡,直接返回UNKNOWN_MEMBER_ID代码。

responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID.code))

case PreparingRebalance =>
如果当前的group的状态是正在准备执行Rebalance操作,
根据memberId是否是UNKNOWN_MEMBER_ID值执行新的member加入到group的操作或者执行对group中存在的member的更新操作。

if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {

addMemberAndRebalance(sessionTimeoutMs, clientId, clientHost,
protocols, group, responseCallback)

} else {

val member = group.get(memberId)

updateMemberAndRebalance(group, member, protocols, responseCallback)

}

case AwaitingSync =>
如果当前的group的状态是一个AwaitingSync 表示是已经在执行rebalance的操作,这个时候对于新加入的member时,会先把原有的member全部失效。
根据memberId是否是UNKNOWN_MEMBER_ID值执行新的member加入到group的操作,或者说当前的member是一个重复的请求,也就是说他们的通信协议都相同,直接根据当前请求的member是否是leader来返回JoinGroupResult,最后一种情况,这种情况表示请求的member的协议发生变化,执行对对group中存在的member的更新操作。

if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {

addMemberAndRebalance(sessionTimeoutMs, clientId, clientHost,
protocols, group, responseCallback)

} else {

val member = group.get(memberId)

if (member.matches(protocols)) {

responseCallback(JoinGroupResult(

members = if (memberId == group.leaderId) {

group.currentMemberMetadata

} else {

Map.empty

},

memberId = memberId,

generationId = group.generationId,

subProtocol = group.protocol,

leaderId = group.leaderId,

errorCode = Errors.NONE.code))

} else {

// member has changed metadata, so force a rebalance

updateMemberAndRebalance(group, member, protocols, responseCallback)

}

}

如果group的状态是Stable的状态时,表示这是一个新加入的Group,

case Stable =>

if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
如果memberId是UNKNOWN_MEMBER_ID值表示consumer是一个新生成的consumer,执行添加成员并进行balance的操作。

// if the member id is unknown, register the member to the group

addMemberAndRebalance(sessionTimeoutMs, clientId, clientHost,
protocols, group, responseCallback)

} else {
这种情况表示是一个成员被重新加入到group中来,

val member = group.get(memberId)

if (memberId == group.leaderId || !member.matches(protocols)) {

updateMemberAndRebalance(group, member, protocols, responseCallback)

} else {

responseCallback(JoinGroupResult(

members = Map.empty,

memberId = memberId,

generationId = group.generationId,

subProtocol = group.protocol,

leaderId = group.leaderId,

errorCode = Errors.NONE.code))

}

}

}

如果流程执行到这里后,目前group的状态还是PreparingRebalance状态时,通过这个joinGroup操作的DelayedJoin的操作实例,检查是否是completed的状态,如果不是时,执行tryComplete函数,如果这个函数执行成功时,会从对应的Watchers中移出这个DelayedJoin的操作。

if (group.is(PreparingRebalance))

joinPurgatory.checkAndComplete(GroupKey(group.groupId))

}

}

}

处理新的member加入group

当一个consumer执行joinGroup操作时,如果这个consumer对应的groupId的在metadata中不存在时(或者说是一个已经存在的group,但是来的是一个新加入的memberId的成员时),同时这个memberId是UNKNOWN_MEMBER_ID值时,表示这是一个新加入的groupId,这时会生成一个GroupMetadata的实例,这个实例的状态是Stable。
在处理这个操作时,通过GroupCoordinator中的addMemberAndRebalance函数来进行处理。
private def addMemberAndRebalance(sessionTimeoutMs: Int,

clientId: String,

clientHost: String,

protocols: List[(String, Array[Byte])],

group: GroupMetadata,

callback: JoinCallback) = {

根据当前请求的clientId的值加上一个UUID的值,生成一个memberId值,并生成MemberMetadata的信息,这个信息中包含有clientId与client对应的host,client的partition的名称。

// use the client-id with a random id suffix as the member-id

val memberId = clientId + "-" + group.generateMemberIdSuffix

val member = new MemberMetadata(memberId, group.groupId, clientId, clientHost,
sessionTimeoutMs, protocols)

member.awaitingJoinCallback = callback

把这个新生成的member添加到group中。在向groupMetadata中添加成员时,如果group中的leader成员不存在时,把当前添加的member的id当成是这个group的leaderId.

group.add(member.memberId, member)

对这个group触发partition的rebalance操作。如果要执行rebalance操作,group的状态必须是Stable或者AwaitingSync状态,新添加的group的状态默认是Stable.
maybePrepareRebalance函数判断状态是否在指定的状态集合后,会执行prepareRebalance函数,这里的处理见下面的对member的加入与更新后group的rebalance操作。

maybePrepareRebalance(group)

member

}

处理对group中存在的member的更新

这个在consumer执行joinGroup操作时,这个consumer对应的groupId的在metadata中必须存在,同时这个memberId是一个已经存在的值,表示这个member在group中已经存在,这个groupId必须是一个已经存在的groupId。在执行joinGroup操作时,通过GroupCoordinator中的updateMemberAndRebalance函数来进行处理。
private def updateMemberAndRebalance(group: GroupMetadata,

member: MemberMetadata,

protocols: List[(String, Array[Byte])],

callback: JoinCallback) {
这里跟新添加一个member到group中处理不同的是直接修改这个已经存在的member的protocols与callback的函数。

member.supportedProtocols = protocols

member.awaitingJoinCallback = callback
检查并发起准备rebalance的操作。

maybePrepareRebalance(group)

}

private def maybePrepareRebalance(group: GroupMetadata) {

group synchronized {
对这个group触发partition的rebalance操作。如果要执行rebalance操作,group的状态必须是Stable或者AwaitingSync状态,新添加的group的状态默认是Stable.
这里的处理见下面的对member的加入与更新后group的rebalance操作。

if (group.canRebalance)

prepareRebalance(group)

}

}

group的rebalance操作

这个操作在一个新的group被生成后加入了一个member,在group中新加入了一个member,对一个已经存在的member进行了更新操作时,这个操作会被触发。

接下来看看触发rebalance操作的函数:
private def prepareRebalance(group: GroupMetadata) {
如果当前的group的状态是AwaitingSync状态时,表示当前的group已经在执行rebalance的操作,这里取消所有的member的成员的分配的partition,并向client端回写REBALANCE_IN_PROGRESS代码。

// if any members are awaiting sync, cancel their request and have them rejoin

if (group.is(AwaitingSync))

resetAndPropagateAssignmentError(group, Errors.REBALANCE_IN_PROGRESS.code)

更新group的状态为PreparingRebalance状态,表示准备执行rebalance的操作。

group.transitionTo(PreparingRebalance)

info("Preparing to restabilize group %s with old generation %s"
.format(group.groupId, group.generationId))

开始监听join的加入。

val rebalanceTimeout = group.rebalanceTimeout

val delayedRebalance = new DelayedJoin(this, group, rebalanceTimeout)

val groupKey = GroupKey(group.groupId)

joinPurgatory.tryCompleteElseWatch(delayedRebalance, Seq(groupKey))

}

执行join加入的监听:
首先调用joinPurgatory实例中的tryCompleteElseWatch函数
1,先执行如下的步骤:
var isCompletedByMe = operation synchronized operation.tryComplete()

if (isCompletedByMe)

return true
这里调用了DelayedJoin中的tryComplete的函数,
coordinator.tryCompleteJoin(group, forceComplete)
1,1这里直接通过调用GroupCoordinator中的tryCompleteJoin函数:
在这个函数中直接判断group的member的集合中是否有callback回调函数是null的集合,如果有,表示有需要进行reJoin的member的集合,直接返回false,否则调用forceComplete函数,并得到返回值,。
group synchronized {

if (group.notYetRejoinedMembers.isEmpty)

forceComplete()

else false

}
1,2在group不没有member的成员的callback的回调函数是null的情况下,先看看forceComplete的函数定义:
如果当前的completed的值是false,设置成true成功的话,执行onComplete的函数,并返回true,
否则的话,表示当前的completed的值已经被修改成了true,这里这个函数返回false.
if (completed.compareAndSet(false, true)) {

// cancel the timeout timer

cancel()

onComplete()

true

} else {

false

}
1,3在completed被成功从false设置为true后,onComplete函数实际上调用GroupCoordinator中的onCompleteJoin函数。

def onCompleteJoin(group: GroupMetadata) {

group synchronized {
这里首先得到需要进行rejoin操作的成员的集合(callback的回调函数是空的集合)

val failedMembers = group.notYetRejoinedMembers
如果说当前的group中没有成员或者说callback是null的成员集合有值时,
先从group中移出掉这个没有callback的成员。
移出完成没有callback的成员集合后,如果group现在的成员集合是个空集合,设置group的状态为Dead,
同时从groupManager中移出这个group.

if (group.isEmpty || !failedMembers.isEmpty) {

failedMembers.foreach { failedMember =>

group.remove(failedMember.memberId)

// TODO: cut the socket connection to the client

}

// TODO KAFKA-2720: only remove group in the background thread

if
(group.isEmpty) {

group.transitionTo(Dead)

groupManager.removeGroup(group)

info("Group %s generation %s is dead and removed"
.format(group.groupId, group.generationId))

}

}

if (!group.is(Dead)) {
如果group的状态不是Dead的状态,对group的generation的值进行初始化并递加。
并设置当前的group的状态为AwaitingSync状态。

group.initNextGeneration()

info("Stabilized group %s generation %s".format(group.groupId,
group.generationId))

迭代这个group中的所有的成员,
如果成员的memberId的值与group.leaderId的值相同,那么向这个成员的client端响应这个当前的group的所有的成员的集合,否则只响应这个member的memberId与generationId,与leader对应的memberId的值。

for (member <- group.allMemberMetadata) {

assert(member.awaitingJoinCallback != null)

val joinResult = JoinGroupResult(

members=if (member.memberId == group.leaderId)
{ group.currentMemberMetadata } else { Map.empty },

memberId=member.memberId,

generationId=group.generationId,

subProtocol=group.protocol,

leaderId=group.leaderId,

errorCode=Errors.NONE.code)

member.awaitingJoinCallback(joinResult)

member.awaitingJoinCallback = null

completeAndScheduleNextHeartbeatExpiration(group, member)

}

}

}

}

2,回到joinPurgatory实例中的tryCompleteElseWatch函数,开始根据1步中的返回结果进行处理:
如果返回的值是true,表示group中没有failedMembers的成员,同时completed属性把值从false设置成了true,这里返回的就是true.
if (isCompletedByMe)

return true


2,1流程执行到这里,表示当前的group中包含有failedMembers的成员,或者completed的值现在已经是true了没有更新成功,

var
watchCreated = false

for
(key <- watchKeys) {
如果当前的completed的值是true,直接返回。

if (operation.isCompleted())

return false

把这个key与operation添加到监听器中,这个情况只有completed的值是false,这种情况是一开始进入到join操作的时候,就发现group中包含有failedMembers的成员watchForOperation(key, operation)

if (!watchCreated) {
由于流程没有return,这里设置watchCreated的值为true,因为已经把operation加入到了watchs中。

watchCreated = true

estimatedTotalOperations.incrementAndGet()

}

}

3,这里再执行一次coordinator.tryCompleteJoin(group, forceComplete)函数,如果函数返回为true时,表示这个这个处理完成,直接返回true,
isCompletedByMe = operation synchronized operation.tryComplete()

if (isCompletedByMe)

return true


3,1,这种情况检查completed的值是否是true,如果不是,把这个operation添加到timer中,timer中会在超时时执行coordinator.onCompleteJoin(group)

if (! operation.isCompleted()) {

timeoutTimer.add(operation)

if (operation.isCompleted()) {

// cancel the timer task

operation.cancel()

}

}

Consumer中分配partition后的group同步

在对同一个group进行消费的多个consumer中,consumer在执行完成joinGroup操作后,会通过这个部分的handleSyncGroup函数来进行处理,

这个函数中传入的groupAssignment参数如果是follower的consumer时传入为空的集合。

def handleSyncGroup(groupId: String,

generation: Int,

memberId: String,

groupAssignment: Map[String, Array[Byte]],

responseCallback: SyncCallback) {

if (!isActive.get) {

responseCallback(Array.empty, Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code)

} else if (!isCoordinatorForGroup(groupId)) {

responseCallback(Array.empty, Errors.NOT_COORDINATOR_FOR_GROUP.code)

} else {
得到对应的group并对这个group执行doSyncGroup的函数调用来完成操作。

val group = groupManager.getGroup(groupId)

if (group == null)

responseCallback(Array.empty, Errors.UNKNOWN_MEMBER_ID.code)

else

doSyncGroup(group, generation, memberId, groupAssignment, responseCallback)

}

}

接下来看看这个doSyncGroup函数具体流程:

private def doSyncGroup(group: GroupMetadata,

generationId: Int,

memberId: String,

groupAssignment: Map[String, Array[Byte]],

responseCallback: SyncCallback) {

var delayedGroupStore: Option[DelayedStore] = None

group synchronized {

if (!group.has(memberId)) {

responseCallback(Array.empty, Errors.UNKNOWN_MEMBER_ID.code)

} else if (generationId != group.generationId) {

responseCallback(Array.empty, Errors.ILLEGAL_GENERATION.code)

} else {

group.currentState match {

case Dead =>

responseCallback(Array.empty, Errors.UNKNOWN_MEMBER_ID.code)

case PreparingRebalance =>

responseCallback(Array.empty, Errors.REBALANCE_IN_PROGRESS.code)

case AwaitingSync =>
如果是执行syncGroup的操作时,那么这个时候,group的状态只有在AwaitingSync的状态时才执行处理,其它情况下,不执行相关的操作。
这里先根据当前的memberId在对应的member中的回调函数进行设置,这个回调函数并不会立即响应,只有在memberId是leader的consumer时,处理完成后才统一对所有的consumer进行响应。

group.get(memberId).awaitingSyncCallback = responseCallback

completeAndScheduleNextHeartbeatExpiration(group, group.get(memberId))

if (memberId == group.leaderId) {
只有在memberId是所有的consumer的leader时,才执行的处理流程。

info(s"Assignment received from leader for group
${group.groupId} for generation ${group.generationId}")

这里得到所有的已经分配的consumer的partition的分配信息。

// fill any missing members with an empty assignment

val missing = group.allMembers -- groupAssignment.keySet

val assignment = groupAssignment ++ missing.map(_ ->
Array.empty[Byte]).toMap

这里把已经分配的consumer的partition的分配集合与group通过GroupMetadataManager中的perpareStoreGroup进行处理,
prepareStoreGroup的处理流程:
这里主要先生成一个DelayedStore实例,并通过groupManager中的store函数来执行这个实例,这里直接通过调用replicaManager中的appendMessages函数来进行处理并这个group所在的topic的partition中写入一条group的metadata的信息,这个信息的key就是group的id,value是group实例与group实例对应的member的partition的分配信息。
处理完成的回调操作:
1,如果处理完成后,group的状态还是AwaitingSync同时generationId的值没有发生变化,表示执行出错,这个时候,重新执行rebalance的操作。
2,group的sync操作完成处理,更新group的状态为Stable状态,更新group中对应已经分配过partition的consumer的member中的assignment的值为分配的partition的信息,同时这个时候向所有的member对应的consumer的client端进行回调操作,向client端写入对应的分配信息。

delayedGroupStore = Some(groupManager.prepareStoreGroup(group,
assignment, (errorCode: Short) =>
{

group synchronized {

if (group.is(AwaitingSync) && generationId == group.generationId) {

if (errorCode != Errors.NONE.code) {

resetAndPropagateAssignmentError(group, errorCode)

maybePrepareRebalance(group)

} else {

setAndPropagateAssignment(group, assignment)

group.transitionTo(Stable)

}

}

}

}))

}

case Stable =>

// if the group is stable, we just return the current assignment

val memberMetadata = group.get(memberId)

responseCallback(memberMetadata.assignment, Errors.NONE.code)

completeAndScheduleNextHeartbeatExpiration(group, group.get(memberId))

}

}

}

delayedGroupStore.foreach(groupManager.store)

}

Consumer的心跳处理

接收consumer的心跳报告

当consumer向对应的leader broker报告心跳时,通过GroupCoordinator实例中的handleHeartbeat的函数进行处理。

在这个函数中,根据当前的group的对应的状态,进行处理,如果group对应的partition的leader正在切换或者leader已经挂掉向老的leader节点发起了请求,或者group中对应的member已经超时被移出掉时直接响应对应的错误代码,让consumer执行相对的重新rejoin或者重新连接新的coordinator的leader的操作,否则执行下面代码的部分操作:
更新这上member的最后一次心跳的时间,并添加这个心跳的超时监听。
val member = group.get(memberId)

completeAndScheduleNextHeartbeatExpiration(group, member)

responseCallback(Errors.NONE.code)

Consumer的心跳超时监听

在每次对group对应的member进行操作时,或者每个consumer定时向coorniator发起心跳时,会重新注册这个超时监听,
通过completeAndScheduleNextHeartbeatExpiration函数。

接下来看看这个函数实现流程:
private def completeAndScheduleNextHeartbeatExpiration(group: GroupMetadata,
member: MemberMetadata) {
设置这个consumer的成员的最后一次更新时间为最后一次对这个member的操作时间,

// complete current heartbeat expectation

member.latestHeartbeat = SystemTime.milliseconds

根据这个memberId与group的id生成一个MemberKey的值,检查是否有老的delay已经存在,把已经存在的DelayedHeartbeat的操作完成的,从监听器中移出。

val memberKey = MemberKey(member.groupId, member.memberId)
这里如果已经有一个重复的心跳超时监听存在,先完成上一个心跳的超时监听,当前的心跳时间被更新,已经大于了上一次的心跳更新时间。
比较的关键代码:
{
member.latestHeartbeat + member.sessionTimeoutMs > heartbeatDeadline
member.latestHeartbeat + member.sessionTimeoutMs这一部分代码表示得到这一次心跳的超时时间,
heartbeatDeadline表示是当前的operation(也就是上次更新心跳时计算出来的超时时间),
中间使用大于号,表示如果是第一次的时候,进行监听,等待超时或者下一次心跳过来。
}

heartbeatPurgatory.checkAndComplete(memberKey)

根据当前的心跳时间也就是当前时间加上session的超时时间,设置这个心跳的过期时间为这个时间,
生成一个DelayedHeartbeat的实例。并完成这个实例或者加入到监听器中。

// reschedule the next heartbeat expiration deadline

val newHeartbeatDeadline = member.latestHeartbeat + member.sessionTimeoutMs

val delayedHeartbeat = new DelayedHeartbeat(this, group, member,
newHeartbeatDeadline, member.sessionTimeoutMs)

heartbeatPurgatory.tryCompleteElseWatch(delayedHeartbeat, Seq(memberKey))

}

处理心跳超时的具体处理流程:
1,在heartbeatPurgatory中的tryCompleteElseWatch函数中首先执行DelayedHeartbeat的tryComplete函数,这个函数直接调用GroupCoortinator实例中的tryCompleteHeartbeat函数。
def tryCompleteHeartbeat(group: GroupMetadata, member: MemberMetadata,
heartbeatDeadline: Long, forceComplete: () => Boolean) = {

group synchronized {
如果说member是一个已经退出的member或者memeber中的joinCallback或者syncCallback函数不空null或者最后一次心跳的时间加上session的超时时间大于了上一次心跳的时间加上session的超时时间的值(这是表示心跳在未超时的时间内重复过来时,结束掉上一次的监听),执行forceComplete函数,这个函数设置completed属性的值为true,表示需要这个task完成返回true,否则其它情况返回false,
也就是说,这个监听器会一直等待,直到这个心跳超时或者下一次的心跳过来。

if (shouldKeepMemberAlive(member, heartbeatDeadline) || member.isLeaving)

forceComplete()

else false

}

}

2,根据1中执行tryComplete得到的返回值,如果返回的是true,表示成功完成(这种情况通常是第二次心跳过来时,这里会返回true),直接返回,否则执行下一个操作。
var isCompletedByMe = operation synchronized operation.tryComplete()

if (isCompletedByMe)

return true

3,把这个DelayHeartbeat的实例添加到Watchers中,并添加到Timer中,如果没有更多的心跳请求过来时,在timer的函数中,在指定的超时时间达到后会设置这个实例的forceComplete函数来设置这个实例的completed的函数,从timer中移出这个实例,并执行这个member的过期操作,从group中移出这个member,如果group的状态是Stable或者是AwaitingSync状态时,重新执行rebalance的操作。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息