您的位置:首页 > 运维架构 > Apache

apache kafka源码分析走读-ZookeeperConsumerConnector分析

2014-08-09 17:49 363 查看


apache kafka中国社区QQ群:162272557

1.ZookeeperConsumer架构

ZookeeperConsumer类中consumer运行过程架构图:



图1

过程分析:

ConsumerGroupExample类

2.消费者线程(consumer thread),队列,拉取线程(fetch thread)三者之间关系

每一个topic至少需要创建一个consumer thread,如果有多个partitions,则可以创建多个consumer thread线程,consumer thread>==partitions数量,否则会有consumer thread空闲。

部分代码示例如下:

ConsumerConnector consumer

consumer = kafka.consumer.Consumer.createJavaConsumerConnector(

createConsumerConfig());

Map<String, Integer> topicCountMap = new HashMap<String, Integer>();

topicCountMap.put("test-string-topic", new Integer(1)); //value表示consumer thread线程数量

Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);

具体说明一下三者关系:

(1).topic的partitions分布规则

paritions是安装kafka brokerId有序分配的。

例如现在有三个node安装了kafka broker服务端程序,brokerId分别设置为1,2,3,现在准备一个topic为test-string-topic,并且分配12个partitons,此时partitions的kafka broker节点分布情况为 ,partitions索引编号为0,3,6,9等4个partitions在brokerId=1上,1,4,7,10在brokerId=2上,2,5,8,11在brokerId=3上。

创建consumer thread

consumer thread数量与BlockingQueue一一对应。

a.当consumer thread count=1时

此时有一个blockingQueue1,三个fetch thread线程,该topic分布在几个node上就有几个fetch thread,每个fetch thread会于kafka broker建立一个连接。3个fetch thread线程去拉取消息数据,最终放到blockingQueue1中,等待consumer
thread来消费。

消费者线程,缓冲队列,partitions分布列表如下

consumer线程

Blocking Queue
partitions
consumer thread1
blockingQueue1
0,1,2,3,4,5,6,7,8,9,10,11
fetch thread与partitions分布列表如下

fetch线程
partitions
fetch thread1
0,3,6,9
fetch thread2
1,4,7,10
fetch thread3
2,5,8,11
b. 当consumer thread count=2时

此时有consumerThread1和consumerThread2分别对应2个队列blockingQueue1,blockingQueue2,这2个消费者线程消费partitions依次为:0,1,2,3,4,5与6,7,8,9,10,11;消费者线程,缓冲队列,partitions分布列表如下

consumer线程

Blocking Queue
partitions
consumer thread1
blockingQueue1
0,1,2,3,4,5
consumer thread2
blockingQueue2
6,7,8,9,10,11
fetch thread与partitions分布列表如下

fetch线程

partitions
fetch thread1
0,3,6,9
fetch thread2
1,4,7,10
fetch thread3
2,5,8,11
c. 当consumer thread count=4时

消费者线程,缓冲队列,partitions分布列表如下

consumer线程

Blocking Queue
partitions
consumer thread1
blockingQueue1
0,1,2
consumer thread2
blockingQueue2
3,4,5
consumer thread3
blockingQueue3
6,7,8
consumer thread4
blockingQueue4
9,10,11
fetch thread与partitions分布列表如下

同上

同理当消费线程consumer thread count=n,都是安装上述分布规则来处理的。

3.consumer消息线程以及队列创建逻辑

运用ZookeeperConsumerConnector类创建多线程并行消费测试类,ConsumerGroupExample类初始化,调用createMessageStreams方法,实际是在consume方法处理的逻辑,创建KafkaStream,以及阻塞队列(LinkedBlockingQueue),KafkaStream与队列个数一一对应,消费者线程数量决定阻塞队列的个数。

registerConsumerInZK()方法:设置消费者组,注册消费者信息consumerIdString到zookeeper上。

consumerIdString产生规则部分代码如下:

String consumerUuid = null;
if(config.consumerId!=null && config.consumerId)
consumerUuid = consumerId;
else {
String uuid = UUID.randomUUID()

consumerUuid = "%s-%d-%s".format(
InetAddress.getLocalHost.getHostName, System.currentTimeMillis,
uuid.getMostSignificantBits().toHexString.substring(0,8));
}
String consumerIdString =  config.groupId + "_" + consumerUuid;

kafka zookeeper注册模型结构或存储结构如下:

kafka在zookeeper中存储结构

说明:目前把kafka中绝大部分存储模型都列表出来了,当前还有少量不常使用的,暂时还没有列举,后续会加上。

consumer初始化逻辑处理:

1.实例化并注册loadBalancerListener监听,ZKRebalancerListener监听consumerIdString状态变化

触发consumer reblance条件如下几个:

ZKRebalancerListener:当/kafka01/consumer/[consumer-group]/ids子节点变化时,会触发

ZKTopicPartitionChangeListener:当该topic的partitions发生变化时,会触发。

val topicPath = "/kafka01/brokers/topics" + "/" + "topic-1"

zkClient.subscribeDataChanges(topicPath, topicPartitionChangeListener)

consumer reblance逻辑

consumer offset更新机制

reblance计算规则:

后续.....
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: