Spring Boot整合Kafka的简单用例
2018-03-01 17:02
671 查看
一、kafka基本术语
Producer生产者,是发送消息的对象
Consumer
消费者,是订阅消息和处理消息的对象
Topic
主题,用于消息的分类,也就是一个标签,可以看作是一个频道,可以被多个消费者订阅
Broker
代理,kafka集群中的每一个服务器就是一个代理(Broker),消费者可以订阅一个或者多个主题(Topic),消费已经发布的主题
二、教程
首先需要我们启动zookeeper server,在启动kafka server,kafka server依赖于zookeeper server,确保有可用的Broker,否则会出现错误的提示:Broker may not be available1、引入Spring Boot依赖和Spring kafka依赖
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.2.RELEASE</version> <relativePath/> </parent> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-logging</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.1.0.RELEASE</version> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka-test</artifactId> <version>1.0.2.RELEASE</version> <scope>test</scope> </dependency>
2、kafka的基本配置
@Configuration @EnableKafka public class KafkaConfig { /* --------------producer configuration-----------------**/ @Bean public Map<String, Object> producerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ProducerConfig.RETRIES_CONFIG, 0); props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); props.put(ProducerConfig.LINGER_MS_CONFIG, 1); props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return props; } @Bean public ProducerFactory<String, String> producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs()); } /* --------------consumer configuration-----------------**/ @Bean public Map<String, Object> consumerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "0"); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100); props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return props; } @Bean ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); return factory; } @Bean public ConsumerFactory<String, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } //实际执行消息消费的类,用于处理消息,做一些业务逻辑 @Bean public MyMessageListener myMessageListener(){ return new MyMessageListener(); } //消费者容器配置信息 @Bean public ContainerProperties containerProperties(){ Pattern topicPattern = Pattern.compile(".*[tT]opic.*"); //匹配满足正则的topic ContainerProperties containerProperties = new ContainerProperties(topicPattern);//订阅满足正则表达式的topic containerProperties.setMessageListener(myMessageListener());//订阅的topic的消息用myMessageListener去处理 return containerProperties; } @Bean public KafkaMessageListenerContainer<String, String> kafkaMessageListenerContainer(){ return new KafkaMessageListenerContainer<>(consumerFactory(),containerProperties()); } /* --------------kafka template configuration-----------------**/ @Bean public KafkaTemplate<String,String> kafkaTemplate() { KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory()); kafkaTemplate.setDefaultTopic("defaultTopic"); return kafkaTemplate; } }
3、订阅消息监听器
//监听器必须实现MessageListener这个接口中onMessage方法 public class MyMessageListener implements MessageListener<String, String> { public final static Logger logger = LoggerFactory.getLogger(MyMessageListener.class); @Override//此方法处理消息 public void onMessage(ConsumerRecord<String, String> data) { String topic = data.topic();//消费的topic logger.info("-------------recieve message from {} topic-------------", topic); logger.info("partition:{}", String.valueOf(data.partition()));//消费的topic的分区 logger.info("offset:{}", String.valueOf(data.offset()));//消费者的位置 logger.info("get message from {} topic : {}", topic, data.value());//接收到的消息 } }
4、调用接口发送消息
@Autowired private KafkaTemplate<String,String> kafkaTemplate;//kafkaTemplate相当于生产者 @RequestMapping(value = "/{topic}/send",method = RequestMethod.GET) public void sendMeessage( @RequestParam(value = "message",defaultValue = "hello world") String message, @PathVariable final String topic) { logger.info("start sned message to {}",topic); ListenableFuture<SendResult<String, String>> listenableFuture = kafkaTemplate.send(topic,message);//发送消息,topic不存在将自动创建新的topic listenableFuture.addCallback(//添加成功发送消息的回调和失败的回调 result -> logger.info("send message to {} success",topic), ex -> logger.info("send message to {} failure,error message:{}",topic,ex.getMessage())); } @RequestMapping(value = "/default/send",method = RequestMethod.GET) public void sendMeessagedefault() {//发送消息到默认的topic logger.info("start send message to default topic"); kafkaTemplate.sendDefault("你好,世界"); }
三、运行
启动SpringBoot应用程序,调用接口,就可以看到运行结果四、说明
1、KafkaTemplate.send()ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);
topic:主题名称;partition:要发送消息到哪个分区;timestamp:创建消息的时间;key:消息的键;value:消息的值。
send方法是异步,一旦将消息保存在等待发送消息的缓存中就立即返回,这样就不会阻塞去等待每一条消息的响应。可以使用listenableFuture.cancle()方法去取消消息的发送,更多说明参考官网文档。
2、@KafkaListener注解的使用
此注解就代替了我们自己写的MyMessageListener,极为方便,由于依赖的冲突,存在一个bug
会导致一个奇怪的异常,java.lang.NoSuchMethodError: org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.(Ljava/lang/reflect/Method;)V;Spring 官方说明地址:https://github.com/spring-cloud/spring-cloud-release/issues/70,
解决办法:
使用注解在下一篇中。
相关文章推荐
- Spring Boot整合Kafka的简单用例(@KafkaListener注解)
- springboot整合kafka(window单机版kafka简单向)
- SpringBoot整合Kafka:简单收发消息案例
- SpringBoot整合SpringKafka实现生产者史上最简代码实现
- Spring Boot 构建应用——整合消息中间件 Kafka
- Swagger(一) SpringBoot整合Swagger2简单的例子
- SpringBoot整合Quartz定时任务 的简单实例 2
- spring boot mybatis 整合shiro简单实现登陆权限管理
- 被版本更新坑到哭系列:SpringBoot整合Kafka
- 一个简单的SpringBoot整合Mybatis项目
- springboot整合kafka应用
- spring-boot搭建简单web(整合freemarker)(二)
- Spring Boot整合Dubbo开发系列(一)----一个简单的示例
- Spring Boot整合集成StringRedisTemplate的简单记录
- SpringBoot 整合 Redis 的简单案例
- SpringBoot整合SpringKafka实现消费者史上最简代码实现
- SpringBoot+Maven项目实战(6):整合Log4j和Aop,实现简单的日志记录
- SpringBoot 整合 Redis 的简单案例
- Swagger(一) SpringBoot整合Swagger2简单的例子
- Java爬虫初体验:简单抓取IT之家热评(整合Spring Boot+Elasticsearch+Redis+Mybatis)