
Integrating Kafka with Spring Boot

2019-05-06 15:31
Copyright notice: 诸葛子房 https://blog.csdn.net/weixin_43291055/article/details/92616895

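1. Add the dependency

The Spring Boot dependencies themselves are not covered here. On the Kafka side, the only dependency needed is the spring-kafka integration package: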

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.2.0.RELEASE</version>
</dependency>

Here is the configuration file first:

#============== kafka producer===================
spring.kafka.producer.bootstrap-servers=localhost:9092
spring.kafka.producer.retries=1
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
spring.kafka.producer.properties.max.request.size=2097152
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#=============== consumer =======================
spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=0
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100
#=======set consumer max.partition.fetch.bytes 2*1024*1024=============
spring.kafka.consumer.properties.max.partition.fetch.bytes=2097152
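
The size values in this configuration are byte counts. As a quick sanity check, the sketch below recomputes them in plain Java. The names `KafkaSizes`, `kib` and `mib` are made up for illustration and are not part of Spring or Kafka.

```java
// Hypothetical helper, for illustration only: recomputes the byte counts
// used in the configuration above.
class KafkaSizes {

    // n kibibytes expressed in bytes
    static int kib(int n) {
        return n * 1024;
    }

    // n mebibytes expressed in bytes
    static int mib(int n) {
        return n * 1024 * 1024;
    }

    public static void main(String[] args) {
        System.out.println("batch-size                = " + kib(16)); // 16 KiB
        System.out.println("buffer-memory             = " + mib(32)); // 32 MiB
        System.out.println("max.request.size          = " + mib(2));  // 2 MiB
        System.out.println("max.partition.fetch.bytes = " + mib(2));  // 2 MiB
    }
}
```

The producer's max.request.size and the consumer's max.partition.fetch.bytes are both set to 2 MiB, so the two limits agree.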

 

2. Kafka producer

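import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.core.annotation.Order;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Order(value = 1)
@Component
@Slf4j
public class Producer implements CommandLineRunner {

    // Values go through the StringSerializer configured above,
    // so only String payloads will serialize correctly.
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    // Invoked once the application has started; sends one message
    // to the "test" topic every 5 seconds and never returns.
    @Override
    public void run(String... strings) throws Exception {
        while (true) {
            log.info("Sending Kafka message");
            // send() is asynchronous: it returns before the broker acknowledges.
            kafkaTemplate.send("test", "zhugezifang");
            log.info("Kafka message handed to the producer.");
            Thread.sleep(5000);
        }
    }
}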
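
The `while (true)` loop in the producer never returns, so it occupies the thread that invokes the `CommandLineRunner`. One possible alternative, sketched here with only the JDK, is to schedule the send on a background thread. `PeriodicSender` and its `send` callback are hypothetical stand-ins and not part of the original post: in the real application the callback would wrap a call such as `kafkaTemplate.send("test", message)`.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical stand-in for the producer loop: calls `send` at a fixed delay
// on a background thread instead of blocking the caller.
class PeriodicSender {

    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    // `send` stands in for a call such as kafkaTemplate.send("test", message).
    void start(Consumer<String> send, String message, long delayMillis) {
        scheduler.scheduleWithFixedDelay(() -> {
            try {
                send.accept(message);
            } catch (RuntimeException e) {
                // An uncaught exception would cancel all later runs, so report it and continue.
                System.err.println("send failed: " + e.getMessage());
            }
        }, 0, delayMillis, TimeUnit.MILLISECONDS);
    }

    // Stops scheduling further sends.
    void stop() {
        scheduler.shutdownNow();
    }
}
```

With a 5000 ms delay this keeps the same pacing as the loop, while `start` returns immediately to its caller.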

 

3. Kafka consumer

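import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class Consumer {

    // Logs every record received from the "test" topic; the consumer group
    // comes from spring.kafka.consumer.group-id in the configuration.
    @KafkaListener(topics = {"test"})
    public void listen(ConsumerRecord<?, ?> record) {
        log.info("topic:{}, offset:{}, value:{}", record.topic(), record.offset(), record.value());
    }
}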

 
