您的位置:首页 > 编程语言 > Java开发

springboot 集成kafka 简例

2017-04-20 15:07 351 查看
注意:请确保本地kafka可运行

如果没配置可以参考另外一篇文章:http://blog.csdn.net/u010054969/article/details/70241478

1.相关依赖

<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.1.RELEASE</version>
</parent>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>


2. 配置application.properties

# 指定kafka 代理地址,可以多个
spring.kafka.bootstrap-servers=127.0.0.1:9092
# 指定默认消费者group id
spring.kafka.consumer.group-id=test
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer


3. 消息生产者程序

@Controller
public class testController {
@Autowired
KafkaTemplate kafkaTemplate;

@RequestMapping("/query/queryAllItem")
public String queryAllItem(Model model){
kafkaTemplate.send("test01","bootc", "bootcwnao");
return "item/itemList";
}
}


4 编写consumer监听程序

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.util.Optional;

@Component
public class testConsumerHandler {
@KafkaListener(topics = {"test01"})
public void processMessage(ConsumerRecord<?, ?> record) {
System.out.println(record.toString());
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
System.out.println("this is the testTopic send message:" + message);
}
}
}


这个Demo也挺不错的:http://blog.csdn.net/u010207995/article/details/67641321
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  spring-boot