您的位置:首页 > 其它

Kafka 开发实战

2022-05-16 20:39 477 查看

一、消息的发送和接收

⽣产者主要的对象有:

KafkaProducer
ProducerRecord

其中

KafkaProducer
是⽤于发送消息的类,
ProducerRecord
类⽤于封装 Kafka 的消息。

KafkaProducer
的创建需要指定的参数和含义:

参数 说明
bootstrap.servers 配置⽣产者如何与broker建⽴连接。该参数设置的是初始化参数。如果⽣产者需要连接的是Kafka集群,则这⾥配置集群中⼏个broker的地址,⽽不是全部,当⽣产者连接上此处指定的broker之后,在通过该连接发现集群中的其他节点。
key.serializer 要发送信息的key数据的序列化类。设置的时候可以写类名,也可以使⽤该类的Class对象。
value.serializer 要发送消息的alue数据的序列化类。设置的时候可以写类名,也可以使⽤该类的Class对象。
acks 默认值:all。
acks=0:
⽣产者不等待broker对消息的确认,只要将消息放到缓冲区,就认为消息已经发送完成。
该情形不能保证broker是否真的收到了消息,retries配置也不会⽣效。发送的消息的返回的消息偏移量永远是-1。

acks=1
表示消息只需要写到主分区即可,然后就响应客户端,⽽不等待副本分区的确认。
在该情形下,如果主分区收到消息确认之后就宕机了,⽽副本分区还没来得及同步该消息,则该消息丢失。

acks=all
⾸领分区会等待所有的ISR副本分区确认记录。
该处理保证了只要有⼀个ISR副本分区存活,消息就不会丢失。
这是Kafka最强的可靠性保证,等效于acks=-1
retries retries重试次数
当消息发送出现错误的时候,系统会重发消息。
跟客户端收到错误时重发⼀样。
如果设置了重试,还想保证消息的有序性,需要设置MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1
否则在重试此失败消息的时候,其他的消息可能发送成功了

其他参数可以从

org.apache.kafka.clients.producer.ProducerConfig
中找到。我们后⾯的内容会介绍到。

消费者⽣产消息后,需要broker端的确认,可以同步确认,也可以异步确认。

同步确认效率低,异步确认效率⾼,但是需要设置回调对象。

添加Maven依赖

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<!--高版本兼容低版本-->
<version>1.0.2</version>
</dependency>

生产者

这里我使用本地虚拟机,我本地虚拟机的IP是

192.168.0.102

同步等待消息确认:

public class MyProducer1 {
public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
Map<String, Object> configs = new HashMap<>();
// 设置连接Kafka的初始连接⽤到的服务器地址
// 如果是集群,则可以通过此初始连接发现集群中的其他broker
configs.put("bootstrap.servers", "192.168.0.102:9092");
// 设置key的序列化器
configs.put("key.serializer", IntegerSerializer.class);
// 设置value的序列化器
configs.put("value.serializer", StringSerializer.class);
configs.put("acks", "1");
KafkaProducer<Integer, String> producer = new KafkaProducer<Integer, String>(configs);
// ⽤于封装Producer的消息
ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>(
"topic_1", // 主题名称
0, // 分区编号,现在只有⼀个分区,所以是0
0, // 数字作为key
"message 0" // 字符串作为value
);
// 发送消息,同步等待消息的确认
Future<RecordMetadata> future = producer.send(record);
RecordMetadata metadata = future.get(3000, TimeUnit.MILLISECONDS);

System.out.println("主题:" + metadata.topic()
+ "\n分区:" + metadata.partition()
+ "\n偏移量:" + metadata.offset()
+ "\n序列化的key字节:" + metadata.serializedKeySize()
+ "\n序列化的value字节:" + metadata.serializedValueSize()
+ "\n时间戳:" + metadata.timestamp());
// 关闭⽣产者
producer.close();
}
}

异步等待消息确认:

public class MyProducer2 {
public static void main(String[] args) {
Map<String, Object> configs = new HashMap<>();
configs.put("bootstrap.servers", "192.168.0.102:9092");
configs.put("key.serializer", IntegerSerializer.class);
configs.put("value.serializer", StringSerializer.class);
KafkaProducer<Integer, String> producer = new KafkaProducer<Integer, String>(configs);
ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>(
"topic_1", 0, 1, "message 2");
// 使⽤回调异步等待消息的确认
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null) {
System.out.println("主题:" + metadata.topic()
+ "\n分区:" + metadata.partition()
+ "\n偏移量:" + metadata.offset()
+ "\n序列化的key字节:" + metadata.serializedKeySize()
+ "\n序列化的value字节:" + metadata.serializedValueSize()
+ "\n时间戳:" + metadata.timestamp());
} else {
System.out.println("有异常:" + exception.getMessage());
}
}
});
// 关闭连接
producer.close();
}
}

消费者

public class MyConsumer1 {
public static void main(String[] args) {
Map<String, Object> configs = new HashMap<>();
// 指定bootstrap.servers属性作为初始化连接Kafka的服务器。
// 如果是集群,则会基于此初始化连接发现集群中的其他服务器。
configs.put("bootstrap.servers", "192.168.0.102:9092");
// key的反序列化器
configs.put("key.deserializer", IntegerDeserializer.class);
// value的反序列化器
configs.put("value.deserializer", StringDeserializer.class);
// 设置消费组
configs.put("group.id", "consumer.demo");
// 创建消费者对象
KafkaConsumer<Integer, String> consumer = new KafkaConsumer<Integer, String>(configs);
// 可以使用正则表达式批量订阅主题
// final Pattern pattern = Pattern.compile("topic_\\d")
final Pattern pattern = Pattern.compile("topic_[0-9]");
final List<String> topics = Arrays.asList("topic_1");
// 消费者订阅主题或分区
// consumer.subscribe(pattern);
// consumer.subscribe(pattern, new ConsumerRebalanceListener() {
consumer.subscribe(topics, new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
partitions.forEach(tp -> {
System.out.println("剥夺的分区:" + tp.partition());
});
}

@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
partitions.forEach(tp -> {
System.out.println(tp.partition());
});
}
});
// 拉取订阅主题的消息
final ConsumerRecords<Integer, String> records = consumer.poll(3_000);
// 获取topic_1主题的消息
final Iterable<ConsumerRecord<Integer, String>> topic1Iterable = records.records("topic_1");
// 遍历topic_1主题的消息
topic1Iterable.forEach(record -> {
System.out.println("========================================");
System.out.println("消息头字段:" + Arrays.toString(record.headers().toArray()));
System.out.println("消息的key:" + record.key());
System.out.println("消息的偏移量:" + record.offset());
System.out.println("消息的分区号:" + record.partition());
System.out.println("消息的序列化key字节数:" + record.serializedKeySize());
System.out.println("消息的序列化value字节数:" + record.serializedValueSize());
System.out.println("消息的时间戳:" + record.timestamp());
System.out.println("消息的时间戳类型:" + record.timestampType());
System.out.println("消息的主题:" + record.topic());
System.out.println("消息的值:" + record.value());
});
// 关闭消费者
consumer.close();
}
}

二、Spring Boot Kafka

pom.xml 依赖

<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.8.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>

application.properties

spring.application.name=demo-02-producer-consumer
server.port=8080
# ⽤于建⽴初始连接的broker地址
spring.kafka.bootstrap-servers=192.168.0.102:9092
# producer⽤到的key和value的序列化类
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.IntegerSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# 默认的批处理记录数
spring.kafka.producer.batch-size=16384
# 32MB的总发送缓存
spring.kafka.producer.buffer-memory=33554432
# consumer⽤到的key和value的反序列化类
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# consumer的消费组id
spring.kafka.consumer.group-id=spring-kafka-02-consumer
# 是否⾃动提交消费者偏移量
spring.kafka.consumer.enable-auto-commit=true
# 每隔100ms向broker提交⼀次偏移量
spring.kafka.consumer.auto-commit-interval=100
# 如果该消费者的偏移量不存在,则⾃动设置为最早的偏移量
spring.kafka.consumer.auto-offset-reset=earliest

Application.java 启动类

@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}

KafkaConfig.java 配置类,可以在应用启动时创建Topic,这里可以不用

@Configuration
public class KafkaConfig {
@Bean
public NewTopic topic1() {
return new NewTopic("ntp-01", 5, (short) 1);
}

@Bean
public NewTopic topic2() {
return new NewTopic("ntp-02", 3, (short) 1);
}
}

生产者 KafkaSyncProducerController.java

@RestController
public class KafkaSyncProducerController {
@Autowired
private KafkaTemplate template;

// 同步等待消息发送
@GetMapping("/sendSync/{message}")
public String sendSync(@PathVariable String message) throws ExecutionException, InterruptedException {
ProducerRecord<Integer, String> record = new ProducerRecord<>(
"spring-topic-01", 0, 1, message
);
ListenableFuture future = template.send(record);
// 同步等待broker的响应
Object o = future.get();
SendResult<Integer, String> result = (SendResult<Integer, String>) o;
System.out.println(result.getRecordMetadata().topic()
+ result.getRecordMetadata().partition()
+ result.getRecordMetadata().offset());

return "success";
}

// 异步等待消息确认
@GetMapping("/sendAsync/{message}")
public String sendAsync(@PathVariable String message) throws ExecutionException, InterruptedException {
ProducerRecord<Integer, String> record = new ProducerRecord<>(
"spring-topic-01", 0, 1, message
);
ListenableFuture<SendResult<Integer, String>> future = template.send(record);
// 异步等待broker的响应
future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {
@Override
public void onFailure(Throwable throwable) {
System.out.println("发送失败: " + throwable.getMessage());
}

@Override
public void onSuccess(SendResult<Integer, String> result) {
System.out.println("发送成功:" + result.getRecordMetadata().topic() + "\t"
+ result.getRecordMetadata().partition() + "\t"
+ result.getRecordMetadata().offset());
}
});

return "success";
}
}

消费者MyConsumer.java

@Component
public class MyConsumer {
@KafkaListener(topics = "spring-topic-01")
public void onMessage(ConsumerRecord<Integer, String> record) {
Optional<ConsumerRecord<Integer, String>> optional = Optional.ofNullable(record);
if (optional.isPresent()) {
System.out.println(record.topic() + "\t"
+ record.partition() + "\t"
+ record.offset() + "\t"
+ record.key() + "\t"
+ record.value());
}
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: