Spring Boot整合Kafka的简单用例(@KafkaListener注解)
2018-03-08 20:17
681 查看
第一步、启动zookeeper server和kafka server
启动zookeeper server:bin/zookeeper-server-start.sh config/zookeeper.properties启动两个kafka server:bin/kafka-server-start.sh config/server-1.properties;
bin/kafka-server-start.sh config/server.properties
zookeeper会选举一个作为leader,另外一个作为slave
第二步、创建一个maven项目
这一篇中修改了Spring Boot的版本为2.0.0,pom.xml如下:<dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-dependencies</artifactId> <version>2.0.0.RELEASE</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.1.4.RELEASE</version> </dependency>
第三步、kafka配置
@Configuration @EnableKafka public class KafkaConfig { /* --------------producer configuration-----------------**/ @Bean public Map<String, Object> producerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093"); props.put(ProducerConfig.RETRIES_CONFIG, 0); props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); props.put(ProducerConfig.LINGER_MS_CONFIG, 1); props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return props; } @Bean public ProducerFactory<String, String> producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs()); } /* --------------consumer configuration-----------------**/ @Bean public Map<String, Object> consumerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "0"); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100); props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return props; } @Bean ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); return factory; } @Bean public ConsumerFactory<String, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } @Bean//消息监听器 public MyListener myListener() { return new MyListener(); } /* --------------kafka template configuration-----------------**/ @Bean public KafkaTemplate<String,String> kafkaTemplate() { KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory()); return kafkaTemplate; } }
第四步、topic的配置
自动创建的topic分区数是1,复制因子是0@Configuration @EnableKafka public class TopicConfig { @Bean public KafkaAdmin kafkaAdmin() { Map<String, Object> configs = new HashMap<>(); configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093"); return new KafkaAdmin(configs); } @Bean public NewTopic foo() { /第一个是参数是topic名字,第二个参数是分区个数,第三个是topic的复制因子个数 //当broker个数为1个时会创建topic失败, //提示:replication factor: 2 larger than available brokers: 1 //只有在集群中才能使用kafka的备份功能 return new NewTopic("foo", 10, (short) 2); } @Bean public NewTopic bar() { return new NewTopic("bar", 10, (short) 2); } @Bean public NewTopic topic1(){ return new NewTopic("topic1", 10, (short) 2); } @Bean public NewTopic topic2(){ return new NewTopic("topic2", 10, (short) 2); } }
第五步、使用@KafkaListener注解
topicPartitions和topics、topicPattern不能同时使用public class MyListener { @KafkaListener(id = "myContainer1",//id是消费者监听容器 topicPartitions =//配置topic和分区:监听两个topic,分别为topic1、topic2,topic1只接收分区0,3的消息, //topic2接收分区0和分区1的消息,但是分区1的消费者初始位置为5 { @TopicPartition(topic = "topic1", partitions = { "0", "3" }), @TopicPartition(topic = "topic2", partitions = "0", partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "4")) }) public void listen(ConsumerRecord<?, ?> record) { System.out.println("topic" + record.topic()); System.out.println("key:" + record.key()); System.out.println("value:"+record.value()); } @KafkaListener(id = "myContainer2",topics = {"foo","bar"}) public void listen2(ConsumerRecord<?, ?> record){ System.out.println("topic:" + record.topic()); System.out.println("key:" + record.key()); System.out.println("value:"+record.value()); } }
第六步、创建发送消息的接口
@RestController public class KafkaController { private final static Logger logger = LoggerFactory.getLogger(KafkaController.class); @Autowired private KafkaTemplate<String,String> kafkaTemplate; @RequestMapping(value = "/{topic}/send",method = RequestMethod.GET) public void sendMeessageTotopic1(@PathVariable String topic,@RequestParam(value = "partition",defaultValue = "0") int partition) { logger.info("start send message to {}",topic); kafkaTemplate.send(topic,partition,"你","好"); } }
第七步、启动程序、调用接口
消息监听器只监听订阅的topic的特定分区的消息源码:https://github.com/NapWells/java_framework_learn/tree/master/springkafka2
相关文章推荐
- Spring Boot整合Kafka的简单用例
- SpringBoot整合Kafka:简单收发消息案例
- springboot整合kafka(window单机版kafka简单向)
- spring boot 整合 freemark(简单结构)
- SpringBoot整合Quartz定时任务 的简单实例
- spring-boot搭建简单web(整合freemarker)(二)
- SpringBoot整合SpringKafka实现消费者史上最简代码实现
- SpringBoot(四)-- 整合Servlet、Filter、Listener
- spring boot + kafka 简单Demo
- Spring Boot整合Dubbo开发系列(一)----一个简单的示例
- SpringBoot 整合 Redis 的简单案例
- 全注解方式整合spring+mybatis模拟springboot整合
- Spring Boot系列之六 以注解方式整合MyBatis
- 详解SpringBoot 快速整合Mybatis(去XML化+注解进阶)
- springboot整合kafka
- Swagger(一) SpringBoot整合Swagger2简单的例子
- springboot之整合mybatis-annotation(注解方式)
- springboot与ebean的简单整合
- Swagger(一) SpringBoot整合Swagger2简单的例子
- Spring Boot的listener简单使用