您的位置:首页 > 编程语言 > Java开发

SpringCloud Stream基于消息驱动

2019-04-28 09:30 489 查看

不知道你们在学习的过程中有没有这个疑问,消息主线和消息驱动的区别(springcloud bus 和 springcloud stream的区别)。我就去查了相关的资料总结了一下。

SpringCloud bus

Spring Cloud Bus将分布式系统的 **~~节点~~ ** 与轻量级消息代理链接。这可以用于广播状态更改(例如配置更改)或其他管理指令。一个关键的想法是,Bus就像一个扩展的Spring Boot应用程序的分布式执行器,但也可以用作应用程序之间的通信渠道。当前唯一的实现是使用AMQP代理作为传输,但是相同的基本功能集(还有一些取决于传输)在其他传输的路线图上。 SpringCloud Bus是将节点与消息代理连接,比如我们要使用/Refresh端点进行配置更新那我们就可以使用Bus来一个类似广播的机制告知所有服务的/Refresh端点进行配置更新。通过端点然后借助消息代理实现消息广播。 在来看看什么是SpringCloud Stream

SpringCloud Stream

是一个用来为微服务应用构建消息驱动能力的框架。它通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动。他通过内部抽象化的Binder与消息代理来进行通信。应用程序通过接入Input和output来与Stream的binder进行通信,Binder相当于应用程序和消息代理的缓冲层。 给我的理解就是应用程序通过Stream来与消息队列通信,然后发送或者接受信息。并且Stream的存在使得应用程序和消息队列实现了解耦,我们以后要更换不同的消息队列的话,只需要修改相关配置就可以轻松移植。 看一下网上比较好的架构图

目前Stream支持kafka、rabbitmq。本章针对rabbitmq的集成与实现 生产者和消费者在一个项目中配置 引入pom依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
定义两个接口,发送消息和接收消息
/**
* 消费消息接口
*/

public interface IMessageConsumer {

public void Consu();

}
/**
* 消息产生接口
*/

public interface IMessageProvider {

public void send(String str) ;

}
实现类
@Component
@EnableBinding(Source.class)
public class MsgSendImp implements IMessageProvider {

@Autowired
Source source;

@Override
public void send(String str) {
this.source.output().send(MessageBuilder.withPayload(str).build()); // 创建并发送消息
}

}
@Component
@EnableBinding(Sink.class)    //定义一个channel并和配置中的Binder连接
public class MsgConsImp implements IMessageConsumer {

@Override
@StreamListener(Sink.INPUT)
public void Consu() {
System.out.println("成功消耗信息");
}
}
注意@EnableBlinding注解 传入的参数分别是Source.class(生产者)和Sink.class(消费者)点进去源码我们知道一个是输出通道,一个是输入通道,这两个都是这两个类定义了两个channel,分别是input和output。这个注解的意思就是创建对应的通道,然后结合配置文件中的bindings绑定到queue。而我们后续操作就是拿着我们注册的通道(已经连上rabbitmq)来进行发送消息和消费消息。 @StreamListener是我们监听该通道所连接的Binder绑定的Queue 看我们的yml配置
spring:
application:
name: Producer_Consumer
cloud:
stream:
binders:  #定义一个binder类型
defaultRabbit:
type: rabbit   #使用的消息代理类型
environment:   #消息代理信息
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
bindings:    #绑定规则    一下配置是针对rabbitmq  比如 Exchange  和   队列
output:    #通道名称  默认是@input 或者  @output 注解参数的值
destination: studyExchange   #标识要使用的Exchange名称
content-type: application/json  # 设置消息类型,本次为对象json,如果是文本则设置“text/plain”
binder: defaultRabbit  #要绑定的消息服务的具体设置
input:    #通道名称  默认是@input 或者  @output 注解参数的值
destination: studyExchange   #标识要使用的Exchange名称
content-type: application/json  # 设置消息类型,本次为对象json,如果是文本则设置“text/plain”
binder: defaultRabbit  #要绑定的消息服务的具体设置
我定义一个端点来发送消息
@RestController
public class Hello {

@Autowired
MsgSendImp msgSendImp;

@RequestMapping("hello")
public String hello(){
msgSendImp.send("asd");
return "发送成功了哦!";
}

}
启动我们的程序 访问我们 /hello 端点

因为我们定义的消费者一直在监听通道,控制台打印消费了消息、

在实际应用中,我们可以根据实际业务需要,来对微服务通过消息发送订阅模式进行一些相关操作。

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: