您的位置:首页 > 编程语言 > Java开发

Spring Cloud 系列之 Stream 消息驱动(二)

2020-05-01 17:29 429 查看
本篇文章为系列文章,未读第一集的同学请猛戳这里:Spring Cloud 系列之 Stream 消息驱动(一) 本篇文章讲解 Stream 如何实现消息分组和消息分区。    ## 消息分组      点击链接观看:Stream 消息分组视频(获取更多请关注公众号「哈喽沃德先生」)      如果有多个消息消费者,那么消息生产者发送的消息会被多个消费者都接收到,这种情况在某些实际场景下是有很大问题的,比如在如下场景中,订单系统做集群部署,都会从 RabbitMQ 中获取订单信息,如果一个订单消息同时被两个服务消费,系统肯定会出现问题。为了避免这种情况,Stream 提供了消息分组来解决该问题。 ![](https://mrhelloworld.com/resources/articles/spring/spring-cloud/stream/20190628194832868.png)   在 Stream 中处于同一个 `group` 中的多个消费者是竞争关系,能够保证消息只会被其中一个应用消费。不同的组是可以消费的,同一个组会发生竞争关系,只有其中一个可以消费。通过 `spring.cloud.stream.bindings..group` 属性指定组名。 ![](https://mrhelloworld.com/resources/articles/spring/spring-cloud/stream/SCSt-groups.png)    ### 问题演示      在 `stream-demo` 项目下创建 `stream-consumer02` 子项目。   项目代码使用入门案例中消息消费者的代码。   单元测试代码如下: ```java package com.example; import com.example.producer.MessageProducer; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; @SpringBootTest(classes = {StreamProducerApplication.class}) public class MessageProducerTest { @Autowired private MessageProducer messageProducer; @Test public void testSend() { messageProducer.send("hello spring cloud stream"); } } ```    ### 测试      运行单元测试发送消息,两个消息消费者控制台打印结果如下:   stream-consumer 的控制台: ```shell message = hello spring cloud stream ```   stream-consumer02 的控制台: ```shell message = hello spring cloud stream ```   通过结果可以看到消息被两个消费者同时消费了,原因是因为它们属于不同的分组,默认情况下分组名称是随机生成的,通过 RabbitMQ 也可以得知: ![](https://mrhelloworld.com/resources/articles/spring/spring-cloud/stream/image-20200216000518664.png)    ### 配置分组      stream-consumer 的分组配置为:`group-A`。 ```yml server: port: 8002 # 端口 spring: application: name: stream-consumer # 应用名称 rabbitmq: host: 192.168.10.101 # 服务器 IP port: 5672 # 服务器端口 username: guest # 用户名 password: guest # 密码 virtual-host: / # 虚拟主机地址 cloud: stream: bindings: # 消息接收通道 # 与 org.springframework.cloud.stream.messaging.Sink 中的 @Input("input") 注解的 value 相同 input: destination: stream.message # 绑定的交换机名称 group: group-A ```      stream-consumer02 的分组配置为:`group-A`。 ```yml server: port: 8003 # 端口 spring: application: name: stream-consumer # 应用名称 rabbitmq: host: 192.168.10.101 # 服务器 IP port: 5672 # 服务器端口 username: guest # 用户名 password: guest # 密码 virtual-host: / # 虚拟主机地址 cloud: stream: bindings: # 消息接收通道 # 与 org.springframework.cloud.stream.messaging.Sink 中的 @Input("input") 注解的 value 相同 input: destination: stream.message # 绑定的交换机名称 group: group-A ```    ### 测试      运行单元测试发送消息,此时多个消息消费者只有其中一个可以消费。RabbitMQ 结果如下: ![](https://mrhelloworld.com/resources/articles/spring/spring-cloud/stream/image-20200216001742950.png)    ## 消息分区      点击链接观看:Stream 消息分区视频(获取更多请关注公众号「哈喽沃德先生」)      通过消息分组可以解决消息被重复消费的问题,但在某些场景下分组还不能满足我们的需求。比如,同时有多条同一个用户的数据发送过来,我们需要根据用户统计,但是消息被分散到了不同的集群节点上了,这时我们就可以考虑使用**消息分区**了。   当生产者将消息发送给多个消费者时,保证同一消息始终由同一个消费者实例接收和处理。消息分区是对消息分组的一种补充。 ![](https://mrhelloworld.com/resources/articles/spring/spring-cloud/stream/SCSt-partitioning.png) ### 问题演示      先给大家演示一下消息未分区的效果,单元测试代码如下: ```java package com.example; import com.example.producer.MessageProducer; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; @SpringBootTest(classes = {StreamProducerApplication.class}) public class MessageProducerTest { @Autowired private MessageProducer messageProducer; @Test public void testSend() { for (int i = 1; i var1, long var2); } ```   源码 GenericMessage.java ```java package org.springframework.messaging.support; import java.io.Serializable; import java.util.Map; import org.springframework.lang.Nullable; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; import org.springframework.util.Assert; import org.springframework.util.ObjectUtils; public class GenericMessage implements Message, Serializable { private static final long serialVersionUID = 4268801052358035098L; private final T payload; private final MessageHeaders headers; ... } ```      如果 `partition-key-expression` 的值是 `payload`,将会使用所有放在 `GenericMessage` 中的数据作为分区数据。`payload` 是消息的实体类型,可以为自定义类型比如 `User`,`Role` 等等。   如果 `partition-key-expression` 的值是 `headers["xxx"]`,将由 `MessageBuilder` 类的 `setHeader()` 方法完成赋值,比如: ```java package com.example.producer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.messaging.Source; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component; /** * 消息生产者 */ @Component @EnableBinding(Source.class) public class MessageProducer { @Autowired private Source source; /** * 发送消息 * * @param message */ public void send(String message) { source.output().send(MessageBuilder.withPayload(message).setHeader("xxx", 0).build()); } } ```      消息消费者配置**消费者总数**和**当前消费者的索引**并**开启分区支持**。   stream-consumer 的 application.yml ```yml server: port: 8002 # 端口 spring: application: name: stream-consumer # 应用名称 rabbitmq: host: 192.168.10.101 # 服务器 IP port: 5672 # 服务器端口 username: guest # 用户名 password: guest # 密码 virtual-host: / # 虚拟主机地址 cloud: stream: instance-count: 2 # 消费者总数 instance-index: 0 # 当前消费者的索引 bindings: # 消息接收通道 # 与 org.springframework.cloud.stream.messaging.Sink 中的 @Input("input") 注解的 value 相同 input: destination: stream.message # 绑定的交换机名称 group: group-A consumer: partitioned: true # 开启分区支持 ```   stream-consumer02 的 application.yml ```yml server: port: 8003 # 端口 spring: application: name: stream-consumer # 应用名称 rabbitmq: host: 192.168.10.101 # 服务器 IP port: 5672 # 服务器端口 username: guest # 用户名 password: guest # 密码 virtual-host: / # 虚拟主机地址 cloud: stream: instance-count: 2 # 消费者总数 instance-index: 1 # 当前消费者的索引 bindings: # 消息接收通道 # 与 org.springframework.cloud.stream.messaging.Sink 中的 @Input("input") 注解的 value 相同 input: destination: stream.message # 绑定的交换机名称 group: group-A consumer: partitioned: true # 开启分区支持 ```    ### 测试      运行单元测试发送消息,此时多个消息消费者只有其中一个可以消费所有消息。RabbitMQ 结果如下: ![](https://mrhelloworld.com/resources/articles/spring/spring-cloud/stream/image-20200216110952309.png)   至此 Stream 消息驱动所有的知识点就讲解结束了。 ![](https://user-gold-cdn.xitu.io/2020/5/1/171cf87f564bc82e?w=433&h=133&f=gif&s=333013) 本文采用 `知识共享「署名-非商业性使用-禁止演绎 4.0 国际」许可协议`。 大家可以通过 `分类` 查看更多关于 `Spring Cloud` 的文章。    🤗 您的`点赞`和`转发`是对我最大的支持。 📢 扫码关注 `哈喽沃德先生`「文档 + 视频」每篇文章都配有专门视频讲解,学习更轻松噢 ~ ![](https://user-gold-cdn.xitu.io/2020/4/27/171b91f4ae03d737?w=500&h=500&f=gif&s=126591) ![](https://user-gold-cdn.xitu.io/2020/5/1/171cf883c5db1e33?w=900&h=383&f=png&s=70480)
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: