您的位置:首页 > 编程语言 > Java开发

springboot集成kafka

2017-09-28 11:09 204 查看
搭建kafka服务器,centos7
1,先安装Java并设置好环境变量
2,下载kafka安装包并解压,这里用的是kafka_2.10-0.10.2.0.tgz

这是下载地址http://download.csdn.net/download/wsbgmofo/9999340

mkdir kaffa
tar -zxvf kafka_2.10-0.10.2.0.tgz
cd kafka_2.10-0.10.2.0
3,因为kafka依赖于zookeeper,所以先启动zookeeper
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
4,启动kafka
bin/kafka-server-start.sh config/server.properties
5,创建一个topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic myTopic
查看topic
bin/kafka-topics.sh --list --zookeeper localhost:2181 myTopic
6,创建一个生产者(为了测试)
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic myTopic
7,创建一个消费者接收消息并在终端打印(为了测试)
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic myTopic --from-beginning
 
如果消费者能正常接收消息说明kafka服务器已经搭建成功,下面是springboot集成kafka实例
 
springboot集成kafka:
1,在springboot官网(https://start.spring.io/)上生成一个springboot项目,添加springboot-kafka组件
或者直接生成一个maven项目,在pom.xml中导入对应的jar包,如下所示:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
2,在application.properties配置文件中输入以下内容:
这是最简单的一个配置,只需要定义kafka服务器地址端口以及随意定义一个消费用户组
spring.kafka.bootstrap-servers=192.168.230.10:9092
spring.kafka.consumer.group-id=myGroup
下面这4条配置是对应的编码配置直接复制粘贴无需修改
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
 
3,新建一个生产者实体类,代码如下:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
 
@Component
public class MyProducer {
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
public void sendMessage(){
//myTopic是先前新建的topic名称,myGroup是设定的一个消费消息的用户组,可随意设置,Hello My Kafka ...消息内容
kafkaTemplate.send("myTopic", "myGroup", "Hello My Kafka ...");
}
}
4,新建一个消费者实体类,如下所示:
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
 
@Component
public class MyConsumer {
//KafkaListener这个注解标识要监听的topic,有消息发送至myTopic这里就会监听到,会收到消息
@KafkaListener(topics = "myTopic")
public void receiveMessage(String content){
System.out.println("content:" + content);
}
}
5,启动测试:
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
 
@SpringBootApplication
public class SpringKafkaApplication {
public static void main(String[] args) throws InterruptedException {
ApplicationContext app = SpringApplication.run(SpringKafkaApplication.class, args);
MyProducer myProducer = app.getBean(MyProducer.class);
myProducer.sendMessage();
}
}
6,你会发现启动后会报错,首先要确定是否能连上kafka服务器
如果你确定kafka服务器能被外网访问到,那就进入到kafka的config目录下
编辑server.properties文件,在文件最下面添加如下内容:
//这里添加的是你kafka的服务地址
advertised.host.name=192.168.230.10
保存并退出,重启kafka服务,再启动测试就没问题了
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  springboot kafka