springboot2.x +kafka使用和源码分析五(消费者配置使用)
上一章描述springboot对于kafka事务的支持,本章主要叙说springboot对于consumer支持。
这里通过两种方式
第一种:由springboot框架来初始化基础bean,我们只需要在yml配置文件中编写配置即可。如下图所示(常规配置具体所有配置可参考http://kafka.apache.org/documentation/ 的consumer):
springboot初始化bena源码:
[code]@Configuration(proxyBeanMethods=false) @ConditionalOnClass(KafkaTemplate.class) @EnableConfigurationProperties(KafkaProperties.class) @Import({KafkaAnnotationDrivenConfiguration.class,KafkaStreamsAnnotationDrivenConfiguration.class}) /** *初始化consumer工厂类创建Consumer */ publicclassKafkaAutoConfiguration{ 。。。。 @Bean @ConditionalOnMissingBean(ConsumerFactory.class) publicConsumerFactory<?,?>kafkaConsumerFactory(){ returnnewDefaultKafkaConsumerFactory<>(this.properties.buildConsumerProperties()); } 。。。。 } /** *注入并行监听容器 */ @Configuration(proxyBeanMethods=false) @ConditionalOnClass(EnableKafka.class) classKafkaAnnotationDrivenConfiguration{ 。。。。 /** *创建MessageListenerContainer工厂类 */ @Bean @ConditionalOnMissingBean(name="kafkaListenerContainerFactory") ConcurrentKafkaListenerContainerFactory<?,?>kafkaListenerContainerFactory( ConcurrentKafkaListenerContainerFactoryConfigurerconfigurer, ConsumerFactory<Object,Object>kafkaConsumerFactory){ ConcurrentKafkaListenerContainerFactory<Object,Object>factory=newConcurrentKafkaListenerContainerFactory<>(); configurer.configure(factory,kafkaConsumerFactory); returnfactory; } @Configuration(proxyBeanMethods=false) @EnableKafka @ConditionalOnMissingBean(name=KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME) staticclassEnableKafkaConfiguration{ } }
对于kafkaListenerContainerFactory 的用于创建MessageListenerContainer消息监听容器类
MessageListenerContainer有两个实现类KafkaMessageListenerContainer,ConcurrentMessageListenerContainer
KafkaMessageListenerContainer:以单线程的方式消费topic中所有partition数据
ConcurrentMessageListenerContainer:以并行的方式消费topic中所有partition数据(开启多线程),每一个线程会对应一个partition所有建议对于ConcurrentMessageListenerContainer的并发数与topic的partition数保持一致
MessageListenerContainer的作用在于对于MessageListener的管理
MessageListener这个接口的作用又是什么呢?用于消费topic中的数据,并对offset进行管理(2.3以后默认是手动提交)
springboot对于MessageListener又默认8中实现:
[code]1:publicinterfaceMessageListener<K,V>{ voidonMessage(ConsumerRecord<K,V>data); } 2:publicinterfaceAcknowledgingMessageListener<K,V>{ voidonMessage(ConsumerRecord<K,V>data,Acknowledgmentacknowledgment); } 3:publicinterfaceConsumerAwareMessageListener<K,V>extendsMessageListener<K,V>{ voidonMessage(ConsumerRecord<K,V>data,Consumer<?,?>consumer); } 4:publicinterfaceAcknowledgingConsumerAwareMessageListener<K,V>extendsMessageListener<K,V>{ voidonMessage(ConsumerRecord<K,V>data,Acknowledgmentacknowledgment,Consumer<?,?>consumer); } 5:publicinterfaceBatchMessageListener<K,V>{ voidonMessage(List<ConsumerRecord<K,V>>data); } 6:publicinterfaceBatchAcknowledgingMessageListener<K,V>{ voidonMessage(List<ConsumerRecord<K,V>>data,Acknowledgmentacknowledgment); } 7:publicinterfaceBatchConsumerAwareMessageListener<K,V>extendsBatchMessageListener<K,V>{ voidonMessage(List<ConsumerRecord<K,V>>data,Consumer<?,?>consumer); } 8:publicinterfaceBatchAcknowledgingConsumerAwareMessageListener<K,V>extendsBatchMessageListener<K,V>{ voidonMessage(List<ConsumerRecord<K,V>>data,Acknowledgmentacknowledgment,Consumer<?,?>consumer); }
注:
- offset提交方式为自动提交,使用此接口可以处理从Kafkaconsumerpoll操作接收的单个ConsumerRecord实例。
- offset提交方式为手动提交,使用此接口可以处理从Kafkaconsumerpoll操作接收的单个ConsumerRecord实例。
- offset提交方式为自动提交,使用此接口可以处理从Kafkaconsumerpoll操作接收的单个ConsumerRecord实例,并提供对提供对Consumer对象的访问。
- offset提交方式为手动提交,使用此接口可以处理从Kafkaconsumerpoll操作接收的单个ConsumerRecord实例,并提供对提供对Consumer对象的访问。
- offset提交方式为自动提交,使用此接口可以处理从Kafkaconsumerpoll操作接收的批量ConsumerRecord实例。
- offset提交方式为手动提交,使用此接口可以处理从Kafkaconsumerpoll操作接收的批量ConsumerRecord实例。
- offset提交方式为自动提交,使用此接口可以处理从Kafkaconsumerpoll操作接收的批量ConsumerRecord实例,并提供对提供对Consumer对象的访问。
- offset提交方式为手动提交,使用此接口可以处理从Kafkaconsumerpoll操作接收的批量ConsumerRecord实例,并提供对提供对Consumer对象的访问。
KafkaMessageListenerContainer使用demo:
[code]publicKafkaMessageListenerContainermessageListenerContainer1(@Qualifier("consumerFactory")ConsumerFactory<Integer,String>consumerFactory){ //创建topic分区 TopicPartitiontopicPartition1=newTopicPartition("springboot_test_topic",0); //设置需要消费的分区偏移量 TopicPartitionOffsettpo1=newTopicPartitionOffset(topicPartition1,0L,TopicPartitionOffset.SeekPosition.BEGINNING); //创建topic分区 TopicPartitiontopicPartition2=newTopicPartition("springboot_test_topic",1); //设置需要消费的分区偏移量 TopicPartitionOffsettpo2=newTopicPartitionOffset(topicPartition2,0L,TopicPartitionOffset.SeekPosition.BEGINNING); //创建topic分区 TopicPartitiontopicPartition3=newTopicPartition("springboot_test_topic",2); //设置需要消费的分区偏移量 TopicPartitionOffsettpo3=newTopicPartitionOffset(topicPartition3,0L,TopicPartitionOffset.SeekPosition.BEGINNING); //设置topic的主题和分区可以指定从哪个分区哪个offset开始消费 //容器配置 ContainerPropertiescontainerProperties=newContainerProperties(tpo1,tpo2,tpo3); /** *指定 */ containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); containerProperties.setClientId("springboot_test_topic_id"); containerProperties.setGroupId("springboot_test_topic_group"); containerProperties.setAckTime(6000); containerProperties.getKafkaConsumerProperties().setProperty("enable.auto.commit","false"); containerProperties.setPollTimeout(3000); containerProperties.setAckOnError(false); //绑定message消息监听手动提交单条消费 containerProperties.setMessageListener((AcknowledgingMessageListener<Integer,String>)(consumerRecord,acknowledgment)->{ System.out.println("partition============>"+consumerRecord.partition()); System.out.println("offset============>"+consumerRecord.offset()); System.out.println("key============>"+consumerRecord.key()); System.out.println("value============>"+consumerRecord.value()); //确定消费完成commitoffset acknowledgment.acknowledge(); }); //构建kafka消费者监听容器 KafkaMessageListenerContainer<Integer,String>kafkaMessageListenerContainer= newKafkaMessageListenerContainer<Integer,String>(consumerFactory,containerProperties); //启动消费监听 kafkaMessageListenerContainer.start(); returnkafkaMessageListenerContainer; }
ConcurrentMessageListenerContainer使用demo:
[code]publicConcurrentMessageListenerContainermessageListenerContainer(@Qualifier("consumerFactory")ConsumerFactory<Integer,String>consumerFactory){ StringtopicName="springboot_test_topic"; //创建topic分区 TopicPartitiontopicPartition1=newTopicPartition(topicName,0); //设置需要消费的分区偏移量 TopicPartitionOffsettpo1=newTopicPartitionOffset(topicPartition1,0L,TopicPartitionOffset.SeekPosition.BEGINNING); //创建topic分区 TopicPartitiontopicPartition2=newTopicPartition(topicName,1); //设置需要消费的分区偏移量 TopicPartitionOffsettpo2=newTopicPartitionOffset(topicPartition2,0L,TopicPartitionOffset.SeekPosition.BEGINNING); //创建topic分区 TopicPartitiontopicPartition3=newTopicPartition(topicName,2); //设置需要消费的分区偏移量 TopicPartitionOffsettpo3=newTopicPartitionOffset(topicPartition3,0L,TopicPartitionOffset.SeekPosition.BEGINNING); //ConsumerFactory设置topic的主题和分区 //容器配置 ContainerPropertiescontainerProperties=newContainerProperties(tpo1,tpo2,tpo3); containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); containerProperties.setClientId("springboot_test_topic_concurrent_id"); containerProperties.setGroupId("springboot_test_topic_concurrent_group"); containerProperties.setAckTime(6000); containerProperties.getKafkaConsumerProperties().setProperty("enable.auto.commit","false"); containerProperties.setPollTimeout(3000); containerProperties.setAckOnError(false); //绑定message消息监听自定提交单条消费存在丢失数据以及重复消费的问题 containerProperties.setMessageListener((AcknowledgingMessageListener<Integer,String>)(consumerRecord,acknowledgment)->{ System.out.println("partition============>"+consumerRecord.partition()); System.out.println("offset============>"+consumerRecord.offset()); System.out.println("key============>"+consumerRecord.key()); System.out.println("value============>"+consumerRecord.value()); //确定消费完成commitoffset acknowledgment.acknowledge(); }); containerProperties.getTopicPartitionsToAssign(); ConcurrentMessageListenerContainer<Integer,String>cmlc=newConcurrentMessageListenerContainer(consumerFactory,containerProperties); //是否设置虽容器自动启动 cmlc.setAutoStartup(true); cmlc.setBeanName("concurrentMessageListenerContainer"); //设置并发数 cmlc.setConcurrency(3); //启动消费监听 cmlc.start(); returncmlc; }
上述两个demo都是通过手动commitoffset(建议使用这种方式,保证数据的一致性),通过containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);AckMode设置commitoffset方式,有以下几种:
-
RECORD
:当listen每处理一条就commitoffset自动提交 -
BATCH
:在listen处理完poll()返回的所有记录后提交偏移量,针对于批处理。自动提交 -
TIME
:每次间隔ackTime的时间去commit -
COUNT
:累积达到ackCount次的ack去commit -
COUNT_TIME
:TIME和COUNT的都满足则执行提交. -
MANUAL
:需要手动确认消息(Acknowledgement.acknowledge()),其它机制与BATCH机制相同
-
MANUAL_IMMEDIATE
:当listenr中调用Acknowledgement.acknowledge()方法时立即提交偏移量
测试结果
使用@KafkaListener注解方式
[code]@Target({ElementType.TYPE,ElementType.METHOD,ElementType.ANNOTATION_TYPE}) @Retention(RetentionPolicy.RUNTIME) @MessageMapping @Documented @Repeatable(KafkaListeners.class) public@interfaceKafkaListener{ //配置消费者id Stringid()default""; //设置容器工厂类默认为ConcurrentKafkaListenerContainerFactory并行 StringcontainerFactory()default""; //设置需要消费的topic名支持多topic消费 String[]topics()default{}; //支持topic名正则设置 StringtopicPattern()default""; //topic分区类,设置主题和分区(以及可选的初始offset) TopicPartition[]topicPartitions()default{}; //容器组如果该值被设置,将该容器添加到一个集合中并保存到该bean中,在这个容器集合中可以开启或关闭容器 StringcontainerGroup()default""; //提供在消费message时报错时Handler类 StringerrorHandler()default""; //消费者组id StringgroupId()default""; //2.x,该id属性(如果存在)将用作Kafka消费者group.id属性覆盖消费者工厂中的已配置属性 booleanidIsGroup()defaulttrue; //客户端id前缀 StringclientIdPrefix()default""; //在spring容器对象中的bean名 StringbeanRef()default"__listener"; //消费并行度此值最好等于topic的partition数使用${listen.concurrency:3} Stringconcurrency()default""; //是否设置为自动启动"${listen.auto.start:true}"容器需要实现SmartLifecycle StringautoStartup()default""; //设置kafka消费者配置 //支持k=v,k:v,kv模式如max.poll.interval.ms=3000 String[]properties()default{}; }
@kafkaListener使用demo:
[code]@KafkaListener(id="springboot_test_topic_id" //,topics={"springboot_test_topic"} ,topicPartitions={ //设置消费topicspringboot_test_topicpartition为0数据,1分区起始offset为0注意:partitions或partitionOffsets属性都可以指定分区,但不能两个都指定。 @TopicPartition(topic="springboot_test_topic",partitionOffsets={ @PartitionOffset(partition="0",initialOffset="0"), @PartitionOffset(partition="1",initialOffset="0"), @PartitionOffset(partition="2",initialOffset="0"), }) } ,containerFactory="kafkaListenerContainerFactory" ,clientIdPrefix="_listener" ,concurrency="${listen.concurrency:3}" ,idIsGroup=false ,groupId="springboot_test_topic-group" ,beanRef="springboot_test_topic" ,autoStartup="${listen.auto.start:true}" //,errorHandler="validationErrorHandler" ,properties={"max.poll.interval.ms=3000", "max.poll.records=30", "enable.auto.commit=false", "bootstrap.servers=localhost:9092" } ) publicvoidlisten(ConsumerRecord<Integer,String>record //如果需要使用Acknowledgment需要在factory中指定ACKmodel //factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); ,Acknowledgmentack, @Header(KafkaHeaders.GROUP_ID)StringgroupId, @Header(KafkaHeaders.RECEIVED_PARTITION_ID)intpartition, @Header(KafkaHeaders.OFFSET)intoffset, @Header(KafkaHeaders.RECEIVED_TOPIC)Stringtopic, @Header(KafkaHeaders.RECEIVED_TIMESTAMP)longtimestamp){ //消息消息 System.out.println("=======key=========>"+record.key()); System.out.println("=======value=========>"+record.value()); // System.out.println("=======topic=========>"+topic); System.out.println("=======groupId=========>"+groupId); System.out.println("=======partition=========>"+partition); System.out.println("=======offset=========>"+offset); System.out.println("=======timestamp=========>"+timestamp); //手动确定提交offset ack.acknowledge(); }
有关message的元数据可从消息头获得,这里通过@Header注解获取:
[code]KafkaHeaders.OFFSET// 获取当前信息的offset KafkaHeaders.RECEIVED_MESSAGE_KEY// 获取messagekey信息 KafkaHeaders.RECEIVED_TOPIC//获取当前信息所在topic信息 KafkaHeaders.RECEIVED_PARTITION_ID//获取当前信息所在partitionId信息 KafkaHeaders.RECEIVED_TIMESTAMP//获取当前信息的时间戳 KafkaHeaders.TIMESTAMP_TYPE//获取当前信息的时间戳类型类型 KafkaHeaders.GROUP_ID//获取当前消费组id
测试结果:
kafka对于批次消费支持:
第一步:构建consumer批次监听容器
[code]/** *构建并行消费监听容器批次处理 *@paramconsumerFactory *@return */ @Bean publicKafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer,String>>bitchFactory(ConsumerFactory<Integer,String>consumerFactory){ //构建kafka并行消费监听类工厂类此类通过topic名称创建该topic消费监听 ConcurrentKafkaListenerContainerFactory<Integer,String>concurrentKafkaListenerContainerFactory= newConcurrentKafkaListenerContainerFactory<>(); //可通过注解的方式进行设置 concurrentKafkaListenerContainerFactory.setConsumerFactory(consumerFactory); //设置拉取时间超时数 concurrentKafkaListenerContainerFactory.getContainerProperties().setPollTimeout(3000); //是否开启批次处理 concurrentKafkaListenerContainerFactory.setBatchListener(true); //设置ack模型机制当发生error时不同处理机制针对与offset有不同处理机制 concurrentKafkaListenerContainerFactory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); returnconcurrentKafkaListenerContainerFactory; } /** *注册监听bean *@return */ @Bean("bitchListeners") publicBitchListenersbitchListeners(){ returnnewBitchListeners("springboot_test_topic"); }
第二步:使用
[code] /** *使用SpEL表达式支持特殊的标记:__bean名称 *@authorfangyuan */ publicclassBitchListeners{ /** *topic名称 * */ privatefinalStringtopic; publicBitchListeners(Stringtopic){ this.topic=topic; } publicStringgetTopic(){ returnthis.topic; } /** *批处理 * */ @KafkaListener(id="bitchConsumer", topicPartitions={ //设置消费topicspringboot_test_topicpartition为0数据,1分区起始offset为0注意:partitions或partitionOffsets属性都可以指定分区,但不能两个都指定。 @TopicPartition(topic="#{bitchListeners.topic}",partitionOffsets={ @PartitionOffset(partition="0",initialOffset="0"), @PartitionOffset(partition="1",initialOffset="0"), @PartitionOffset(partition="2",initialOffset="0"), }) } ,containerFactory="bitchFactory" //,clientIdPrefix="" ,idIsGroup=false ,concurrency="${listen.concurrency:3}" ,groupId="bitchConsumer-group" ,beanRef="bitchConsumer_" ,autoStartup="${listen.auto.start:true}" ,properties={ "max.poll.interval.ms=3000", //设置一个批次拉取最大消息数 "max.poll.records=5"}) publicvoidlisten(List<ConsumerRecord<Integer,String>>records,Acknowledgmentack){ System.out.println("该批次拉取消息数====================>"+records.size()); //遍历消息 records.forEach(record->{ //消息消息 System.out.print("=======key=========>"+record.key()); System.out.print("=======value=========>"+record.value()); System.out.print("=======topic=========>"+record.partition()); System.out.print("=======topic=========>"+record.offset()); System.out.println(); }); //手动确定提交offset ack.acknowledge(); } // ///** //*@parammessages //*/ //@ //KafkaListener(id="bitchConsumer2" //,topics={"__listener.topic"} //,containerFactory="bitchFactory" //,idIsGroup=false //,concurrency="${listen.concurrency:3}" //,groupId="bitchConsumer-1") //publicvoidlistMsg(List<Message<String>>messages){ //// //} /** *添加批次ack确认机制 * *@parammessages *@paramack */ publicvoidlistMsgAck(List<Message<String>>messages,Acknowledgmentack){ } /** *@parammessages *@paramack *@paramconsumer */ publicvoidlistMsgAckConsumer(List<Message<String>>messages,Acknowledgmentack, Consumer<Integer,String>consumer){ // } /** *如果使用使用ConsumerRecord接收消息方法入参数只能为ConsumerRecord(如果启用ack机制可以包含Acknowledgment) *@paramlist */ publicvoidlistCRs(List<ConsumerRecord<Integer,String>>list){ } }
测试结果:
Demo项目github地址:
- 点赞 1
- 收藏
- 分享
- 文章举报
- springboot2.x+kafka使用和源码分析七(消费者和生产者使用拦截器)
- springboot2.x +kafka使用和源码分析八(自定义分区器)
- springboot2.x +redis使用和源码分析一(springboot自动装配源码分析)
- springboot2.x +redis使用和源码分析二(RedisTemplate)
- springboot2.x +redis使用和源码分析三(序列化器)
- SpringBoot系列三:SpringBoot基本概念(统一父 pom 管理、SpringBoot 代码测试、启动注解分析、配置访问路径、使用内置对象、项目打包发布)
- 为什么Spring Boot推荐使用logback-spring.xml来替代logback.xml来配置logback日志的问题分析
- SpringBoot源码分析之BeanDefinitionLoader注册主Configuration的Java配置类
- SpringBoot Profile使用详解及配置源码解析
- Spring Boot下Druid连接池的使用配置分析
- Spring Boot下Druid连接池的使用配置分析
- springboot源码分析16-spring boot监听器使用
- SpringBoot——自动配置源码分析
- SpringBoot源码分析之环境和配置文件的加载
- springboot源码分析10-ApplicationContextInitializer使用
- 通过实例及源码分析关于SpringBoot启动类启动时自动配置问题
- Spring Boot下Druid连接池的使用配置分析
- 详解Spring Boot下Druid连接池的使用配置分析
- springboot源码分析4-springboot之SpringFactoriesLoader使用
- Springboot 2使用外部Tomcat源码分析