apache kafka源码分析走读-ZookeeperConsumerConnector分析
2014-08-09 17:49
363 查看
apache kafka中国社区QQ群:162272557
1.ZookeeperConsumer架构
ZookeeperConsumer类中consumer运行过程架构图:图1
过程分析:
ConsumerGroupExample类
2.消费者线程(consumer thread),队列,拉取线程(fetch thread)三者之间关系
每一个topic至少需要创建一个consumer thread,如果有多个partitions,则可以创建多个consumer thread线程,consumer thread>==partitions数量,否则会有consumer thread空闲。部分代码示例如下:
ConsumerConnector consumer
consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
createConsumerConfig());
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put("test-string-topic", new Integer(1)); //value表示consumer thread线程数量
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
具体说明一下三者关系:
(1).topic的partitions分布规则
paritions是安装kafka brokerId有序分配的。
例如现在有三个node安装了kafka broker服务端程序,brokerId分别设置为1,2,3,现在准备一个topic为test-string-topic,并且分配12个partitons,此时partitions的kafka broker节点分布情况为 ,partitions索引编号为0,3,6,9等4个partitions在brokerId=1上,1,4,7,10在brokerId=2上,2,5,8,11在brokerId=3上。
创建consumer thread
consumer thread数量与BlockingQueue一一对应。
a.当consumer thread count=1时
此时有一个blockingQueue1,三个fetch thread线程,该topic分布在几个node上就有几个fetch thread,每个fetch thread会于kafka broker建立一个连接。3个fetch thread线程去拉取消息数据,最终放到blockingQueue1中,等待consumer
thread来消费。
消费者线程,缓冲队列,partitions分布列表如下
consumer线程 | Blocking Queue | partitions |
consumer thread1 | blockingQueue1 | 0,1,2,3,4,5,6,7,8,9,10,11 |
fetch线程 | partitions |
fetch thread1 | 0,3,6,9 |
fetch thread2 | 1,4,7,10 |
fetch thread3 | 2,5,8,11 |
此时有consumerThread1和consumerThread2分别对应2个队列blockingQueue1,blockingQueue2,这2个消费者线程消费partitions依次为:0,1,2,3,4,5与6,7,8,9,10,11;消费者线程,缓冲队列,partitions分布列表如下
consumer线程 | Blocking Queue | partitions |
consumer thread1 | blockingQueue1 | 0,1,2,3,4,5 |
consumer thread2 | blockingQueue2 | 6,7,8,9,10,11 |
fetch线程 | partitions |
fetch thread1 | 0,3,6,9 |
fetch thread2 | 1,4,7,10 |
fetch thread3 | 2,5,8,11 |
消费者线程,缓冲队列,partitions分布列表如下
consumer线程 | Blocking Queue | partitions |
consumer thread1 | blockingQueue1 | 0,1,2 |
consumer thread2 | blockingQueue2 | 3,4,5 |
consumer thread3 | blockingQueue3 | 6,7,8 |
consumer thread4 | blockingQueue4 | 9,10,11 |
同上
同理当消费线程consumer thread count=n,都是安装上述分布规则来处理的。
3.consumer消息线程以及队列创建逻辑
运用ZookeeperConsumerConnector类创建多线程并行消费测试类,ConsumerGroupExample类初始化,调用createMessageStreams方法,实际是在consume方法处理的逻辑,创建KafkaStream,以及阻塞队列(LinkedBlockingQueue),KafkaStream与队列个数一一对应,消费者线程数量决定阻塞队列的个数。registerConsumerInZK()方法:设置消费者组,注册消费者信息consumerIdString到zookeeper上。
consumerIdString产生规则部分代码如下:
String consumerUuid = null; if(config.consumerId!=null && config.consumerId) consumerUuid = consumerId; else { String uuid = UUID.randomUUID() consumerUuid = "%s-%d-%s".format( InetAddress.getLocalHost.getHostName, System.currentTimeMillis, uuid.getMostSignificantBits().toHexString.substring(0,8)); } String consumerIdString = config.groupId + "_" + consumerUuid;
kafka zookeeper注册模型结构或存储结构如下:
kafka在zookeeper中存储结构
说明:目前把kafka中绝大部分存储模型都列表出来了,当前还有少量不常使用的,暂时还没有列举,后续会加上。
consumer初始化逻辑处理:
1.实例化并注册loadBalancerListener监听,ZKRebalancerListener监听consumerIdString状态变化
触发consumer reblance条件如下几个:
ZKRebalancerListener:当/kafka01/consumer/[consumer-group]/ids子节点变化时,会触发
ZKTopicPartitionChangeListener:当该topic的partitions发生变化时,会触发。
val topicPath = "/kafka01/brokers/topics" + "/" + "topic-1"
zkClient.subscribeDataChanges(topicPath, topicPartitionChangeListener)
consumer reblance逻辑
consumer offset更新机制
reblance计算规则:
后续.....
相关文章推荐
- Kafka 0.8源码分析—ZookeeperConsumerConnector
- apache kafka源码分析走读-Producer分析
- apache kafka系列之源码分析走读-server端网络架构分析
- apache kafka系列之源码分析走读-kafkaApi详解
- apache kafka源码分析走读-Producer分析
- apache kafka系列之源码分析走读-kafkaApi详解
- apache kafka系列之源码分析走读-kafka内部模块分析
- apache kafka系列之源码分析走读-SocketServer分析
- kafka的consumerConnector.createMessageStreams 方法源码分析
- apache kafka系列之源码分析走读-server端网络架构分析
- apache kafka系列之源码分析走读-kafka内部模块分析
- Zookeeper源码分析(4)- Follower执行流程
- Zookeeper源码分析(3)- Leader执行流程
- Zookeeper源码分析(8)- CommitProcessor
- Zookeeper源码分析(7)- SyncRequestProcessor
- Zookeeper源码分析(5)- PrepRequestProcessor
- Apache Spark源码走读之3 -- Task运行期之函数调用关系分析
- Zookeeper源码分析(1)- Zookeeper的启动流程
- Zookeeper源码分析(9)- ToBeAppliedRequestProcessor
- twitter storm 源码走读之5 -- worker进程内部消息传递处理和数据结构分析