SpringBoot整合Kafka:简单收发消息案例
2017-12-18 11:56
1021 查看
环境说明
Windows 10 1709IDEA 2017.3.2
SpringBoot 2.0.M7
Spring-Kafka 2.1.0.RELEASE
JDK 1.8.0_144
Maven 3.5.0
阿里云ECS
CentOS 7
Kafka 2.12-1.0.0
zookeeper 3.4.10
下载并解压Kafka
下载tgz包 wget http://mirrors.hust.edu.cn/apache/kafka/1.0.0/kafka_2.12-1.0.0.tgz 解压 tar -zxvf kafka_2.12-1.0.0.tgz 进入解压后的文件夹 cd kafka_2.12-1.0.0.tgz
启动Kafka
Kafka使用ZooKeeper,所以需要先启动一个ZooKeeper服务器,如果你的机器上还没有。你可以使用随Kafka一起打包的便捷脚本来获取一个快速但是比较粗糙的单节点ZooKeeper实例。bin/zookeeper-server-start.sh config/zookeeper.properties
配置kafka
Kafka在config目录下提供了一个基本的配置文件。为了保证可以远程访问Kafka,我们需要修改两处配置。打开config/server.properties文件,在很靠前的位置有listeners和 advertised.listeners两处配置的注释,去掉这两个注释,并且根据当前服务器的IP修改如下:
############################# Server Basics ############################# # The id of the broker. This must be set to a unique integer for each broker. broker.id=0 delete.topic.enable=true host.name=172.17.7.97 advertised.host.name=47.94.106.42 ############################# Socket Server Settings ############################# # The address the socket server listens on. It will get the value returned from # java.net.InetAddress.getCanonicalHostName() if not configured. # FORMAT: # listeners = listener_name://host_name:port # EXAMPLE: # listeners = PLAINTEXT://your.host.name:9092 listeners=PLAINTEXT://阿里云内网ip:9092 # Hostname and port the broker will advertise to producers and consumers. If not set, # it uses the value for "listeners" if configured. Otherwise, it will use the value # returned from java.net.InetAddress.getCanonicalHostName(). advertised.listeners=PLAINTEXT://阿里云外网ip:9092
启动kafka
bin/kafka-server-start.sh config/server.properties
可能会报错,因为我的阿里云内存是1g,所以内存不足啦.不用怀疑,kafka默认需要1g内存.
Java Hotspot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x00000000c5330000, 986513408, 0) failed; error='Cannot allocate memory' (errno=12)
将 kafka-server-start.sh的
export KAFKA_HEAP_OPTS=”-Xmx1G -Xms1G”
修改为
export KAFKA_HEAP_OPTS=”-Xmx256M -Xms128M”
再次启动,成功启动.
创建SpringBoot项目
IDEA创建SpringBoot项目可以看这篇博客:SpringBoot初体验:久闻大名,请多指教!(简单web项目+MockMvc单元测试)
在创建springboot的时候,记得选择版本为2.0.M7
pom.xml
在pom.xml中加入下面的依赖<!-- springBoot集成kafka--> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.1.0.RELEASE</version> </dependency>
application.yml
在resource中新建file,命名为application.yml# kafka spring: kafka: # kafka服务器地址(可以多个) bootstrap-servers: 阿里云外网ip:9092 consumer: # 指定一个默认的组名 group-id: kafka2 # earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费 # latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据 # none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常 auto-offset-reset: earliest # key/value的反序列化 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer producer: # key/value的序列化 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer # 批量抓取 batch-size: 65536 # 缓存容量 buffer-memory: 524288 # 服务器地址 bootstrap-servers: 阿里云外网ip:9092
KafkaController.java
在默认包下,新建一个KafkaController.javapackage xin.csqsx.kafka3; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; /** * 包名 xin.csqsx.restful * 类名 KafkaController * 类描述 springBoot整合kafka * * @author dell * @version 1.0 * 创建日期 2017/12/15 * 时间 11:55 */ @RestController @EnableAutoConfiguration public class KafkaController { /** * 注入kafkaTemplate */ @Autowired private KafkaTemplate<String, String> kafkaTemplate; /** * 发送消息的方法 * * @param key 推送数据的key * @param data 推送数据的data */ private void send(String key, String data) { kafkaTemplate.send("test", key, data); } @RequestMapping("/kafka") public String testKafka() { int iMax = 6; for (int i = 1; i < iMax; i++) { send("key" + i, "data" + i); } return "success"; } public static void main(String[] args) { SpringApplication.run(KafkaController.class, args); } /** * 使用日志打印消息 */ private static Logger logger = LoggerFactory.getLogger(KafkaController.class); @KafkaListener(topics = "test") public void receive(ConsumerRecord<?, ?> consumer) { logger.info("{} - {}:{}", consumer.topic(), consumer.key(), consumer.value()); } }
OK,万事大吉啦.启动后,访问http://localhost:8080/kafka,就可以看到控制台打印消息了
2017-12-18 10:40:19.877 INFO 10084 --- [nio-8080-exec-8] o.a.kafka.common.utils.AppInfoParser : Kafka version : 1.0.0 2017-12-18 10:40:19.877 INFO 10084 --- [nio-8080-exec-8] o.a.kafka.common.utils.AppInfoParser : Kafka commitId : aaa7af6d4a11b29d 2017-12-18 10:40:20.164 INFO 10084 --- [ntainer#0-0-C-1] xin.csqsx.kafka3.KafkaController : test - key1:data1 2017-12-18 10:40:20.164 INFO 10084 --- [ntainer#0-0-C-1] xin.csqsx.kafka3.KafkaController : test - key2:data2 2017-12-18 10:40:20.164 INFO 10084 --- [ntainer#0-0-C-1] xin.csqsx.kafka3.KafkaController : test - key3:data3 2017-12-18 10:40:20.164 INFO 10084 --- [ntainer#0-0-C-1] xin.csqsx.kafka3.KafkaController : test - key4:data4 2017-12-18 10:40:20.164 INFO 10084 --- [ntainer#0-0-C-1] xin.csqsx.kafka3.KafkaController : test - key5:data5
本篇博客参考许多大神的博客,在此谢过.
参考博客地址:
Apache Kafka 入门 - 基本配置和运行
Apache Kafka 入门 - Spring Boot 集成 Kafka
spring boot + kafka 使用详细步骤
kafka 启动 报错cannot allocate memory,即内存不足
2017/12/18
Lucifer*
相关文章推荐
- Spring Boot整合Kafka的简单用例(@KafkaListener注解)
- Spring Boot系列(十二)Spring Boot整合ActiveQ实现消息收发和订阅
- SpringBoot 整合 Redis 的简单案例
- springboot整合kafka(window单机版kafka简单向)
- Spring Boot教程(十四)Spring Boot整合ActiveQ实现消息收发和订阅
- Spring Boot系列(十二)Spring Boot整合ActiveQ实现消息收发和订阅
- spring boot整合spring-kafka实现发送接收消息实例代码
- SpringBoot系列八:SpringBoot整合消息服务(SpringBoot 整合 ActiveMQ、SpringBoot 整合 RabbitMQ、SpringBoot 整合 Kafka)
- Spring Boot 构建应用——整合消息中间件 Kafka
- SpringBoot 整合 Redis 的简单案例
- Spring Boot系列(十二)Spring Boot整合ActiveQ实现消息收发和订阅
- Spring Boot系列(十二)Spring Boot整合ActiveQ实现消息收发和订阅
- Spring Boot整合Kafka的简单用例
- Spring Boot整合ActiveQ实现消息收发和订阅
- springboot整合kafka应用
- SpringBoot整合Rabbitmq设置消息请求头
- spring boot 整合 freemark(简单结构)
- Spring Boot 整合 Mybatis Annotation 注解的完整 Web 案例
- kafka分布式消息队列使用(springboot和springmvc)
- 干货--JMS(java消息服务)整合Spring项目案例