【HAVENT原创】Spring Boot + Spring-Kafka 异步配置
2018-08-15 17:34
826 查看
近日我们项目组采用 Kafka 来做系统日志统一管理,但是天降横祸的让 Kafka 集群(3台服务器)都挂了,堪比中大奖的节奏,随之而来的是使用 Kafka 发送消息日志的服务全部卡死,经过排查发现居然是 Kafka 当机导致了调用 Kafka 发送日志服务一直处于阻塞状态。
最后我们在检查代码的时候发现,如果无法连接 Kafka 服务,则会出现一分钟的阻塞。以上问题有两种解决方案:
一、开启异步模式 ( @EnableAsync )
二、如果使用同步模式,可以通过修改配置参数 MAX_BLOCK_MS_CONFIG ( max.block.ms / 默认 60s ) 来缩短阻塞时间
谨以此献给那些被 Spring Kafka 同步模式坑害又苦无出路的同胞。。。
最后我们在检查代码的时候发现,如果无法连接 Kafka 服务,则会出现一分钟的阻塞。以上问题有两种解决方案:
一、开启异步模式 ( @EnableAsync )
@EnableAsync @Configuration public class KafkaProducerConfig { private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducerConfig.class); @Value("${kafka.brokers}") private String servers; @Bean public Map<String, Object> producerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonDeserializer.class); return props; } @Bean public ProducerFactory<String, GenericMessage> producerFactory(ObjectMapper objectMapper) { return new DefaultKafkaProducerFactory<>(producerConfigs(), new StringSerializer(), new JsonSerializer(objectMapper)); } @Bean public KafkaTemplate<String, GenericMessage> kafkaTemplate(ObjectMapper objectMapper) { return new KafkaTemplate<String, GenericMessage>(producerFactory(objectMapper)); } @Bean public Producer producer() { return new Producer(); } }
public class Producer { public static final Logger LOGGER = LoggerFactory.getLogger(Producer.class); @Autowired private KafkaTemplate<String, GenericMessage> kafkaTemplate; @Async public void send(String topic, GenericMessage message) { ListenableFuture<SendResult<String, GenericMessage>> future = kafkaTemplate.send(topic, message); future.addCallback(new ListenableFutureCallback<SendResult<String, GenericMessage>>() { @Override public void onSuccess(final SendResult<String, GenericMessage> message) { LOGGER.info("sent message= " + message + " with offset= " + message.getRecordMetadata().offset()); } @Override public void onFailure(final Throwable throwable) { LOGGER.error("unable to send message= " + message, throwable); } }); } }
二、如果使用同步模式,可以通过修改配置参数 MAX_BLOCK_MS_CONFIG ( max.block.ms / 默认 60s ) 来缩短阻塞时间
package com.havent.demo.logger.config; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; import org.springframework.scheduling.annotation.EnableAsync; import java.util.HashMap; import java.util.Map; @EnableAsync @Configuration @EnableKafka public class KafkaConfiguration { @Value("${spring.kafka.producer.bootstrap-servers}") private String serverAddress; public Map<String, Object> producerConfigs() { System.out.println("HH > serverAddress: " + serverAddress); Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, serverAddress); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); // 如果请求失败,生产者会自动重试,我们指定是0次,如果启用重试,则会有重复消息的可能性 props.put(ProducerConfig.RETRIES_CONFIG, 0); // Request发送请求,即Batch批处理,以减少请求次数,该值即为每次批处理的大小 props.put(ProducerConfig.BATCH_SIZE_CONFIG, 4096); /** * 这将指示生产者发送请求之前等待一段时间,希望更多的消息填补到未满的批中。这类似于TCP的算法,例如上面的代码段, * 可能100条消息在一个请求发送,因为我们设置了linger(逗留)时间为1毫秒,然后,如果我们没有填满缓冲区, * 这个设置将增加1毫秒的延迟请求以等待更多的消息。 需要注意的是,在高负载下,相近的时间一般也会组成批,即使是 * linger.ms=0。在不处于高负载的情况下,如果设置比0大,以少量的延迟代价换取更少的,更有效的请求。 */ props.put(ProducerConfig.LINGER_MS_CONFIG, 2000); /** * 控制生产者可用的缓存总量,如果消息发送速度比其传输到服务器的快,将会耗尽这个缓存空间。 * 当缓存空间耗尽,其他发送调用将被阻塞,阻塞时间的阈值通过max.block.ms设定, 之后它将抛出一个TimeoutException。 */ props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 40960); // 用于配置send数据或partitionFor函数得到对应的leader时,最大的等待时间,默认值为60秒 // HH 警告:如无法连接 kafka 会导致程序卡住,尽量不要设置等待太久 props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 100); // 消息发送的最长等待时间 props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 100); // 0:不保证消息的到达确认,只管发送,低延迟但是会出现消息的丢失,在某个server失败的情况下,有点像TCP // 1:发送消息,并会等待leader 收到确认后,一定的可靠性 // -1:发送消息,等待leader收到确认,并进行复制操作后,才返回,最高的可靠性 props.put(ProducerConfig.ACKS_CONFIG, "0"); System.out.println(props); return props; } public ProducerFactory<String, String> producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs()); } @Bean public KafkaTemplate<String, String> kafkaTemplate() { KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory()); return kafkaTemplate; } }
谨以此献给那些被 Spring Kafka 同步模式坑害又苦无出路的同胞。。。
相关文章推荐
- 【HAVENT原创】Spring Boot + Kafka 消息日志开发
- springboot配置kafka与原生kafka配置
- spring boot使用自定义配置的线程池执行Async异步任务
- 【HAVENT原创】使用 Spring Boot 的 AOP 自定义注解
- 【HAVENT原创】使用 Spring Boot 的 AOP 拦截器终止方法执行
- Kafka 安装-配置-监控 与集成springboot
- springboot中使用定时任务,异步调用,自定义配置参数(八)
- spring boot 1.5.2自动配置kafka
- spring-boot 方法异步调用,自定义线程池配置使用
- springboot 定时任务(线程配置,并行【同步】、异步等)
- spring boot使用自定义配置的线程池执行Async异步任务
- 【HAVENT原创】使用 Spring Boot 的 AOP 全局记录执行时间日志
- Spring Boot 配置文件 – 在坑中实践
- SpringBoot配置优先级
- SpringBoot配置文件以及配置项
- SpringBoot系列——Java配置(SpringMVC配置)
- springboot的一些配置
- Springboot 之 解决IDEA读取properties配置文件的中文乱码问题
- spring事件机制——异步配置
- Springboot 之 解决IDEA读取properties配置文件的中文乱码问题