您的位置:首页 > 编程语言 > Java开发

SpringCloud Stream消息驱动

2020-04-22 16:24 465 查看

消息驱动概述

屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型

    官方定义Spring Cloud Stream是一个构建消息驱动的微服务框架。应用程序通过inputs或者outputs来与Spring Cloud Stream中的binder交互。通过我们配置来binding(绑定),而Spring Cloud Stream的binder对象负责与消息中间件交互。所以,我们只需要搞清楚如何与Spring Cloud Stream监护就可以方便使用消息驱动的方式。

    通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动。Spring Cloud Stream为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区三个核心概念。
目前仅支持RabbitMQ和Kafka

设计思想

标准MQ

  • 生产者/消费者之间靠消息媒介传递消息内容 Message
  • 消息必须走特定的通道
      消息通道MessageChannel
  • 消息通道里的消息如何被消费呢,谁负责处理
      消息通道MessageChannel的子接口SubscribableChannel,有MessageHandler消息处理器所订阅。

    Cloud Stream

        通过定义绑定器Binder作为中间层,实现了应用程序与消息中间件细节之间的隔离。

    • Binder
        Input对应于消费者
      1. Output对应于生产者

    Stream中的消息通信方式遵循了发布-订阅模式

        使用Topic主题进行广播

    • 在RabbitMQ就是Exchanage
    • 在Kafka中就是Topic

    Stream的组件

    • Binder 很方便的连接中间件,屏蔽差异
    • Channel 通道,是队列的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过Channel对队列进行配置
    • Source和Sink 简单的可理解为参照对象是Spring Cloud Stream自身,从Stream发布消息就是输出,就收消息就是输入

    编码API和常用注解

    Middleware

    中间件,目前只支出RabbitMQ和Kafka

    Binder

    Binder是应用与中间件之间的封装,目前实行了Kafka和RabbitMQ的Binder,通过Binder可以很方便的连接中间件,可以动态的改变消息类型(对于Kafka的Topic,RabbitMQ的exchange),这些都可以通过配置文件来实现。

    @Input

    注解标识输入通道,通过该输入通道接收到的消息进入应用程序

    @Output

    注解标识输出通道,发布的消息将通过该通道离开应用程序

    @StreamListener

    监听队列,用于消费者的队列的信息接收

    @EnableBinding

    指信道channel和exchange绑定在一起

    案例

    新建三个模块

    cloud-stream-rabbitmq-provider8801
    ,作为生产者进行发消息模块

    cloud-stream-rabbitmq-consumer8802
    ,作为消息接收者模块

    cloud-stream-rabbitmq-consumer8803
    ,作为消息接收者模块

    消息驱动之生产者

    cloud-stream-rabbitmq-provider8801

    pom.xml

    <dependencies>
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
    </dependency>
    <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>
    <!--基础配置-->
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-devtools</artifactId>
    <scope>runtime</scope>
    <optional>true</optional>
    </dependency>
    <dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
    </dependency>
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
    </dependency>
    </dependencies>

    application.yml

    server:
    port: 8801
    
    spring:
    application:
    name: cloud-stream-provider
    cloud:
    stream:
    binders: # 在此处配置要绑定的rabbitmq的服务信息;
    defaultRabbit: # 表示定义的名称,用于于binding整合
    type: rabbit # 消息组件类型
    environment: # 设置rabbitmq的相关的环境配置
    spring:
    rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    bindings: # 服务的整合处理
    output: # 这个名字是一个通道的名称
    destination: studyExchange # 表示要使用的Exchange名称定义
    content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
    binder: defaultRabbit # 设置要绑定的消息服务的具体设置
    
    eureka:
    client: # 客户端进行Eureka注册的配置
    service-url:
    defaultZone: http://localhost:7001/eureka
    instance:
    lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)
    lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)
    instance-id: send-8801.com  # 在信息列表时显示主机名称
    prefer-ip-address: true     # 访问的路径变为IP地址

    启动类

    @SpringBootApplication(exclude = {DataSourceAutoConfiguration.class})
    @EnableEurekaClient
    public class StreamMain8801 {
    public static void main(String args[]){
    SpringApplication.run(StreamMain8801.class,args);
    }
    }

    业务类

    serviceImpl

    @EnableBinding(Source.class)  //定义消息的推送管道
    @Slf4j
    public class MessageProviderImpl implements IMessageProvider {
    
    @Resource
    private MessageChannel output;//消息发送管道
    @Override
    public String send() {
    String serial = UUID.randomUUID().toString();
    output.send(MessageBuilder.withPayload(serial).build());
    log.info("******serial:"+serial);
    return null;
    }
    }

    controller

    @RestController
    @Slf4j
    public class SendMessageController {
    @Resource
    private IMessageProvider iMessageProvider;
    
    @GetMapping("/sendMessage")
    public String sendMessage(){
    return iMessageProvider.send();
    }
    }

    消息驱动之消费者

    pom.xml

    <dependencies>
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
    </dependency>
    <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <!--基础配置-->
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-devtools</artifactId>
    <scope>runtime</scope>
    <optional>true</optional>
    </dependency>
    <dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
    </dependency>
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
    </dependency>
    </dependencies>

    application.yml

    server:
    port: 8802
    
    spring:
    application:
    name: cloud-stream-consumer
    cloud:
    stream:
    binders: # 在此处配置要绑定的rabbitmq的服务信息;
    defaultRabbit: # 表示定义的名称,用于于binding整合
    type: rabbit # 消息组件类型
    environment: # 设置rabbitmq的相关的环境配置
    spring:
    rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    bindings: # 服务的整合处理
    input: # 这个名字是一个通道的名称
    destination: studyExchange # 表示要使用的Exchange名称定义
    content-type: application/json # 设置消息类型,本次为对象json,如果是文本则设置“text/plain”
    binder: defaultRabbit # 设置要绑定的消息服务的具体设置
    
    eureka:
    client: # 客户端进行Eureka注册的配置
    service-url:
    defaultZone: http://localhost:7001/eureka
    instance:
    lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)
    lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)
    instance-id: receive-8802.com  # 在信息列表时显示主机名称
    prefer-ip-address: true     # 访问的路径变为IP地址

    业务类

    @Component
    @Slf4j
    @EnableBinding(Sink.class)
    public class ReceiveMessageController {
    @Value("${server.port}")
    private String serverPort;
    
    @StreamListener(Sink.INPUT)
    public void input(Message<String> message){
    log.info("消费者1号:----->接收到的消息:"+message.getPayload()+"\t"+"  serverPort:"+serverPort);
    }
    }

    测试

    服务启动后,先访问http://localhost:8801/sendMessage向消息中间件发送消息,查看控制台日志输出,然后再去8802的控制台查看,是否有接收消息。

    分组消费与持久化

    解决重复消费的问题

    例如:订单系统我们做集群部署,都会从RabbitMQ中获取订单消息,那如果一个订单同时被两个服务获取到,那么就会造成数据错误,我们得避免这种情况。

    使用Stream的消息分组就可避免

    注意在Stream中处于同一个group中的多个消费者是竞争关系,就能够保证消息只会被其中的一个应用消费一次。

    不同组是可以全面消费的(重复消费)

    同一组内会发生竞争关系,只有其中一个可以消费

    分组

    原理

        微服务应用放置于同一个group中,就能够保证消息只会被其中的一个应用消费一次。不同组是可以消费的,听一个组内会发生竞争关系,只有其中一个可以消费

    实现

    在application.yml添加

    spring:
    cloud:
    stream:
    binding:
    input:
    group: groupName

    解决重复消费

    • 将所有的消费者分到同一组

    持久化

    避免消息丢失

    配置group后,默认支持消息持久化

    • 点赞
    • 收藏
    • 分享
    • 文章举报
    Junqiang Li 发布了17 篇原创文章 · 获赞 0 · 访问量 560 私信 关注
  • 内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
    标签: