
Installing single-node Kafka on CentOS 7 and integrating it with Spring Boot

2019-01-03 22:36

If you are new to Kafka, read up on the theory first and then come back: https://blog.csdn.net/qq_39276448/article/details/86024940

Install the JDK first.

Download Kafka

cd ~    (switch to the current user's home directory)

wget http://mirrors.shuosc.org/apache/kafka/2.1.0/kafka_2.11-2.1.0.tgz
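If that mirror no longer carries 2.1.0 (Apache mirrors only keep recent releases), the same tarball is kept on the Apache archive:

wget https://archive.apache.org/dist/kafka/2.1.0/kafka_2.11-2.1.0.tgz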

Extract it:

tar -zxvf kafka_2.11-2.1.0.tgz

Then create a logs folder inside kafka_2.11-2.1.0:
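mkdir kafka_2.11-2.1.0/logs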

Create a kafka folder (I like to put it under /usr/local):

cd /usr/local

mkdir kafka

cd ~    (back to the current user's home directory)

Move kafka_2.11-2.1.0 into /usr/local/kafka:
mv kafka_2.11-2.1.0 /usr/local/kafka

Edit the configuration file:

vim /usr/local/kafka/kafka_2.11-2.1.0/config/server.properties

Configuring external (public network) access

The "Cannot assign requested address" error is fixed with the same settings.

Don't copy the port and host.name settings you'll find in older guides; those properties are deprecated and only take effect when listeners is not set. Just configure the listeners directly:

listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://<your public IP>:9092

Set the broker node id and the log file location:

broker.id=1
log.dir=/usr/local/kafka/kafka_2.11-2.1.0/logs

The logs folder here has to be created by hand, which we already did right after extracting.

Switch to the bin directory:

cd /usr/local/kafka/kafka_2.11-2.1.0/bin

Start the single-node ZooKeeper (-daemon runs it in the background with no console output):

sh zookeeper-server-start.sh -daemon ../config/zookeeper.properties
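If you want to confirm ZooKeeper came up, check that something is listening on its default port 2181 (assuming the iproute ss tool, which ships with CentOS 7):

ss -ltn | grep 2181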

Start Kafka (& puts it in the background; note the ../config path, since we are still in bin):

sh kafka-server-start.sh ../config/server.properties &
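kafka-server-start.sh also accepts the same -daemon flag as the ZooKeeper script, if you prefer it fully detached with no console output:

sh kafka-server-start.sh -daemon ../config/server.properties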

Create a topic:

sh kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
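To double-check the partition count, leader, and replicas of the new topic:

sh kafka-topics.sh --describe --zookeeper localhost:2181 --topic test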

List all topics in Kafka:

sh kafka-topics.sh --list --zookeeper localhost:2181

If you see test in the output, it worked!

Other tests are up to you; one quick option is shown below.
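For example, the console clients that ship with Kafka make a quick end-to-end smoke test: run the consumer in one terminal, type lines into the producer in another, and watch them arrive.

sh kafka-console-producer.sh --broker-list localhost:9092 --topic test

sh kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning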

Integrating Kafka with Spring Boot 2

Dependencies

[code]<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>
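Note: the controller below uses @RestController, so the web starter also has to be on the classpath, and the version-less dependencies above assume a Spring Boot parent/BOM. If your pom doesn't already include it:

[code]<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>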

application.yml

[code]server:
  port: 11000
spring:
  kafka:
    bootstrap-servers: <your public IP>:9092
    producer:
      batch-size: 16384
      retries: 0
      buffer-memory: 33554432
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: test-consumer-group
      auto-offset-reset: earliest
      enable-auto-commit: true
      auto-commit-interval: 100
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
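The post doesn't show the application class, so here is a minimal entry point, assuming the same package (the class name is my own choice):

[code]package com.bj.kafka.Kafka;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

// Hypothetical bootstrap class, not part of the original post.
@SpringBootApplication
public class KafkaApplication {
    public static void main(String[] args) {
        SpringApplication.run(KafkaApplication.class, args);
    }
}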

Write the REST endpoint

[code]package com.bj.kafka.Kafka;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;

@RestController
public class KafkaController {

    private Gson gson = new GsonBuilder().create();

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @RequestMapping("/send")
    public String send(String name) {
        Message message = new Message();
        message.setId(System.currentTimeMillis());
        message.setMsg(name);
        message.setSendTime(new Date());
        System.out.println("+++++++++++++++++++++  message = " + gson.toJson(message));
        // Send the serialized message to the "xiaobaii" topic
        kafkaTemplate.send("xiaobaii", gson.toJson(message));
        return name;
    }

}
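With the application running, you can exercise the endpoint (port 11000, per the yml above):

curl "http://localhost:11000/send?name=hello"

The controller serializes a Message to JSON and publishes it to the xiaobaii topic; the consumer below prints it.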

Write the Message class

[code]package com.bj.kafka.Kafka;

import lombok.Data;

import java.util.Date;

@Data
public class Message {

    private Long id;        // id
    private String msg;     // message body
    private Date sendTime;  // send timestamp
}

Write the consumer

[code]package com.bj.kafka.Kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.Optional;

@Component
public class MyConsumer {

    // Listens on the "xiaobaii" topic; it keeps consuming for as long as the application runs.
    @KafkaListener(topics = "xiaobaii")
    public void listen(ConsumerRecord<?, String> record) {
        Optional<String> kafkaMessage = Optional.ofNullable(record.value());

        if (kafkaMessage.isPresent()) {
            String message = kafkaMessage.get();
            System.out.println(message);
        }
    }
}
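We never created the xiaobaii topic by hand; the broker auto-creates it on first use because auto.create.topics.enable defaults to true. To create it explicitly instead:

sh kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic xiaobaii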

 
