centos7安装kafka单机版,并Springboot整合kafka运行
不懂kafka的可以看理论学习以下再来:https://blog.csdn.net/qq_39276448/article/details/86024940
要先安装jdk
下载kafka
cd ~ 切到当前用户home目录
wget http://mirrors.shuosc.org/apache/kafka/2.1.0/kafka_2.11-2.1.0.tgz
解压
tar -zxvf kafka_2.11-2.1.0.tgz
并且创建 在kafka_2.11-2.1.0下创建logs文件夹
创建kafka文件夹(我喜欢放在/usr/local)
cd /usr/local
mkdir kafka
cd ~ 切到当前用户home目录
裁切kafka_2.11-2.1.0 到/usr/local/kafka
mv kafka_2.11-2.1.0 /usr/local/kafka
修改配置文件
vim /usr/local/kafka/kafka_2.11-2.1.0/config/server.properties
外网访问问题配置
Cannot assign requested这个问题也是这样配置
不要去学网上的配置port,和hostname;如果没配置port,hostname就会使用以下配置。配置了则使用hostname,port,字数多难得写所以楼主选了以下配置
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://你自己的外网地址:9092
设置broker节点id
设置日志文件地址
broker.id=1
log.dir=/usr/local/kafka/kafka_2.11-2.1.0/logs
这里的logs文件夹要自己手动创建
切换到bin目录下
启动zook单机节点 -daemon(不显示输出)
sh zookeeper-server-start.sh -daemon ../config/zookeeper.properties
启动kafka
sh kafka-server-start.sh config/server.properties & 后台启动(&)
建立topic
sh kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
查看kafka内所有topic
sh kafka-topics.sh --list --zookeeper localhost:2181
如果看到test说明成功!
其他测试请自便
springboot2整合kafka
依赖包
[code]<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <dependency> <groupId>com.google.code.gson</groupId> <artifactId>gson</artifactId> </dependency>
[code]<dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency>
yml
[code]server: port: 11000 spring: kafka: producer: batch-size: 16384 retries: 0 buffer-memory: 33554432 value-serializer: org.apache.kafka.common.serialization.StringSerializer key-serializer: org.apache.kafka.common.serialization.StringSerializer bootstrap-servers: 你的服务器外网地址:9092 consumer: value-deserializer: org.apache.kafka.common.serialization.StringDeserializer group-id: test-consumer-group auto-offset-reset: earliest auto-commit-interval: 100 enable-auto-commit: true key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
编写接口
[code]package com.bj.kafka.Kafka; import com.google.gson.Gson; import com.google.gson.GsonBuilder; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.util.Date; import java.util.UUID; @RestController public class KafkaController { private Gson gson = new GsonBuilder().create(); @Autowired private KafkaTemplate<String, String> kafkaTemplate; @RequestMapping("/send") public String send(String name) { Message message = new Message(); message.setId(System.currentTimeMillis()); message.setMsg(name); message.setSendTime(new Date()); System.out.println("+++++++++++++++++++++ message = {}"+ gson.toJson(message)); //这里发送到那个消息topic(xiaobaii),并设置发送的数据 kafkaTemplate.send("xiaobaii", gson.toJson(message)); return name; } }
编写message类
[code]package com.bj.kafka.Kafka; import lombok.Data; import java.util.Date; @Data public class Message { private Long id; //id private String msg; //消息 private Date sendTime; //时间戳 }
编写消费端
[code]package com.bj.kafka.Kafka; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; import java.util.Optional; @Component public class MyConsumer { //这里便是监听了xiaobaii,只要程序不停止就会持续消费! @KafkaListener(topics = "xiaobaii") public void listen(ConsumerRecord<?,String> record) { Optional<?> kafkaMessage = Optional.ofNullable(record.value()); if (kafkaMessage.isPresent()) { Object message = kafkaMessage.get(); System.out.println(message); } } }
- centos7安装kafka(伪)集群版,并Springboot整合kafka运行
- SpringBoot整合SpringKafka实现消费者史上最简代码实现
- springboot整合kafka(window单机版kafka简单向)
- springboot+shiro+redis(单机redis版)整合教程-续(添加动态角色权限控制)
- Spring Boot整合Dubbo运行
- Spring Boot整合Kafka的简单用例
- SpringBoot整合Kafka和Storm
- 快速搭建springboot框架以及整合ssm+shiro+安装Rabbitmq和Erlang、Mysql下载与配置
- Spring Boot 构建应用——整合消息中间件 Kafka
- springboot整合kafka应用
- spring boot整合spring-kafka实现发送接收消息实例代码
- 被版本更新坑到哭系列:SpringBoot整合Kafka
- SpringBoot整合Kafka:简单收发消息案例
- SpringBoot(十一):SpringBoot整合Kafka
- SpringBoot系列八:SpringBoot整合消息服务(SpringBoot 整合 ActiveMQ、SpringBoot 整合 RabbitMQ、SpringBoot 整合 Kafka)
- springboot整合kafka
- spring boot整合redis(单机+集群)
- SpringBoot整合SpringKafka实现生产者史上最简代码实现
- spring-boot 整合mybatis 配置 可以发布到tomcat中运行
- spring boot 整合kafka