spring_cloud_stream消息分区 消息反馈
消费组
默认情况下,当生产者发出一条消息到绑定通道上,这条消息会产生多个副本被消费者实例接收和处理.当我们只想被其中一个消费者消费,我们可以使用消费组,我们只需在消费者端设置 spring.cloud.stream.bindings.input.group 属性即可
spring.cloud.stream.bindings.shendu_input.group=greetings
消息分区
对于一些特殊场景,除了要保证单一实例消费之外,还希望那些具备相同特征的消息都能够被同一实例进行消费,这时候我们就需要对消息进行分区处理
版本spring boot 1.5.14
consumer配置
[code]spring.application.name=spring_cloud_stream_partition_consumer2 server.port=8083 spring.rabbitmq.host=192.168.86.132 spring.rabbitmq.port=5672 spring.rabbitmq.username=springcloud spring.rabbitmq.password=springcloud spring.cloud.stream.bindings.shenduinput.group=service-A spring.cloud.stream.bindings.shenduinput.destination=greetings # 通过该参数开启消费者分区功能 spring.cloud.stream.bindings.shenduinput.consumer.partitioned=true #该参数指定了当前消费者的总实例数量 spring.cloud.stream.instanceCount=2 #该参数设置当前实例的索引号,从0开始 spring.cloud.stream.instance-index=1
生产者配置
public interface Message<T> { /** * Return the message payload. */ T getPayload(); MessageHeaders getHeaders(); }
spring.cloud.stream.bindings.shenduoutput.producer.partitionKeyExpression 通过该参数指定了分区键的表达式规则,分区key的值是基于partitionKeyExpression计算得出的,用于每个消息被发送至对应分区的输出channel 该表达式作用于传递给MessageChannel的send方法的参数,该参数是实现 org.springframework.messaging.Message接口的类, GenericMessage类是Spring为我们提供的一个实现Message接口的类,我们封装的信息将会放在payload属性上, 如果partitionKeyExpression的值是payload,将会使用整个我们放在GenericMessage中的信息做分区数据
payload 是消息的 实体类型,可以为自定义类型,比如 User,Role等等 |
[code]spring.cloud.stream.bindings.shenduoutput.destination=greetings spring.cloud.stream.bindings.shenduoutput.producer.partitionKeyExpression=payload.id #该参数指定了消息分区的数量 spring.cloud.stream.bindings.shenduoutput.producer.partitionCount=2 spring.rabbitmq.host=192.168.86.132 spring.rabbitmq.port=5672 spring.rabbitmq.username=springcloud spring.rabbitmq.password=springcloud server.port=8081 spring.application.name=spring_cloud_stream_partition_sender
消息反馈
很多时候在处理完输入消息之后,需要反馈一个消息给对方,这时候可以通过@sendto注解来指定返回内容的输出通道
简单演示一下 消息反馈
版本 spring boot 1.5.14
定义消费接口
[code]package springcloud.spring_cloud_stream_feedback; import org.springframework.cloud.stream.annotation.Input; import org.springframework.cloud.stream.messaging.Sink; import org.springframework.messaging.SubscribableChannel; public interface SinkReceiver { @Input("shendu_input") SubscribableChannel input(); }
定义生产接口
[code]package springcloud.spring_cloud_stream_feedback; import org.springframework.cloud.stream.annotation.Output; import org.springframework.messaging.MessageChannel; public interface SinkSender { @Output("shendu_output") MessageChannel output(); }
配置文件
[code]spring.rabbitmq.host=192.168.86.132 spring.rabbitmq.port=5672 spring.rabbitmq.username=springcloud spring.rabbitmq.password=springcloud # 主题 spring.cloud.stream.bindings.shendu_input.destination=shendu_output # 定义消息的去向 spring.cloud.stream.bindings.shendu_output.destination=shendu_input
应用一
[code]package springcloud.spring_cloud_stream_feedback; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.messaging.Processor; import org.springframework.messaging.handler.annotation.SendTo; @EnableBinding(value = {SinkReceiver.class}) public class App1 { private static Logger logger = LoggerFactory.getLogger(App1.class); @StreamListener("shendu_output") @SendTo({"shendu_input"}) public Object receiveFromInput(Object payload){ logger.info("received: "+payload); return "From Input Channel Return - "+payload; } }
应用二
[code]package springcloud.spring_cloud_stream_feedback; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.messaging.Processor; import org.springframework.context.annotation.Bean; import org.springframework.integration.annotation.InboundChannelAdapter; import org.springframework.integration.annotation.Poller; import org.springframework.integration.core.MessageSource; import org.springframework.messaging.Message; import org.springframework.messaging.support.GenericMessage; import java.util.Date; @EnableBinding(value = {SinkSender.class}) public class App2 { private static Logger logger = LoggerFactory.getLogger(App2.class); @Bean @InboundChannelAdapter(value = "shendu_output",poller = @Poller(fixedDelay = "2000")) public MessageSource<Date> timerMessageSource(){ return new MessageSource<Date>() { @Override public Message<Date> receive() { return new GenericMessage(new Date()); } }; } @StreamListener("shendu_input") public void receiveFromOutput(Object payload){ logger.info("Received: "+payload); } }
启动类
[code]package springcloud.spring_cloud_stream_feedback; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class SpringCloudStreamFeedbackApplication { public static void main(String[] args) { SpringApplication.run(SpringCloudStreamFeedbackApplication.class, args); } }
pom 依赖
[code]<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-rabbit</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-test-support</artifactId> <scope>test</scope> </dependency> </dependencies>
阅读更多
- Spring Cloud (十五)Stream 入门、主要概念与自定义消息发送与接收
- Spring Cloud构建微服务架构:消息驱动的微服务(消费分区)【Dalston版】
- SpringCloud系列十一:SpringCloudStream(SpringCloudStream 简介、创建消息生产者、创建消息消费者、自定义消息通道、分组与持久化、设置 RoutingKey)
- 消息驱动:Spring Cloud Stream
- SpringCloud Stream-----1、消息驱动的微服务概念
- 使用 Spring Cloud Stream 构建消息驱动微服务
- Spring Cloud Stream(消息驱动)介绍
- Spring Cloud构建微服务架构:消息驱动的微服务(消费分区)【Dalston版】
- SpringCloud Stream-----2、消息生产与消费
- SpringCloud微服务实战之消息驱动Stream
- SpringCloudStream 构建消息驱动的微服务框架 集成kafka_http://blog.spring-cloud.io/blog/sc-stream.html
- SpringCloudStream 构建消息驱动的微服务框架
- 使用 Spring Cloud Stream 构建消息驱动微服务
- Spring Cloud Stream(消息驱动)
- SpringCloud之消息总线(Spring Cloud Bus)(八)
- SpringCloud消息总线
- 第八篇: 消息总线(Spring Cloud Bus)
- 史上最简单的SpringCloud教程 | 第八篇: 消息总线(Spring Cloud Bus)
- Spring Cloud Stream初窥
- SpringCloud Bus消息总线的实现