您的位置:首页 > 编程语言 > Java开发

springboot2.x +kafka使用和源码分析五(消费者配置使用)

2020-01-11 18:26 507 查看

上一章描述springboot对于kafka事务的支持,本章主要叙说springboot对于consumer支持。

这里通过两种方式

第一种:由springboot框架来初始化基础bean,我们只需要在yml配置文件中编写配置即可。如下图所示(常规配置具体所有配置可参考http://kafka.apache.org/documentation/ 的consumer):

springboot初始化bena源码:

[code]@Configuration(proxyBeanMethods=false)
@ConditionalOnClass(KafkaTemplate.class)
@EnableConfigurationProperties(KafkaProperties.class)
@Import({KafkaAnnotationDrivenConfiguration.class,KafkaStreamsAnnotationDrivenConfiguration.class})
/**
*初始化consumer工厂类创建Consumer
*/
publicclassKafkaAutoConfiguration{
。。。。
@Bean
@ConditionalOnMissingBean(ConsumerFactory.class)
publicConsumerFactory<?,?>kafkaConsumerFactory(){
returnnewDefaultKafkaConsumerFactory<>(this.properties.buildConsumerProperties());
}
。。。。
}

/**
*注入并行监听容器
*/
@Configuration(proxyBeanMethods=false)
@ConditionalOnClass(EnableKafka.class)
classKafkaAnnotationDrivenConfiguration{
。。。。
/**
*创建MessageListenerContainer工厂类
*/
@Bean
@ConditionalOnMissingBean(name="kafkaListenerContainerFactory")
ConcurrentKafkaListenerContainerFactory<?,?>kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurerconfigurer,
ConsumerFactory<Object,Object>kafkaConsumerFactory){
ConcurrentKafkaListenerContainerFactory<Object,Object>factory=newConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory,kafkaConsumerFactory);
returnfactory;
}

@Configuration(proxyBeanMethods=false)
@EnableKafka
@ConditionalOnMissingBean(name=KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
staticclassEnableKafkaConfiguration{
}
}

 

对于kafkaListenerContainerFactory 的用于创建MessageListenerContainer消息监听容器类

MessageListenerContainer有两个实现类KafkaMessageListenerContainer,ConcurrentMessageListenerContainer

KafkaMessageListenerContainer:以单线程的方式消费topic中所有partition数据

ConcurrentMessageListenerContainer:以并行的方式消费topic中所有partition数据(开启多线程),每一个线程会对应一个partition所有建议对于ConcurrentMessageListenerContainer的并发数与topic的partition数保持一致

MessageListenerContainer的作用在于对于MessageListener的管理

MessageListener这个接口的作用又是什么呢?用于消费topic中的数据,并对offset进行管理(2.3以后默认是手动提交)

springboot对于MessageListener又默认8中实现:

[code]1:publicinterfaceMessageListener<K,V>{
voidonMessage(ConsumerRecord<K,V>data);
}

2:publicinterfaceAcknowledgingMessageListener<K,V>{
voidonMessage(ConsumerRecord<K,V>data,Acknowledgmentacknowledgment);
}

3:publicinterfaceConsumerAwareMessageListener<K,V>extendsMessageListener<K,V>{
voidonMessage(ConsumerRecord<K,V>data,Consumer<?,?>consumer);
}

4:publicinterfaceAcknowledgingConsumerAwareMessageListener<K,V>extendsMessageListener<K,V>{
voidonMessage(ConsumerRecord<K,V>data,Acknowledgmentacknowledgment,Consumer<?,?>consumer);
}

5:publicinterfaceBatchMessageListener<K,V>{
voidonMessage(List<ConsumerRecord<K,V>>data);
}

6:publicinterfaceBatchAcknowledgingMessageListener<K,V>{
voidonMessage(List<ConsumerRecord<K,V>>data,Acknowledgmentacknowledgment);
}

7:publicinterfaceBatchConsumerAwareMessageListener<K,V>extendsBatchMessageListener<K,V>{
voidonMessage(List<ConsumerRecord<K,V>>data,Consumer<?,?>consumer);
}

8:publicinterfaceBatchAcknowledgingConsumerAwareMessageListener<K,V>extendsBatchMessageListener<K,V>{
voidonMessage(List<ConsumerRecord<K,V>>data,Acknowledgmentacknowledgment,Consumer<?,?>consumer);
}

注:

  1. offset提交方式为自动提交,使用此接口可以处理从Kafkaconsumerpoll操作接收的单个ConsumerRecord实例。
  2. offset提交方式为手动提交,使用此接口可以处理从Kafkaconsumerpoll操作接收的单个ConsumerRecord实例。
  3. offset提交方式为自动提交,使用此接口可以处理从Kafkaconsumerpoll操作接收的单个ConsumerRecord实例,并提供对提供对Consumer对象的访问。
  4. offset提交方式为手动提交,使用此接口可以处理从Kafkaconsumerpoll操作接收的单个ConsumerRecord实例,并提供对提供对Consumer对象的访问。
  5. offset提交方式为自动提交,使用此接口可以处理从Kafkaconsumerpoll操作接收的批量ConsumerRecord实例。
  6. offset提交方式为手动提交,使用此接口可以处理从Kafkaconsumerpoll操作接收的批量ConsumerRecord实例。
  7. offset提交方式为自动提交,使用此接口可以处理从Kafkaconsumerpoll操作接收的批量ConsumerRecord实例,并提供对提供对Consumer对象的访问。
  8. offset提交方式为手动提交,使用此接口可以处理从Kafkaconsumerpoll操作接收的批量ConsumerRecord实例,并提供对提供对Consumer对象的访问。

KafkaMessageListenerContainer使用demo:

[code]publicKafkaMessageListenerContainermessageListenerContainer1(@Qualifier("consumerFactory")ConsumerFactory<Integer,String>consumerFactory){

//创建topic分区
TopicPartitiontopicPartition1=newTopicPartition("springboot_test_topic",0);

//设置需要消费的分区偏移量
TopicPartitionOffsettpo1=newTopicPartitionOffset(topicPartition1,0L,TopicPartitionOffset.SeekPosition.BEGINNING);

//创建topic分区
TopicPartitiontopicPartition2=newTopicPartition("springboot_test_topic",1);

//设置需要消费的分区偏移量
TopicPartitionOffsettpo2=newTopicPartitionOffset(topicPartition2,0L,TopicPartitionOffset.SeekPosition.BEGINNING);

//创建topic分区
TopicPartitiontopicPartition3=newTopicPartition("springboot_test_topic",2);

//设置需要消费的分区偏移量
TopicPartitionOffsettpo3=newTopicPartitionOffset(topicPartition3,0L,TopicPartitionOffset.SeekPosition.BEGINNING);

//设置topic的主题和分区可以指定从哪个分区哪个offset开始消费
//容器配置
ContainerPropertiescontainerProperties=newContainerProperties(tpo1,tpo2,tpo3);
/**
*指定
*/
containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
containerProperties.setClientId("springboot_test_topic_id");
containerProperties.setGroupId("springboot_test_topic_group");
containerProperties.setAckTime(6000);
containerProperties.getKafkaConsumerProperties().setProperty("enable.auto.commit","false");
containerProperties.setPollTimeout(3000);
containerProperties.setAckOnError(false);

//绑定message消息监听手动提交单条消费
containerProperties.setMessageListener((AcknowledgingMessageListener<Integer,String>)(consumerRecord,acknowledgment)->{

System.out.println("partition============>"+consumerRecord.partition());
System.out.println("offset============>"+consumerRecord.offset());
System.out.println("key============>"+consumerRecord.key());
System.out.println("value============>"+consumerRecord.value());
//确定消费完成commitoffset
acknowledgment.acknowledge();
});

//构建kafka消费者监听容器
KafkaMessageListenerContainer<Integer,String>kafkaMessageListenerContainer=
newKafkaMessageListenerContainer<Integer,String>(consumerFactory,containerProperties);

//启动消费监听
kafkaMessageListenerContainer.start();

returnkafkaMessageListenerContainer;
}

ConcurrentMessageListenerContainer使用demo:

[code]publicConcurrentMessageListenerContainermessageListenerContainer(@Qualifier("consumerFactory")ConsumerFactory<Integer,String>consumerFactory){

StringtopicName="springboot_test_topic";

//创建topic分区
TopicPartitiontopicPartition1=newTopicPartition(topicName,0);

//设置需要消费的分区偏移量
TopicPartitionOffsettpo1=newTopicPartitionOffset(topicPartition1,0L,TopicPartitionOffset.SeekPosition.BEGINNING);

//创建topic分区
TopicPartitiontopicPartition2=newTopicPartition(topicName,1);

//设置需要消费的分区偏移量
TopicPartitionOffsettpo2=newTopicPartitionOffset(topicPartition2,0L,TopicPartitionOffset.SeekPosition.BEGINNING);

//创建topic分区
TopicPartitiontopicPartition3=newTopicPartition(topicName,2);

//设置需要消费的分区偏移量
TopicPartitionOffsettpo3=newTopicPartitionOffset(topicPartition3,0L,TopicPartitionOffset.SeekPosition.BEGINNING);

//ConsumerFactory设置topic的主题和分区
//容器配置
ContainerPropertiescontainerProperties=newContainerProperties(tpo1,tpo2,tpo3);

containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
containerProperties.setClientId("springboot_test_topic_concurrent_id");
containerProperties.setGroupId("springboot_test_topic_concurrent_group");
containerProperties.setAckTime(6000);
containerProperties.getKafkaConsumerProperties().setProperty("enable.auto.commit","false");
containerProperties.setPollTimeout(3000);
containerProperties.setAckOnError(false);

//绑定message消息监听自定提交单条消费存在丢失数据以及重复消费的问题
containerProperties.setMessageListener((AcknowledgingMessageListener<Integer,String>)(consumerRecord,acknowledgment)->{

System.out.println("partition============>"+consumerRecord.partition());
System.out.println("offset============>"+consumerRecord.offset());
System.out.println("key============>"+consumerRecord.key());
System.out.println("value============>"+consumerRecord.value());

//确定消费完成commitoffset
acknowledgment.acknowledge();
});

containerProperties.getTopicPartitionsToAssign();

ConcurrentMessageListenerContainer<Integer,String>cmlc=newConcurrentMessageListenerContainer(consumerFactory,containerProperties);

//是否设置虽容器自动启动
cmlc.setAutoStartup(true);
cmlc.setBeanName("concurrentMessageListenerContainer");

//设置并发数
cmlc.setConcurrency(3);
//启动消费监听
cmlc.start();

returncmlc;
}

上述两个demo都是通过手动commitoffset(建议使用这种方式,保证数据的一致性),通过containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);AckMode设置commitoffset方式,有以下几种:

  • RECORD
    :当listen每处理一条就commitoffset自动提交

  • BATCH
    :在listen处理完poll()返回的所有记录后提交偏移量,针对于批处理。自动提交

  • TIME
    :每次间隔ackTime的时间去commit

  • COUNT
    :累积达到ackCount次的ack去commit

  • COUNT_TIME
    :TIME和COUNT的都满足则执行提交.

  • MANUAL
    :需要手动确认消息(Acknowledgement.acknowledge()),其它机制与
    BATCH机制相同

  • MANUAL_IMMEDIATE
    :当listenr中调用Acknowledgement.acknowledge()方法时立即提交偏移量

测试结果

 

使用@KafkaListener注解方式

[code]@Target({ElementType.TYPE,ElementType.METHOD,ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@MessageMapping
@Documented
@Repeatable(KafkaListeners.class)
public@interfaceKafkaListener{

//配置消费者id
Stringid()default"";

//设置容器工厂类默认为ConcurrentKafkaListenerContainerFactory并行
StringcontainerFactory()default"";

//设置需要消费的topic名支持多topic消费
String[]topics()default{};

//支持topic名正则设置
StringtopicPattern()default"";

//topic分区类,设置主题和分区(以及可选的初始offset)
TopicPartition[]topicPartitions()default{};

//容器组如果该值被设置,将该容器添加到一个集合中并保存到该bean中,在这个容器集合中可以开启或关闭容器
StringcontainerGroup()default"";

//提供在消费message时报错时Handler类
StringerrorHandler()default"";

//消费者组id
StringgroupId()default"";

//2.x,该id属性(如果存在)将用作Kafka消费者group.id属性覆盖消费者工厂中的已配置属性
booleanidIsGroup()defaulttrue;
//客户端id前缀
StringclientIdPrefix()default"";

//在spring容器对象中的bean名
StringbeanRef()default"__listener";

//消费并行度此值最好等于topic的partition数使用${listen.concurrency:3}
Stringconcurrency()default"";

//是否设置为自动启动"${listen.auto.start:true}"容器需要实现SmartLifecycle
StringautoStartup()default"";
//设置kafka消费者配置
//支持k=v,k:v,kv模式如max.poll.interval.ms=3000
String[]properties()default{};
}

@kafkaListener使用demo:

[code]@KafkaListener(id="springboot_test_topic_id"
//,topics={"springboot_test_topic"}
,topicPartitions={
//设置消费topicspringboot_test_topicpartition为0数据,1分区起始offset为0注意:partitions或partitionOffsets属性都可以指定分区,但不能两个都指定。
@TopicPartition(topic="springboot_test_topic",partitionOffsets={
@PartitionOffset(partition="0",initialOffset="0"),
@PartitionOffset(partition="1",initialOffset="0"),
@PartitionOffset(partition="2",initialOffset="0"),
})
}
,containerFactory="kafkaListenerContainerFactory"
,clientIdPrefix="_listener"
,concurrency="${listen.concurrency:3}"
,idIsGroup=false
,groupId="springboot_test_topic-group"
,beanRef="springboot_test_topic"
,autoStartup="${listen.auto.start:true}"
//,errorHandler="validationErrorHandler"
,properties={"max.poll.interval.ms=3000",
"max.poll.records=30",
"enable.auto.commit=false",
"bootstrap.servers=localhost:9092"
}
)
publicvoidlisten(ConsumerRecord<Integer,String>record
//如果需要使用Acknowledgment需要在factory中指定ACKmodel
//factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
,Acknowledgmentack,
@Header(KafkaHeaders.GROUP_ID)StringgroupId,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID)intpartition,
@Header(KafkaHeaders.OFFSET)intoffset,
@Header(KafkaHeaders.RECEIVED_TOPIC)Stringtopic,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP)longtimestamp){

//消息消息
System.out.println("=======key=========>"+record.key());
System.out.println("=======value=========>"+record.value());
//
System.out.println("=======topic=========>"+topic);
System.out.println("=======groupId=========>"+groupId);
System.out.println("=======partition=========>"+partition);
System.out.println("=======offset=========>"+offset);
System.out.println("=======timestamp=========>"+timestamp);

//手动确定提交offset
ack.acknowledge();
}

有关message的元数据可从消息头获得,这里通过@Header注解获取:

[code]KafkaHeaders.OFFSET// 获取当前信息的offset

KafkaHeaders.RECEIVED_MESSAGE_KEY// 获取messagekey信息

KafkaHeaders.RECEIVED_TOPIC//获取当前信息所在topic信息

KafkaHeaders.RECEIVED_PARTITION_ID//获取当前信息所在partitionId信息

KafkaHeaders.RECEIVED_TIMESTAMP//获取当前信息的时间戳

KafkaHeaders.TIMESTAMP_TYPE//获取当前信息的时间戳类型类型

KafkaHeaders.GROUP_ID//获取当前消费组id

测试结果:

kafka对于批次消费支持:

第一步:构建consumer批次监听容器

[code]/**
*构建并行消费监听容器批次处理
*@paramconsumerFactory
*@return
*/
@Bean
publicKafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer,String>>bitchFactory(ConsumerFactory<Integer,String>consumerFactory){

//构建kafka并行消费监听类工厂类此类通过topic名称创建该topic消费监听
ConcurrentKafkaListenerContainerFactory<Integer,String>concurrentKafkaListenerContainerFactory=
newConcurrentKafkaListenerContainerFactory<>();

//可通过注解的方式进行设置
concurrentKafkaListenerContainerFactory.setConsumerFactory(consumerFactory);
//设置拉取时间超时数
concurrentKafkaListenerContainerFactory.getContainerProperties().setPollTimeout(3000);
//是否开启批次处理
concurrentKafkaListenerContainerFactory.setBatchListener(true);

//设置ack模型机制当发生error时不同处理机制针对与offset有不同处理机制
concurrentKafkaListenerContainerFactory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);

returnconcurrentKafkaListenerContainerFactory;

}

/**
*注册监听bean
*@return
*/
@Bean("bitchListeners")
publicBitchListenersbitchListeners(){
returnnewBitchListeners("springboot_test_topic");
}

第二步:使用

[code]
/**
*使用SpEL表达式支持特殊的标记:__bean名称
*@authorfangyuan
*/
publicclassBitchListeners{

/**
*topic名称
*
*/
privatefinalStringtopic;

publicBitchListeners(Stringtopic){
this.topic=topic;
}

publicStringgetTopic(){
returnthis.topic;
}

/**
*批处理
*
*/
@KafkaListener(id="bitchConsumer",
topicPartitions={
//设置消费topicspringboot_test_topicpartition为0数据,1分区起始offset为0注意:partitions或partitionOffsets属性都可以指定分区,但不能两个都指定。
@TopicPartition(topic="#{bitchListeners.topic}",partitionOffsets={
@PartitionOffset(partition="0",initialOffset="0"),
@PartitionOffset(partition="1",initialOffset="0"),
@PartitionOffset(partition="2",initialOffset="0"),
})
}
,containerFactory="bitchFactory"
//,clientIdPrefix=""
,idIsGroup=false
,concurrency="${listen.concurrency:3}"
,groupId="bitchConsumer-group"
,beanRef="bitchConsumer_"
,autoStartup="${listen.auto.start:true}"
,properties={
"max.poll.interval.ms=3000",
//设置一个批次拉取最大消息数
"max.poll.records=5"})
publicvoidlisten(List<ConsumerRecord<Integer,String>>records,Acknowledgmentack){

System.out.println("该批次拉取消息数====================>"+records.size());
//遍历消息
records.forEach(record->{
//消息消息
System.out.print("=======key=========>"+record.key());
System.out.print("=======value=========>"+record.value());
System.out.print("=======topic=========>"+record.partition());
System.out.print("=======topic=========>"+record.offset());
System.out.println();
});

//手动确定提交offset
ack.acknowledge();
}
//
///**
//*@parammessages
//*/
//@
//KafkaListener(id="bitchConsumer2"
//,topics={"__listener.topic"}
//,containerFactory="bitchFactory"
//,idIsGroup=false
//,concurrency="${listen.concurrency:3}"
//,groupId="bitchConsumer-1")
//publicvoidlistMsg(List<Message<String>>messages){
////
//}

/**
*添加批次ack确认机制
*
*@parammessages
*@paramack
*/
publicvoidlistMsgAck(List<Message<String>>messages,Acknowledgmentack){

}

/**
*@parammessages
*@paramack
*@paramconsumer
*/

publicvoidlistMsgAckConsumer(List<Message<String>>messages,Acknowledgmentack,
Consumer<Integer,String>consumer){
//
}

/**
*如果使用使用ConsumerRecord接收消息方法入参数只能为ConsumerRecord(如果启用ack机制可以包含Acknowledgment)
*@paramlist
*/
publicvoidlistCRs(List<ConsumerRecord<Integer,String>>list){
}

}

测试结果:

Demo项目github地址:https://github.com/fangyuan94/kafkaDemo

  • 点赞 1
  • 收藏
  • 分享
  • 文章举报
F_Hello_World 发布了38篇原创文章·获赞47·访问量1057 私信 关注
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: