Integrating Kafka with Spring Boot 2.0: concurrent and batch consumption from Kafka
2019-01-31 11:51
Kafka Installation
Kafka is an open-source stream-processing platform developed by the Apache Software Foundation; it is a high-throughput distributed publish-subscribe messaging system.
Its main components are:
- Topic: a message topic, the publishing endpoint for a particular class of messages. Each Topic can be split into several Partitions, which allows messages to be sent concurrently.
- Producer: the publisher of messages; a producer can target specific Partitions when publishing.
- Consumer: the reader of messages. The number of consumers in the same Group should not exceed the number of Partitions; for a partitioned Topic, consumers specify the partition numbers they read from.
- Broker: the service broker, i.e. a Kafka server node.
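Since the examples below all work against a three-partition topic, it helps to see how such a topic is created up front. A sketch using the `kafka-topics.sh` tool that ships with Kafka (the ZooKeeper host/port is an assumption, not stated in this post; only the broker address 172.169.0.109:9092 appears later, and Kafka 2.2+ uses `--bootstrap-server` instead of `--zookeeper`):

```shell
# Create the 3-partition topic used throughout this post.
# NOTE: the ZooKeeper address below is assumed; adjust it to your environment.
bin/kafka-topics.sh --create \
  --zookeeper 172.169.0.109:2181 \
  --replication-factor 1 \
  --partitions 3 \
  --topic byteArray_topic1
```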
## Download Kafka
Integrating Kafka with Spring Boot
The Spring Boot version used here is 2.0.2.RELEASE; the project is built with Maven.
Producer
a. POM dependencies
```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.kafkatest</groupId>
    <artifactId>producer</artifactId>
    <version>1.0-SNAPSHOT</version>
    <name>kafka-producer</name>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.2.RELEASE</version>
        <relativePath/>
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
        <joda-time.version>2.3</joda-time.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
```
b. Java code
```java
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducerTest {

    @Autowired
    private KafkaTemplate<String, byte[]> kafkaTemplate;

    private final String topic = "byteArray_topic1";

    public void sendMessage(int key, String value) {
        // Pin each record to partition key % 3 (the topic has three partitions).
        ProducerRecord<String, byte[]> record = new ProducerRecord<>(
                topic, key % 3, String.valueOf(key), value.getBytes());
        kafkaTemplate.send(record);
    }
}
```
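The key-to-partition rule used by the producer above can be tried in isolation. A minimal sketch in plain Java (no Kafka involved; the class and method names are made up for illustration) — note that `Math.floorMod` is used instead of `%` so that negative keys also map to a valid partition:

```java
public class PartitionRule {

    // Map a record key to one of partitionCount partitions, as the producer
    // above does with key % 3. floorMod keeps negative keys in valid range.
    static int partitionFor(int key, int partitionCount) {
        return Math.floorMod(key, partitionCount);
    }

    public static void main(String[] args) {
        for (int key = 0; key < 6; key++) {
            System.out.println("key=" + key + " -> partition " + partitionFor(key, 3));
        }
    }
}
```

Because consecutive keys cycle through partitions 0, 1 and 2, a steady stream of messages is spread evenly across the three partitions.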
Configuration file (YML)
```yaml
spring:
  kafka:
    producer:
      bootstrap-servers: 172.169.0.109:9092
      batch-size: 16384
      retries: 0
      buffer-memory: 33554432
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.ByteArraySerializer
```
There is a very sneaky pitfall here that deserves special attention: the serializer class path is
org.apache.kafka.common.serialization.StringSerializer
and not
org.apache.kafka.config.serialization.StringSerializer
Otherwise you will get the following error:
```
2019-01-31 11:35:14.794 [main] WARN  o.s.c.a.AnnotationConfigApplicationContext - Exception encountered during context initialization - cancelling refresh attempt: org.springframework.beans.factory.UnsatisfiedDependencyException: Error creating bean with name 'kafkaProducerTest': Unsatisfied dependency expressed through field 'kafkaTemplate'; nested exception is org.springframework.beans.factory.UnsatisfiedDependencyException: Error creating bean with name 'org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration': Unsatisfied dependency expressed through constructor parameter 0; nested exception is org.springframework.boot.context.properties.ConfigurationPropertiesBindException: Error creating bean with name 'spring.kafka-org.springframework.boot.autoconfigure.kafka.KafkaProperties': Could not bind properties to 'KafkaProperties' : prefix=spring.kafka, ignoreInvalidFields=false, ignoreUnknownFields=true; nested exception is org.springframework.boot.context.properties.bind.BindException: Failed to bind properties under 'spring.kafka.producer.key-serializer' to java.lang.Class<?>
2019-01-31 11:35:14.810 [main] ERROR o.s.b.d.LoggingFailureAnalysisReporter -

***************************
APPLICATION FAILED TO START
***************************

Description:

Failed to bind properties under 'spring.kafka.producer.key-serializer' to java.lang.Class<?>:

    Property: spring.kafka.producer.key-serializer
    Value: org.apache.kafka.config.serialization.StringSerializer
    Origin: class path resource [application.yml]:8:25
    Reason: No converter found capable of converting from type [java.lang.String] to type [java.lang.Class<?>]

Action:

Update your application's configuration
```
Consumer
If you do not need concurrent or batch consumption, the consumer code is very simple.
a. POM file
```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.kafkatest</groupId>
    <artifactId>consumer</artifactId>
    <version>1.0-SNAPSHOT</version>
    <name>kafka-consumer</name>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.2.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
        <joda-time.version>2.3</joda-time.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
```
b1. Java code (no concurrency, no batching)
```java
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class Listener {

    private final String topic = "byteArray_topic1";

    // The annotation registers this method as a Kafka listener;
    // without it the method would never receive any messages.
    @KafkaListener(topics = topic)
    public void listen(ConsumerRecord<String, byte[]> record) {
        log.info("kafka key: " + record.key());
        log.info("kafka value: " + new String(record.value()));
    }
}
```
b2. Configuration file
```yaml
spring:
  kafka:
    consumer:
      enable-auto-commit: true
      group-id: gridMonitorGroup
      auto-commit-interval: 1000
      auto-offset-reset: latest
      bootstrap-servers: "172.169.0.109:9092"
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
```
c. Java code (concurrent, batch consumption)
- Kafka consumer configuration class
Key code for batch consumption:
① factory.setBatchListener(true);
② propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
Key code for concurrent consumption:
factory.setConcurrency(concurrency);
```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Value("${kafka.consumer.bootstrap-servers}")
    private String servers;
    @Value("${kafka.consumer.enable-auto-commit}")
    private boolean enableAutoCommit;
    @Value("${kafka.consumer.auto-commit-interval}")
    private String autoCommitInterval;
    @Value("${kafka.consumer.group-id}")
    private String groupId;
    @Value("${kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;
    @Value("${kafka.consumer.concurrency}")
    private int concurrency;

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, byte[]>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, byte[]> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // Number of concurrent consumers
        factory.setConcurrency(concurrency);
        // Enable batch consumption
        factory.setBatchListener(true);
        factory.getContainerProperties().setPollTimeout(1500);
        return factory;
    }

    public ConsumerFactory<String, byte[]> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        // Fetch at most 50 records per poll
        propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
        return propsMap;
    }

    @Bean
    public Listener listener() {
        return new Listener();
    }
}
```
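As a rough mental model of what `setConcurrency(3)` buys you (a plain-threads sketch with no Kafka involved; class and method names are made up): the factory starts several listener containers, each partition is owned by exactly one of them, so partitions are drained in parallel while ordering within each partition is preserved:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ConcurrencyModel {

    // Drain each "partition" on its own thread, mimicking concurrency == partition count.
    static int drain(List<List<String>> partitions, int concurrency) throws InterruptedException {
        AtomicInteger processed = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(concurrency);
        for (List<String> partition : partitions) {
            // One thread per partition: records of a partition stay in order,
            // while different partitions are processed concurrently.
            pool.submit(() -> partition.forEach(record -> processed.incrementAndGet()));
        }
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return processed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        List<List<String>> partitions = Arrays.asList(
                Arrays.asList("p0-a", "p0-b"),
                Arrays.asList("p1-a"),
                Arrays.asList("p2-a", "p2-b", "p2-c"));
        System.out.println("processed " + drain(partitions, 3) + " records");
    }
}
```

This is also why a concurrency setting higher than the partition count gains nothing: the extra containers would simply sit idle.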
- Kafka consumer Listener
```java
import java.util.List;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class Listener {

    private final String topic = "byteArray_topic1";

    @KafkaListener(id = "myListener", topicPartitions = {
            @TopicPartition(topic = topic, partitions = {"0", "1", "2"})})
    public void listen(List<ConsumerRecord<String, byte[]>> recordList) {
        recordList.forEach(record -> {
            log.info("kafka key: " + record.key());
            log.info("kafka value: " + new String(record.value()));
        });
    }
}
```
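To make the effect of `MAX_POLL_RECORDS_CONFIG` concrete, here is a small simulation (plain Java, no Kafka; the class is hypothetical) of how a backlog of pending records reaches the batch listener as successive lists of at most 50 records each:

```java
import java.util.ArrayList;
import java.util.List;

public class BatchModel {

    // Split a backlog into the successive lists a batch listener would receive,
    // each capped at maxPollRecords (the max.poll.records setting).
    static List<List<Integer>> pollInBatches(List<Integer> backlog, int maxPollRecords) {
        List<List<Integer>> polls = new ArrayList<>();
        for (int from = 0; from < backlog.size(); from += maxPollRecords) {
            int to = Math.min(from + maxPollRecords, backlog.size());
            polls.add(new ArrayList<>(backlog.subList(from, to)));
        }
        return polls;
    }

    public static void main(String[] args) {
        List<Integer> backlog = new ArrayList<>();
        for (int i = 0; i < 120; i++) {
            backlog.add(i);
        }
        // 120 pending records with max.poll.records = 50 arrive as batches of 50, 50 and 20.
        List<List<Integer>> polls = pollInBatches(backlog, 50);
        System.out.println(polls.size() + " polls, last batch has "
                + polls.get(polls.size() - 1).size() + " records");
    }
}
```

In other words, the `recordList` handed to `listen(...)` above never exceeds 50 entries, no matter how far the consumer has fallen behind.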
- Configuration file
```yaml
kafka:
  consumer:
    enable-auto-commit: true
    group-id: gridMonitorGroup
    auto-commit-interval: 1000
    auto-offset-reset: latest
    bootstrap-servers: "172.169.0.109:9092"
    key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
    concurrency: 3
```