您的位置:首页 > 编程语言 > Java开发

Springboot2.0整合Kafka,从Kafka并发、批量获取数据

2019-01-31 11:51 701 查看

Springboot2.0整合Kafka,从Kafka并发、批量获取数据

  • 消费者
  • Kafka安装

    Kafka是由Apache软件基金会开发的一个开源流处理平台,是一种高吞吐量的分布式发布订阅消息系统。
    主要包含几个组件:

    • Topic:消息主题,特定消息的发布接口,每个Topic都可以分成数个Partition,用于消息的并发发送。
    • Producer:生产者,信息的发布者,发布者可以指定数个Partition进行发布。
    • Consumer:消费者,信息的使用者,同一个Group的消费者数量,最好不好超过Partition的数量,对于分区的Topic,消费者使用时需要指定相应的分区号。
    • Broker:服务代理
      ##下载kafka

    SpringBoot整合kafka

    当前SpringBoot版本为2.0.2.RELEASE,打包工具为Maven

    消费者

    a. 引入Pom

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    
    <groupId>com.kafkatest</groupId>
    <artifactId>producer</artifactId>
    <version>1.0-SNAPSHOT</version>
    <name>kafka-producer</name>
    <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.0.2.RELEASE</version>
    <relativePath/>
    </parent>
    <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <java.version>1.8</java.version>
    <joda-time.version>2.3</joda-time.version>
    </properties>
    
    <dependencies>
    
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
    </dependency>
    
    <dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
    </dependency>
    <dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    </dependency>
    </dependencies>
    
    <build>
    <plugins>
    <plugin>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-maven-plugin</artifactId>
    </plugin>
    </plugins>
    </build>
    </project>

    b.JAVA代码

    @Service
    public class KafkaProducerTest {
    @Autowired
    private KafkaTemplate<String,byte[]> kafkaTemplate;
    private final String topic = "byteArray_topic1";
    
    public void sendMessage(int key,String value){
    ProducerRecord<String,byte[]> record = new ProducerRecord<>(topic,
    key%3,String.valueOf(key),value.getBytes());
    kafkaTemplate.send(record);
    }
    }

    配置文件(YML)

    spring:
    kafka:
    producer:
    bootstrap-servers: 172.169.0.109:9092
    batch-size: 16384
    retries: 0
    buffer-memory: 33554432
    key-serializer: org.apache.kafka.common.serialization.StringSerializer
    value-serializer: org.apache.kafka.common.serialization.ByteArraySerializer

    这里有一个非常陷阱的问题需要特别注意:序列化类的路径是:org.apache.kafka.common.serialization.StringSerializer
    而不是
    org.apache.kafka.config.serialization.StringSerializer
    否则会出现如下错误:

    2019-01-31 11:35:14.794 [main] WARN  o.s.c.a.AnnotationConfigApplicationContext -
    Exception encountered during context initialization - cancelling refresh attempt: org.springframework.beans.factory.UnsatisfiedDependencyException: Error creating bean with name 'kafkaProducerTest': Unsatisfied dependency expressed through field 'kafkaTemplate'; nested exception is org.springframework.beans.factory.UnsatisfiedDependencyException: Error creating bean with name 'org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration': Unsatisfied dependency expressed through constructor parameter 0; nested exception is org.springframework.boot.context.properties.ConfigurationPropertiesBindException: Error creating bean with name 'spring.kafka-org.springframework.boot.autoconfigure.kafka.KafkaProperties': Could not bind properties to 'KafkaProperties' : prefix=spring.kafka, ignoreInvalidFields=false, ignoreUnknownFields=true; nested exception is org.springframework.boot.context.properties.bind.BindException: Failed to bind properties under 'spring.kafka.producer.key-serializer' to java.lang.Class<?>
    2019-01-31 11:35:14.810 [main] ERROR o.s.b.d.LoggingFailureAnalysisReporter -
    
    ***************************
    APPLICATION FAILED TO START
    ***************************
    
    Description:
    
    Failed to bind properties under 'spring.kafka.producer.key-serializer' to java.lang.Class<?>:
    
    Property: spring.kafka.producer.key-serializer
    Value: org.apache.kafka.config.serialization.StringSerializer
    Origin: class path resource [application.yml]:8:25
    Reason: No converter found capable of converting from type [java.lang.String] to type [java.lang.Class<?>]
    
    Action:
    
    Update your application's configuration

    消费者

    如果不使用并发获取、批量获取消费者的代码非常简单。

    a.Pom文件

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    
    <groupId>com.kafkatest</groupId>
    <artifactId>consumer</artifactId>
    <version>1.0-SNAPSHOT</version>
    <name>kafka-consumer</name>
    <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.0.2.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <java.version>1.8</java.version>
    <joda-time.version>2.3</joda-time.version>
    </properties>
    
    <dependencies>
    
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
    </dependency>
    
    <dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
    </dependency>
    <dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    </dependency>
    </dependencies>
    
    <build>
    <plugins>
    <plugin>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-maven-plugin</artifactId>
    </plugin>
    </plugins>
    </build>
    </project>

    b1.Java代码(无并发访问、无批量获取)

    @Service
    @Slf4j
    public class Listener {
    private final String topic = "byteArray_topic1";
    
    public void listen(ConsumerRecord<String, byte[]> record){
    log.info("kafka的key: " + record.key());
    log.info("kafka的value: " + new String(record.value()));
    }
    }

    b2.配置文件

    spring:
    kafka:
    consumer:
    enable-auto-commit: true
    group-id: gridMonitorGroup
    auto-commit-interval: 1000
    auto-offset-reset: latest
    bootstrap-servers: "172.169.0.109:9092"
    key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer

    c.Java代码(并发、批量获取)

    1. Kafka消费者配置类
      批量获取关键代码:
      ①factory.setBatchListener(true);
      ②propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,50);
      并发获取关键代码:
      factory.setConcurrency(concurrency);
    @Configuration
    @EnableKafka
    public class KafkaConsumerConfig {
    @Value("${kafka.consumer.bootstrap-servers}")
    private String servers;
    @Value("${kafka.consumer.enable-auto-commit}")
    private boolean enableAutoCommit;
    @Value("${kafka.consumer.auto-commit-interval}")
    private String autoCommitInterval;
    @Value("${kafka.consumer.group-id}")
    private String groupId;
    @Value("${kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;
    @Value("${kafka.consumer.concurrency}")
    private int concurrency;
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, byte[]>> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, byte[]> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    //并发数量
    factory.setConcurrency(concurrency);
    //批量获取
    factory.setBatchListener(true);
    factory.getContainerProperties().setPollTimeout(1500);
    return factory;
    }
    
    public ConsumerFactory<String, byte[]> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }
    
    public Map<String, Object> consumerConfigs() {
    Map<String, Object> propsMap = new HashMap<>();
    propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
    propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
    propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
    propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
    propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
    //最多批量获取50个
    propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,50);
    return propsMap;
    }
    
    @Bean
    public Listener listener() {
    return new Listener();
    }
    }
    1. Kafka消费者Listener
    @Service
    @Slf4j
    public class Listener {
    private final String topic = "byteArray_topic1";
    
    @KafkaListener(id="myListener",
    topicPartitions ={@TopicPartition(topic = topic, partitions = { "0", "1" ,"2"})})
    public void listen(List<ConsumerRecord<String, byte[]>> recordList) {
    recordList.forEach((record)->{
    log.info("kafka的key: " + record.key());
    log.info("kafka的value: " + new String(record.value()));
    });
    }
    }
    1. 配置文件
    kafka:
    consumer:
    enable-auto-commit: true
    group-id: gridMonitorGroup
    auto-commit-interval: 1000
    auto-offset-reset: latest
    bootstrap-servers: "172.169.0.109:9092"
    key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
    concurrency: 3
    内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
    标签: