
Spring Boot + Kafka: batch message fetching with a consumer

2020-07-13 06:00

This post will help you start using Kafka quickly, but it will not give you a complete understanding of Kafka. - Life is a journey of practice.

1. Like other messaging middleware, Kafka has three basic concepts: the producer, the consumer, and the topic (which I like to think of as a queue). Producers send messages to a topic, a topic can hold many messages (entity objects), and consumers listen on the corresponding topic and pull messages according to the configured rules.

2. Add the dependencies to pom.xml

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
    <scope>test</scope>
</dependency>

3. Write the producer

package com.lnbdy.sms.kafka;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

import lombok.extern.slf4j.Slf4j;

@Component
@Slf4j
public class KafKaCustomrProducer {

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    // Called from the controller or service layer with the target topic;
    // object is the message entity to send
    public void sendMessage(String topic, Object object) {

        /*
         * ListenableFuture is Spring's extension of the native Java Future: a generic
         * interface used to register callbacks on an asynchronous call. For the Kafka
         * send method, the generic type is SendResult<K, V>, where K and V are the key
         * and value types of the ProducerRecord<K, V> being sent.
         */
        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, object);

        future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onFailure(Throwable throwable) {
                log.info("Failed to send message: " + throwable.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, Object> sendResult) {
                // System.out.println("Send result: " + sendResult.toString());
            }
        });
    }
}
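The producer above autowires a KafkaTemplate, but the post does not show how that bean is configured. Spring Boot's auto-configuration can create one from spring.kafka.* properties; otherwise it can be declared explicitly. Below is a minimal sketch of such a configuration class, not part of the original post: the class name KafkaProducerConfig, the broker address (reused from the consumer config in step 4), and the JSON value serializer are all assumptions.

package com.lnbdy.sms.kafka;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

// Hypothetical producer configuration sketch; adjust the broker address
// and serializers to match your own environment
@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "171.16.68.155:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // JsonSerializer writes the message entity as a JSON string, which pairs
        // with the StringDeserializer used in the consumer configuration below
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}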

4. Write the consumer configuration class:

package com.lnbdy.sms.kafka;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import lombok.Data;
import lombok.extern.slf4j.Slf4j;

@Configuration
@Slf4j
@Data
public class KafkaCustomerConfig {

    @Bean
    public KafkaListenerContainerFactory<?> batchFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
        factory.setBatchListener(true); // enable batch listening
        return factory;
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "171.16.68.155:9092");
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10); // maximum number of records returned in a single poll
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 1000); // maximum delay allowed between polls before the consumer is considered failed
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

}
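The listener in the next step references a Topic.BATCH constant whose class is not shown in the post. A minimal sketch of what such a constants class could look like follows; the class and the topic name "batch-topic" are assumptions, not the original project's values.

package com.lnbdy.sms.kafka;

// Hypothetical constants class holding topic names; replace "batch-topic"
// with the topic actually used in your project
public class Topic {

    public static final String BATCH = "batch-topic";
}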

5. Write the consumer:

package com.lnbdy.sms.kafka;

import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import lombok.extern.slf4j.Slf4j;

@Component
@Slf4j
public class KafkaSimpleConsumer {

    // containerFactory points at the batchFactory bean defined in step 4,
    // so each invocation receives a batch of up to 10 records (max.poll.records)
    @KafkaListener(groupId = "group", topics = Topic.BATCH, containerFactory = "batchFactory")
    public void consumer(List<ConsumerRecord<String, String>> list) {

        for (ConsumerRecord<String, String> record : list) {
            System.err.println(record.value());
            // your business logic goes here
        }
    }
}
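As the comment in the producer notes, sendMessage is meant to be invoked from a controller or service layer. The post does not include that caller, so here is a minimal hypothetical usage sketch, assuming spring-boot-starter-web is on the classpath; the MessageController class and the /send mapping are illustrative only.

package com.lnbdy.sms.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

import com.lnbdy.sms.kafka.KafKaCustomrProducer;
import com.lnbdy.sms.kafka.Topic;

// Hypothetical caller: forwards the request body to the batch topic;
// the listener in step 5 will then receive it in batches of up to 10 records
@RestController
public class MessageController {

    @Autowired
    private KafKaCustomrProducer producer;

    @PostMapping("/send")
    public String send(@RequestBody String message) {
        producer.sendMessage(Topic.BATCH, message);
        return "queued";
    }
}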