KafkaConsumer源码解析
2017-03-03 00:00
302 查看
#测试代码
上次讲了KafkaProducer的用法和实现代码,这里继续来看看Consumer是怎样工作的。
同样先来看看示例代码:
应用代码很简单,消费数据的流程是这样的:
创建一个ConsumerConnector对象实例,负责和zookeeper通信
ConsumerConnector实例在zookeeper上注册相应节点,初始化若干条Stream负责和kafka-Broker通信。
每条Stream上都可以创建一个Iterator来获取消息。
#ConsumerConnector接口
这里使用的是kafka通过scala实现此接口的类:
下面摘自scaladoc:
ZookeeperConsumerConnector类处理和zookeeper的交互工作,包括:
在/consumers/[group_id]/注册
每个consumer在一个group中都有自己的唯一id。consumer在创建的时候会在上述路径中创建一个临时节点[ids/节点名],保存此consumer读取的topic列表。Consumer会监视其所在的[group_id]目录的变化,比如说ids目录变化就会触发一次rebalance。这里的id由消费者指定,而不是zk按序生成。
此路径下包含:
/consumers/[group_id]/ids。ids目录下为本group中每个存活的consumer都创建一个节点consumer-id
/consumers/[group_id]/owners。owners目录下为本group消费的每个的topic创建一个目录,目录中为每个partition创建一个节点,节点的内容为正在消费此partition的consumer-id
/consumers/[group_id]offsets。offsets目录下为本group消费的每个的topic创建一个目录,目录中为每个partition创建一个节点,节点的内容为正在消费此partition的offset
监听broker节点:/brokers/[0...N] --> { "host" : "host:port", "topics" : {"topic1": ["partition1" ... "partitionN"], ..., "topicN": ["partition1" ... "partitionN"] } }
/brokers/[ids]/下每一个子节点代表一个正在运行的broker。在kafka的配置中的broker.id参数对应的就是这里的ids。节点内容为json格式,内容为broker监听的host和端口
/broker/[topics]/下包含所有topic的信息
进入ZookeeperConsumerConnector后,首先看到:
创建了一个val变量(类似于Java中的final) underlying, 其实这是作为一个单例在处理consumer客户端跟zookeeper的交互的核心。
然后是val messageStreamCreated,目的是为了防止多次在同一consumer上创建多次stream.(具体目的还在研究中)
而这里我们发现其实这里存在两个同名不同包的ZookeeperConsumerConnector,java直接调用的是
按照惯例,同样先来看看这个类有什么类属性,
上次讲了KafkaProducer的用法和实现代码,这里继续来看看Consumer是怎样工作的。
同样先来看看示例代码:
import kafka.consumer.*; import kafka.javaapi.consumer.ConsumerConnector; import kafka.message.MessageAndMetadata; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.*; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; public class SimpleConsumer { private Logger LOG = LoggerFactory.getLogger(SimpleConsumer.class); private final ConsumerConnector consumer; private final String topic; private ExecutorService executor; private static String groupId = ""; public static void main(String [] args){ String zookeeper = "node87:2181"; groupId = String.valueOf(new Date().getTime());//每次生成一个新的groupId方便测试 String topic = "test1234"; int threadCount = 1; SimpleConsumer simpleConsumer = new SimpleConsumer(zookeeper, groupId, topic); simpleConsumer.run(threadCount); try { Thread.sleep(100000); //等待100秒后关掉服务 } catch (InterruptedException e) { // } simpleConsumer.shutdown(); } public SimpleConsumer(String a_zookeeper, String a_groupId, String a_topic) { //创建一个ConsumerConnector负责和zookeeper通信,createJavaConsumerConnector(config : ConsumerConfig)是scala的方法。内部实例化了一个 //kafka.javaapi.consumer.ZookeeperConsumerConnector(config)对象返回 consumer = Consumer.createJavaConsumerConnector(createConsumerConfig(a_zookeeper,a_groupId)); this.topic = a_topic; this.executor = Executors.newCachedThreadPool(); } public void shutdown() { if (consumer != null) consumer.shutdown(); if (executor != null) executor.shutdown(); try { if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) { System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly"); } } catch (InterruptedException e) { System.out.println("Interrupted during shutdown, exiting uncleanly"); } } /** * 读取kafkaStream方法 */ public void run(int a_numThreads){ //a_numThreads=1 //topic,创建stream的数量 Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(topic, a_numThreads); //创建MessageStreams Map Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); ExecutorService executor = Executors.newFixedThreadPool(a_numThreads); executor.execute(new ConsumerTest(streams.get(0))); //因为上面只创建了一条stream,这里直接获取之 } public class ConsumerTest implements Runnable { KafkaStream<byte[], byte[]> stream; public ConsumerTest(KafkaStream<byte[], byte[]> stream) { this.stream = stream; } public void run() { //每个stream都支持一个Iterator用来获取消息 ConsumerIterator iterator = stream.iterator(); LOG.info("groupId:{}",groupId); while(true){ try { if(iterator.hasNext()) { MessageAndMetadata<byte[], byte[]> data = iterator.next(); LOG.info("message:{}, partition:{}, offset:{},", new String(data.message()), data.partition(), data.offset()); } }catch (ConsumerTimeoutException e){ System.out.println(Thread.currentThread().getName() + "----" + "超时..."); } } } } private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) { Properties props = new Properties(); props.put("zookeeper.connect", a_zookeeper); //zookeeper地址 props.put("group.id", a_groupId); //group id props.put("zookeeper.session.timeout.ms", "4000"); props.put("zookeeper.sync.time.ms", "200"); props.put("auto.commit.interval.ms", "1000"); props.put("auto.offset.reset", "largest"); //新group-consumer启动后从最新(largest)/最旧(smalles)的数据开始读取 props.put("consumer.timeout.ms","3000"); //消费者等待新消息时间,超过此时间没有收到新的消息会抛出一个ConsumerTimeoutException,如果设为-1 return new ConsumerConfig(props); } }
应用代码很简单,消费数据的流程是这样的:
创建一个ConsumerConnector对象实例,负责和zookeeper通信
ConsumerConnector实例在zookeeper上注册相应节点,初始化若干条Stream负责和kafka-Broker通信。
每条Stream上都可以创建一个Iterator来获取消息。
#ConsumerConnector接口
这里使用的是kafka通过scala实现此接口的类:
kafka.javaapi.consumer.ZookeeperConsumerConnector
下面摘自scaladoc:
ZookeeperConsumerConnector类处理和zookeeper的交互工作,包括:
在/consumers/[group_id]/注册
每个consumer在一个group中都有自己的唯一id。consumer在创建的时候会在上述路径中创建一个临时节点[ids/节点名],保存此consumer读取的topic列表。Consumer会监视其所在的[group_id]目录的变化,比如说ids目录变化就会触发一次rebalance。这里的id由消费者指定,而不是zk按序生成。
此路径下包含:
/consumers/[group_id]/ids。ids目录下为本group中每个存活的consumer都创建一个节点consumer-id
/consumers/[group_id]/owners。owners目录下为本group消费的每个的topic创建一个目录,目录中为每个partition创建一个节点,节点的内容为正在消费此partition的consumer-id
/consumers/[group_id]offsets。offsets目录下为本group消费的每个的topic创建一个目录,目录中为每个partition创建一个节点,节点的内容为正在消费此partition的offset
监听broker节点:/brokers/[0...N] --> { "host" : "host:port", "topics" : {"topic1": ["partition1" ... "partitionN"], ..., "topicN": ["partition1" ... "partitionN"] } }
/brokers/[ids]/下每一个子节点代表一个正在运行的broker。在kafka的配置中的broker.id参数对应的就是这里的ids。节点内容为json格式,内容为broker监听的host和端口
/broker/[topics]/下包含所有topic的信息
进入ZookeeperConsumerConnector后,首先看到:
private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, val enableFetcher: Boolean) // for testing only extends ConsumerConnector { private val underlying = new kafka.consumer.ZookeeperConsumerConnector(config, enableFetcher) private val messageStreamCreated = new AtomicBoolean(false) //... ... }
创建了一个val变量(类似于Java中的final) underlying, 其实这是作为一个单例在处理consumer客户端跟zookeeper的交互的核心。
然后是val messageStreamCreated,目的是为了防止多次在同一consumer上创建多次stream.(具体目的还在研究中)
而这里我们发现其实这里存在两个同名不同包的ZookeeperConsumerConnector,java直接调用的是
kafka.javaapi.consumer.ZookeeperConsumerConnector, 而在此类内部实例化的时候创建的是一个
kafka.consumer.ZookeeperConsumerConnector类的实例。
按照惯例,同样先来看看这个类有什么类属性,
private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, val enableFetcher: Boolean) // for testing only extends ConsumerConnector with Logging with KafkaMetricsGroup { private val isShuttingDown = new AtomicBoolean(false) //关闭标识 private val rebalanceLock = new Object //rebalance锁 private var fetcher: Option[ConsumerFetcherManager] = None // private var zkClient: ZkClient = null private var topicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]] private val checkpointedZkOffsets = new Pool[TopicAndPartition, Long] private val topicThreadIdAndQueues = new Pool[(String, ConsumerThreadId), BlockingQueue[FetchedDataChunk]] private val scheduler = new KafkaScheduler(threads = 1, threadNamePrefix = "kafka-consumer-scheduler-") private val messageStreamCreated = new AtomicBoolean(false) private var sessionExpirationListener: ZKSessionExpireListener = null private var topicPartitionChangeListener: ZKTopicPartitionChangeListener = null private var loadBalancerListener: ZKRebalancerListener = null private var offsetsChannel: BlockingChannel = null private val offsetsChannelLock = new Object private var wildcardTopicWatcher: ZookeeperTopicEventWatcher = null // useful for tracking migration of consumers to store offsets in kafka private val kafkaCommitMeter = newMeter("KafkaCommitsPerSec", "commits", TimeUnit.SECONDS, Map("clientId" -> config.clientId)) private val zkCommitMeter = newMeter("ZooKeeperCommitsPerSec", "commits", TimeUnit.SECONDS, Map("clientId" -> config.clientId)) private val rebalanceTimer = new KafkaTimer(newTimer("RebalanceRateAndTime", TimeUnit.MILLISECONDS, TimeUnit.SECONDS, Map("clientId" -> config.clientId))) val consumerIdString = { var consumerUuid : String = null config.consumerId match { case Some(consumerId) // for testing only => consumerUuid = consumerId case None // generate unique consumerId automatically => val uuid = UUID.randomUUID() consumerUuid = "%s-%d-%s".format( InetAddress.getLocalHost.getHostName, System.currentTimeMillis, uuid.getMostSignificantBits().toHexString.substring(0,8)) } config.groupId + "_" + consumerUuid } this.logIdent = "[" + consumerIdString + "], "
相关文章推荐
- Kafka源码深度解析-序列7 -Consumer -coordinator协议与heartbeat实现原理
- Kafka源码深度解析-序列9 -Consumer -SubscriptionState内部结构分析
- Kafka源码深度解析-序列6 -Consumer -消费策略分析
- Kafka源码深度解析-序列8 -Consumer -Fetcher实现原理与offset确认机制
- Kafka源码深度解析-序列7 -Consumer -coordinator协议与heartbeat实现原理
- kafka源码解析之九ReplicaManager
- kafka源码解析之十OffsetManager
- Kafka设计解析(四):Kafka Consumer解析
- kafka源码解析之一kafka诞生的背景
- Kafka设计解析:Kafka Consumer解析
- kafka源码解析之十六生产者流程(客户端如何向topic发送数据)
- kafka源码解析之八LogManager
- kafka源码解析之二kafka内部的专业术语
- kafka源码解析之目录索引
- kafka源码解析之十三KafkaHealthcheck
- kafka源码解析之十四TopicConfigManager
- kafka源码解析之十一KafkaApis
- Kafka设计解析(四):Kafka Consumer解析
- Kafka的Producer和Consumer源码学习
- kafka源码解析之四Broker的模块组成