SpringCloud-Stream 消息驱动
2020-04-27 18:11
856 查看
# 一、概述
## 是什么?
Spring Cloud Stream 是一个构建消息微服务驱动的框架。可以屏蔽底层消息中间件的差异,降低版本切换成本,统一消息的编程模型,目前仅支持 RabbitMQ 和 Kafka。
## 设计思想
### 标准 MQ 的设计思想
![](https://gitee.com/songjilong/FigureBed/raw/master/img/20200427105050.png)
生产者 / 消费者之间靠消息媒介传递信息内容,Message
消息必须走特定的通道,MessageChannel
消息通道里的消息如何被消费呢,谁负责收发处理?消息通道MessageChannel的子接口SubscribableChannel,由消息处理器MessageHandler所订阅
### Spring Cloud Stream 的设计思想
如果我们的项目中用到了 RabbitMQ 和 Kafka 两种消息中间件,由于它们的架构不同,对实际开发造成了一定困扰;或者用到了一种消息中间件,随着后面的业务需求需要向另一种消息队列迁移,这无疑是灾难性的,会造成一大堆的改动,因为它们与系统耦合了,这时候 Spring Cloud Stream 就可以为我们提供一种解耦的方式。
Spring Cloud Stream 提供的解决方案是:**通过定义绑定器 Binder 作为中间层,实现了应用程序与消息中间件细节之间的隔离。**向应用程序暴露统一的 Channel 通道,使得应用程序不需要再考虑各种消息中间件的实现。
![](https://gitee.com/songjilong/FigureBed/raw/master/img/20200427111957.png)
inputs 对应消费者,outputs 对应生产者
**Stream中的消息通信方式遵循了发布-订阅模式,用 Topic 主题进行广播(在RabbitMQ就是Exchange,在Kafka中就是Topic)**
## 工作流程
![](https://gitee.com/songjilong/FigureBed/raw/master/img/20200427112924.png)
Binder:绑定器,很方便的连接中间件,屏蔽差异
Channel:通道,是队列 Queue 的一种抽象,在消息通讯系统中就是实现存储与转发的媒介,通过 Channel 对队列进行配置
Source 和 Sink:简单理解就是参照物是 Spring Cloud Stream 本身,从 Stream 发布消息就是输出,接收消息就是输入
## 编码 API 和常用注解
![](https://gitee.com/songjilong/FigureBed/raw/master/img/20200427113659.png)
# 二、基本使用
## 生产者
配置:
```yml
spring:
application:
name: cloud-stream-provider
cloud:
stream:
binders: # 配置绑定的rabbitmq的服务信息
defaultRabbit: # 表示定义的名称,用于与binding整合
type: rabbit # 消息组件的类型
environment:
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: # 服务的整合处理
output: # 这个名字是一个通道的名字
destination: studyExchange # 表示要使用的Exchange名称定义
content-type: application/json # 设置消息类型,如果文本就是“text/plain”
binder: defaultRabbit # 设置要绑定的消息服务的具体设置
```
发送消息:
```java
@EnableBinding(Source.class)
@Slf4j
public class MessageProviderImpl implements IMessageProvider {
@Autowired
private MessageChannel output;
@Override
public void send() {
String serial = IdUtil.simpleUUID();
output.send(MessageBuilder.withPayload(serial).build());
log.info("流水号:" + serial);
}
}
```
## 消费者
配置与生产者一致,只需要把 output 改为 input
接收消息:
```java
@Controller
@EnableBinding(Sink.class)
@Slf4j
public class MessageReceiveController {
@Value("${server.port}")
private String serverPort;
@StreamListener(Sink.INPUT)
public void receiveMessage(Message message){
log.info("receive -> " + serverPort + " -> " +message.getPayload());
}
}
```
# 三、解决消息重复消费的问题
## 场景
举个栗子,我们对订单系统做了集群部署,消费者从 RabbitMQ 中获取订单信息,如果同一个订单被不同的服务都获取到了,就会造成数据错误,为了避免这种情况,我们可以使用 Stream 中的消息分组来解决。
## 原理
在 Stream 中,处于同一个组的多个消费者是竞争关系,就可以保证消息只被一个服务消费一次,而不同组是可以重复消费的。现在默认分组就是不同的,组流水号不一样。
## 解决
将不想产生重复消费的服务分为同一个组即可
## 配置方式
```yml
spring:
cloud:
stream:
bindings:
input:
group: groupA
```
# 四、持久化
如果我们的消费者因为种种原因宕机了,生产者此时发送了消息,没有配置 group 属性的消费者重新上线后无法接收到之前的消息,而配置了 group 的消费者仍会接收到消息,这就是持久化的效果
相关文章推荐
- 消息驱动式微服务:Spring Cloud Stream & RabbitMQ
- Spring Cloud Stream消息驱动之RocketMQ入门(一)
- 消息驱动式微服务:Spring Cloud Stream & RabbitMQ
- SpringCloud微服务知识整理十:消息驱动的微服务:Spring Cloud Stream
- SpringCloudStream 构建消息驱动的微服务框架 集成kafka_http://blog.spring-cloud.io/blog/sc-stream.html
- SpringCloud学习笔记(10)——消息驱动的微服务:Spring Cloud Stream
- SpringCloud——Stream(消息驱动)
- SpringCloud 进阶: 消息驱动(入门)Spring Cloud Stream【Greenwich.SR3】
- 使用 Spring Cloud Stream 构建消息驱动微服务
- Spring Cloud 应用篇 之 Spring Cloud Stream(消息驱动)
- 使用 Spring Cloud Stream 构建消息驱动微服务
- SpringCloud Stream消息驱动
- SpringCloud Stream基于消息驱动
- SpringCloud Stream-----1、消息驱动的微服务概念
- Spring Cloud 应用篇 之 Spring Cloud Stream(消息驱动)
- SpringCloudStream 构建消息驱动的微服务框架
- 【Spring Cloud】Stream消息驱动
- Spring Cloud Stream(消息驱动)
- Spring Cloud Stream - 构建消息事件驱动的微服务
- Spring Cloud Stream:基于事件(消息)驱动的微服务框架