SpringCloud Stream基于消息驱动
2019-04-28 09:30
489 查看
不知道你们在学习的过程中有没有这个疑问,消息主线和消息驱动的区别(springcloud bus 和 springcloud stream的区别)。我就去查了相关的资料总结了一下。
SpringCloud bus
Spring Cloud Bus将分布式系统的 **~~节点~~ ** 与轻量级消息代理链接。这可以用于广播状态更改(例如配置更改)或其他管理指令。一个关键的想法是,Bus就像一个扩展的Spring Boot应用程序的分布式执行器,但也可以用作应用程序之间的通信渠道。当前唯一的实现是使用AMQP代理作为传输,但是相同的基本功能集(还有一些取决于传输)在其他传输的路线图上。 SpringCloud Bus是将节点与消息代理连接,比如我们要使用/Refresh端点进行配置更新那我们就可以使用Bus来一个类似广播的机制告知所有服务的/Refresh端点进行配置更新。通过端点然后借助消息代理实现消息广播。 在来看看什么是SpringCloud StreamSpringCloud Stream
是一个用来为微服务应用构建消息驱动能力的框架。它通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动。他通过内部抽象化的Binder与消息代理来进行通信。应用程序通过接入Input和output来与Stream的binder进行通信,Binder相当于应用程序和消息代理的缓冲层。 给我的理解就是应用程序通过Stream来与消息队列通信,然后发送或者接受信息。并且Stream的存在使得应用程序和消息队列实现了解耦,我们以后要更换不同的消息队列的话,只需要修改相关配置就可以轻松移植。 看一下网上比较好的架构图目前Stream支持kafka、rabbitmq。本章针对rabbitmq的集成与实现 生产者和消费者在一个项目中配置 引入pom依赖
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency>定义两个接口,发送消息和接收消息
/** * 消费消息接口 */ public interface IMessageConsumer { public void Consu(); }
/** * 消息产生接口 */ public interface IMessageProvider { public void send(String str) ; }实现类
@Component @EnableBinding(Source.class) public class MsgSendImp implements IMessageProvider { @Autowired Source source; @Override public void send(String str) { this.source.output().send(MessageBuilder.withPayload(str).build()); // 创建并发送消息 } }
@Component @EnableBinding(Sink.class) //定义一个channel并和配置中的Binder连接 public class MsgConsImp implements IMessageConsumer { @Override @StreamListener(Sink.INPUT) public void Consu() { System.out.println("成功消耗信息"); } }注意@EnableBlinding注解 传入的参数分别是Source.class(生产者)和Sink.class(消费者)点进去源码我们知道一个是输出通道,一个是输入通道,这两个都是这两个类定义了两个channel,分别是input和output。这个注解的意思就是创建对应的通道,然后结合配置文件中的bindings绑定到queue。而我们后续操作就是拿着我们注册的通道(已经连上rabbitmq)来进行发送消息和消费消息。 @StreamListener是我们监听该通道所连接的Binder绑定的Queue 看我们的yml配置
spring: application: name: Producer_Consumer cloud: stream: binders: #定义一个binder类型 defaultRabbit: type: rabbit #使用的消息代理类型 environment: #消息代理信息 spring: rabbitmq: host: localhost port: 5672 username: guest password: guest virtual-host: / bindings: #绑定规则 一下配置是针对rabbitmq 比如 Exchange 和 队列 output: #通道名称 默认是@input 或者 @output 注解参数的值 destination: studyExchange #标识要使用的Exchange名称 content-type: application/json # 设置消息类型,本次为对象json,如果是文本则设置“text/plain” binder: defaultRabbit #要绑定的消息服务的具体设置 input: #通道名称 默认是@input 或者 @output 注解参数的值 destination: studyExchange #标识要使用的Exchange名称 content-type: application/json # 设置消息类型,本次为对象json,如果是文本则设置“text/plain” binder: defaultRabbit #要绑定的消息服务的具体设置我定义一个端点来发送消息
@RestController public class Hello { @Autowired MsgSendImp msgSendImp; @RequestMapping("hello") public String hello(){ msgSendImp.send("asd"); return "发送成功了哦!"; } }启动我们的程序 访问我们 /hello 端点
因为我们定义的消费者一直在监听通道,控制台打印消费了消息、
在实际应用中,我们可以根据实际业务需要,来对微服务通过消息发送订阅模式进行一些相关操作。
相关文章推荐
- Spring Cloud Stream:基于事件(消息)驱动的微服务框架
- SpringCloud微服务实战之消息驱动Stream
- SpringCloud——Stream(消息驱动)
- SpringCloudStream 构建消息驱动的微服务框架 集成kafka_http://blog.spring-cloud.io/blog/sc-stream.html
- SpringCloudStream 构建消息驱动的微服务框架
- SpringCloud Stream-----1、消息驱动的微服务概念
- Spring Cloud Stream(消息驱动)介绍
- 使用 Spring Cloud Stream 构建消息驱动微服务
- Spring Cloud Stream(消息驱动)
- Spring Cloud 应用篇 之 Spring Cloud Stream(消息驱动)
- SpringCloud微服务知识整理十:消息驱动的微服务:Spring Cloud Stream
- 消息驱动:Spring Cloud Stream
- SpringCloud学习笔记(10)——消息驱动的微服务:Spring Cloud Stream
- 【Spring Cloud】Stream消息驱动
- 使用 Spring Cloud Stream 构建消息驱动微服务
- Spring Cloud 应用篇 之 Spring Cloud Stream(消息驱动)
- Spring Cloud架构教程 (七)消息驱动的微服务(核心概念)【Dalston版】
- Spring Cloud架构教程 (八)消息驱动的微服务(消费组)【Dalston版】
- Spring Cloud Stream同一通道根据消息内容分发不同的消费逻辑
- spring_cloud_stream消息分区 消息反馈