springboot整合kafka案例demo
2020-07-13 06:02
99 查看
springboot整合kafka案例demo
在上一篇博客详细介绍了kafka的安装,使用 kafka + zookeeper下载/安装/使用(超详细)
这里介绍一下将kafka整合进springboot中使用.前提还是得启动zk和kafka的服务!
话不多说了,直接上代码
demo下载地址: https://download.csdn.net/download/dayonglove2018/12546138
整体目录结构:
pom依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.zjy</groupId>
    <artifactId>kafka</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>kafka</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <!-- Single source of truth for the Spring Boot version: used by the BOM import and by the Boot Maven plugin below -->
        <spring-boot.version>2.3.0.RELEASE</spring-boot.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- lombok ********************* Begin -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <!-- lombok ********************* End -->

        <!-- swagger ********************* Begin -->
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger2</artifactId>
            <version>2.9.2</version>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger-ui</artifactId>
            <version>2.9.2</version>
        </dependency>
        <!-- swagger ********************* End -->

        <!-- kafka ********************* Begin -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <!-- kafka ********************* End -->

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>

    <!-- Dependency versions come from the Spring Boot BOM; this project does not inherit spring-boot-starter-parent -->
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${spring-boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <!-- Importing the BOM does not manage plugin versions, so the version must be pinned here -->
                <version>${spring-boot.version}</version>
                <executions>
                    <!-- Without the starter parent the repackage goal is not bound automatically; bind it so an executable jar is produced -->
                    <execution>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
application.properties
server.port=2080
spring.kafka.bootstrap-servers=localhost:9092

#=============== provider =======================
spring.kafka.producer.retries=0
# Maximum size of a producer batch, measured in bytes (this is not a number of messages)
spring.kafka.producer.batch-size=1000
spring.kafka.producer.buffer-memory=1000000
# Serializers used for the message key and the message body
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

#=============== consumer =======================
# Default consumer group id
spring.kafka.consumer.group-id=test-hello-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100
# Deserializers used for the message key and the message body
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
SwaggerConfig
package com.zjy.kafka.config; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import springfox.documentation.builders.ApiInfoBuilder; import springfox.documentation.builders.RequestHandlerSelectors; import springfox.documentation.service.ApiInfo; import springfox.documentation.spi.DocumentationType; import springfox.documentation.spring.web.plugins.Docket; import springfox.documentation.swagger2.annotations.EnableSwagger2; @Configuration @EnableSwagger2 public class SwaggerConfig { @Bean public Docket createRestApi() { return new Docket(DocumentationType.SWAGGER_2) .apiInfo(apiInfo()) .pathMapping("/") .select() .apis(RequestHandlerSelectors.any()) .apis(RequestHandlerSelectors.basePackage("com.zjy.kafka.controller")) .build(); } private ApiInfo apiInfo() { return new ApiInfoBuilder() .title("springboot利用swagger构建api文档") .description("swagger接口文档") .version("1.0") .build(); } }
ProductController
package com.zjy.kafka.controller; import io.swagger.annotations.Api; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.validation.annotation.Validated; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; @Api(value = "kafka生产者", tags = {"kafka生产者"}) @Slf4j @Validated @RestController @RequestMapping("/kafka") public class ProductController { @Autowired private KafkaTemplate<String,Object> kafkaTemplate; /** * 生产者发送消息 * @return */ @GetMapping("/send") public String send(@RequestParam(value = "topic") String topic, @RequestParam(value = "msg") String msg){ kafkaTemplate.send(topic, msg); log.info("发送消息的topic为: {}! 发送的内容为: {}", topic, msg); return "发送消息成功!"; } }
CustomerController
package com.zjy.kafka.controller;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * Kafka consumer component: receives records and writes them to the log.
 */
@Slf4j
@Component
public class CustomerController {

    /**
     * Handles one record from the {@code demo} topic by logging its topic, offset and value.
     *
     * @param record the record delivered by the listener container
     */
    @KafkaListener(topics = "demo")
    public void listen(ConsumerRecord<?, ?> record) {
        final String sourceTopic = record.topic();
        final long position = record.offset();
        final Object payload = record.value();
        log.info("topic是: {}, offset是: {}, value是: {}", sourceTopic, position, payload);
    }
}
KafkaApplication
package com.zjy.kafka;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Entry point of the Spring Boot Kafka demo application.
 */
@SpringBootApplication
public class KafkaApplication {

    /**
     * Starts the Spring application with this class as the primary source.
     *
     * @param args command-line arguments, passed through to Spring Boot unchanged
     */
    public static void main(String[] args) {
        SpringApplication.run(KafkaApplication.class, args);
    }
}
以上就是所有的代码和配置
测试
启动项目访问swagger:http://localhost:2080/swagger-ui.html
请求参数就是2个.一个topic 一个msg
控制台打印:
黑窗口也可以打印出来:
测试结果是OK的
如果请求一个没有被监听的topic,请求同样可以正常发送;只不过消费者只监听了名为demo的topic,所以不会消费这条消息,也就不会打印出结果
欢迎大佬们留言评论,共同学习!!!感谢!!!
===========================
原创文章,转载注明出处!
相关文章推荐
- SpringBoot整合Kafka:简单收发消息案例
- 记一次springboot和kafka的整合(kafka demo)
- springboot整合redis的demo
- 整合SpringBoot+Mysql+Redis实现缓存机制的一个Demo
- SpringBoot整合Kafka和Storm
- SpringBoot(十一):SpringBoot整合Kafka
- Springboot 整合 Mybatis 的完整 Web 案例
- Springboot 整合 Dubbo/ZooKeeper 详解 SOA 案例
- Springboot 整合 Dubbo/ZooKeeper 详解 SOA 案例
- springboot与redis整合案例(上)
- Springboot 整合 Dubbo/ZooKeeper 详解 SOA 案例
- SpringBoot开发案例之整合mail发送服务
- SpringBoot整合Kafka报错:org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching
- Springboot 整合 Dubbo/ZooKeeper 详解 SOA 案例
- SpringBoot开发案例之整合mongoDB
- Docker 部署 SpringBoot 项目整合 Redis 镜像做访问计数Demo
- spring boot学习系列:spring boot与jdbcTemplate的整合案例
- SpringBoot开发案例之整合Swagger篇
- Springboot整合kafka
- Spring Boot 整合 Thymeleaf 完整 Web 案例