Spring Boot + Kafka: implementing batch message consumption in a consumer
2020-07-13 06:00
This post will help you start using Kafka quickly, but it will not make you thoroughly familiar with it. — Life is a journey of practice
1. Like other message middleware, Kafka has three basic concepts: producers, consumers, and topics (I find it easiest to think of a topic as a queue). A producer sends messages to a topic; a topic can hold many messages (often serialized entity objects); a consumer subscribes to the topic and pulls messages according to its configuration.
2. Add the dependencies to pom.xml:
```xml
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
    <scope>test</scope>
</dependency>
```
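The producer below autowires a `KafkaTemplate`, which Spring Boot auto-configures from `spring.kafka.*` properties. A minimal `application.properties` sketch (the broker address is taken from the consumer config later in this post; the serializer choices are assumptions, adjust them to your message types):

```properties
# Kafka broker address (same as used in the consumer config below)
spring.kafka.bootstrap-servers=171.16.68.155:9092
# Producer serializers used by KafkaTemplate; StringSerializer assumes
# the payload is already a String (e.g. pre-serialized JSON)
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
```

If you send entity objects directly, spring-kafka's `JsonSerializer` can be used as the value serializer instead.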
3. Write the producer:
```java
package com.lnbdy.sms.kafka;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

import lombok.extern.slf4j.Slf4j;

@Component
@Slf4j
public class KafKaCustomrProducer {

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    // Called from the controller or service layer with the target topic;
    // `object` is the message entity to send.
    public void sendMessage(String topic, Object object) {
        /*
         * ListenableFuture is Spring's extension of the JDK Future: a generic
         * interface for registering callbacks on an asynchronous result. For
         * KafkaTemplate.send(), the result type is SendResult<K, V>, where K
         * and V are the key and value types of the ProducerRecord<K, V> being
         * sent.
         */
        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, object);
        future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onFailure(Throwable throwable) {
                log.info("Failed to send message: " + throwable.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, Object> sendResult) {
                // System.out.println("Send result: " + sendResult.toString());
            }
        });
    }
}
```
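The callback registration above is the key pattern: `send()` returns immediately and the success/failure branch runs later. The same branching can be sketched standalone with the JDK's `CompletableFuture` (a fake send stands in for Kafka here, so no broker is needed):

```java
import java.util.concurrent.CompletableFuture;

public class CallbackSketch {

    // Simulates an asynchronous send that completes with a result string,
    // analogous to KafkaTemplate.send(...) returning a future of SendResult.
    static CompletableFuture<String> fakeSend(String topic, Object payload) {
        return CompletableFuture.supplyAsync(() -> "sent to " + topic + ": " + payload);
    }

    public static void main(String[] args) {
        fakeSend("demo-topic", "hello")
            .whenComplete((result, throwable) -> {
                if (throwable != null) {
                    // mirrors onFailure in the producer above
                    System.out.println("send failed: " + throwable.getMessage());
                } else {
                    // mirrors onSuccess
                    System.out.println(result);
                }
            })
            .join(); // wait so the JVM does not exit before the callback runs
    }
}
```

The topic name and payload here are illustrative only; the point is that the caller never blocks on the send itself.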
4. Write the consumer configuration class:
```java
package com.lnbdy.sms.kafka;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import lombok.extern.slf4j.Slf4j;

@Configuration
@Slf4j
public class KafkaCustomerConfig {

    @Bean
    public KafkaListenerContainerFactory<?> batchFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
        factory.setBatchListener(true); // enable batch listening
        return factory;
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "171.16.68.155:9092");
        // Maximum number of records returned by a single poll, i.e. per batch
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10);
        // Maximum delay allowed between polls before the consumer is considered
        // failed and its partitions are rebalanced (note: 1s is very aggressive;
        // the Kafka default is 5 minutes)
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 1000);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }
}
```
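With `max.poll.records` set to 10, a backlog on the topic is delivered to the batch listener in chunks of at most 10 records per poll. The chunking behavior can be illustrated standalone in plain Java (this only models the record count per poll, not Kafka's actual fetch mechanics):

```java
import java.util.ArrayList;
import java.util.List;

public class PollChunking {

    // Splits a backlog into "polls" of at most maxPollRecords each,
    // mirroring how max.poll.records bounds each batch the listener sees.
    static <T> List<List<T>> chunk(List<T> backlog, int maxPollRecords) {
        List<List<T>> polls = new ArrayList<>();
        for (int i = 0; i < backlog.size(); i += maxPollRecords) {
            polls.add(backlog.subList(i, Math.min(i + maxPollRecords, backlog.size())));
        }
        return polls;
    }

    public static void main(String[] args) {
        List<Integer> backlog = new ArrayList<>();
        for (int i = 0; i < 25; i++) {
            backlog.add(i);
        }
        // 25 pending records with max.poll.records = 10 arrive as
        // three batches of sizes 10, 10, and 5
        List<List<Integer>> polls = chunk(backlog, 10);
        System.out.println(polls.size() + " polls, last batch size "
                + polls.get(polls.size() - 1).size());
    }
}
```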
5. Write the consumer:
```java
package com.lnbdy.sms.kafka;

import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import lombok.extern.slf4j.Slf4j;

@Component
@Slf4j
public class KafkaSimpleConsumer {

    // containerFactory = "batchFactory" wires in the batch-enabled factory
    // above, so each invocation receives a List of up to max.poll.records
    // records instead of a single record
    @KafkaListener(groupId = "group", topics = Topic.BATCH, containerFactory = "batchFactory")
    public void consumer(List<ConsumerRecord<String, String>> list) {
        for (ConsumerRecord<String, String> record : list) {
            System.err.println(record.value());
            // business logic for each record goes here
        }
    }
}
```
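The listener references a `Topic.BATCH` constant that the post never shows; it is presumably a constants class along these lines (the topic name string here is a placeholder, not from the original, since `@KafkaListener` requires a compile-time constant):

```java
public class Topic {

    // Placeholder topic name; the original post does not show the real value
    public static final String BATCH = "batch-topic";

    private Topic() {
        // constants holder, not meant to be instantiated
    }
}
```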