您的位置:首页 > 编程语言 > Java开发

消息驱动式微服务:Spring Cloud Stream & RabbitMQ

2019-07-08 09:29 603 查看
原文链接:http://www.cnblogs.com/liululee/p/11149302.html

1. 概述

在本文中,我们将向您介绍

Spring Cloud Stream
,这是一个用于构建消息驱动的微服务应用程序的框架,这些应用程序由一个常见的消息传递代理(如
RabbitMQ
Apache Kafka
等)连接。

Spring Cloud Stream
构建在现有Spring框架(如
Spring Messaging
Spring Integration
)之上。尽管这些框架经过了实战测试,工作得非常好,但是实现与使用的
message broker
紧密耦合。此外,有时对某些用例进行扩展是困难的。

Spring Cloud Stream
背后的想法是一个非常典型的
Spring Boot
概念——
抽象地讲,让Spring根据配置和依赖关系管理在运行时找出实现自动注入
。这意味着您可以通过更改依赖项和配置文件来更改
message broker
。可以在这里找到目前已经支持的各种消息代理。

本文将使用

RabbitMQ
作为
message broker
。在此之前,让我们了解一下
broker
(代理)的一些基本概念,以及为什么要在面向微服务的体系架构中需要它。

2. 微服务中的消息

在微服务体系架构中,我们有许多相互通信以完成请求的小型应用程序—它们的主要优点之一是改进了的可伸缩性。一个请求从多个下游微服务传递到完成是很常见的。例如,假设我们有一个

Service-A
内部调用
Service-B
Service-C
来完成一个请求:
[外链图片转存失败(img-jzvHHRXw-1562549429195)(https://user-gold-cdn.xitu.io/2019/7/7/16bccd47c4051b28?w=511&h=347&f=png&s=11713)]

是的,还会有其他组件,比如

Spring Cloud Eureka
Spring Cloud Zuul
等等,但我们还是专注关心这类架构的特有问题。

假设由于某种原因

Service-B
需要更多的时间来响应。也许它正在执行
I/O操作
或长时间的
DB事务
,或者进一步调用其它导致
Service-B
变得更慢的服务,这些都使其无法更具效率。

现在,我们可以启动更多的

Service-B
实例来解决这个问题,这样很好,但是
Service-A
实际上是响应很快的,它需要等待
Service-B
的响应来进一步处理。这将导致
Service-A
无法接收更多的请求,这意味着我们还必须启动
Service-A
的多个实例。

另一种方法解决类似情况的是使用事件驱动的微服务体系架构。这基本上意味着

Service-A
不直接通过
HTTP
调用
Service-B
Service-C
,而是将请求或事件发布给
message broker
(消息代理)。
Service-B
Service-C
将成为
message broker
(消息代理)上此事件的订阅者。

与依赖HTTP调用的传统微服务体系架构相比,这有许多优点:

  • 提高可伸缩性和可靠性——现在我们知道哪些服务是整个应用程序中的真正瓶颈。
  • 鼓励松散耦合——
    Service-A
    不需要了解
    Service-B
    Service-C
    。它只需要连接到
    message broker
    并发布事件。事件如何进一步编排取决于代理设置。通过这种方式,
    Service-A
    可以独立地运行,这是微服务的核心概念之一。
  • 与遗留系统交互——通常我们不能将所有东西都移动到一个新的技术堆栈中。我们仍然必须使用遗留系统,虽然速度很慢,但是很可靠。

3. RabbitMQ

高级消息队列协议(AMQP)
RabbitMQ
用于消息传递的协议。虽然
RabbitMQ
支持其他一些协议,但是
AMQP
由于兼容性和它提供的大量特性而更受欢迎。

3.1 RabbitMQ架构设计

因此发布者将消息发布到

RabbitMQ
中称为
Exchange
(交换器)。
Exchange
(交换器)接收消息并将其路由到一个或多个
Queues
(队列)。路由算法依赖于
Exchange
(交换器)类型和
routing
(路由)key/header(与消息一起传递)。将
Exchange
(交换器)连接到
Queues
(队列)的这些规则称为
bindings
(绑定)。

绑定可以有4种类型:

  • Direct: 它根据
    routing key
    (路由键)将
    Exchange
    (交换器)类型直接路由到特定的
    Queues
    (队列)。
  • Fanout:它将消息路由到绑定
    Exchange
    (交换器)中的所有
    Queues
    (队列)。
  • Topic:它根据完全匹配或部分据
    routing key
    (路由键)匹配将消息路由到(0、1或更多)的
    Queues
    (队列)。
  • Headers:它类似于
    Topic
    (主题)交换类型,但是它是基
    routing header
    (路由头)而不是
    routing key
    (路由键)来路由的。
  • 来源: https://www.cloudamqp.com/

    通过

    Exchange
    (交换器)和
    Queues
    (队列)发布和消费消息的整个过程是通过一个
    Channel
    (通道)完成的。

    有关路由的详细信息,请访问此链接

    3.2 RabbitMQ 设置

    3.2.1 安装

    我们可以从这里下载并安装基于我们的操作系统的二进制文件。

    然而,在本文中,我们将使用

    cloudamqp.com
    提供的免费云安装。只需注册服务并登录即可。

    在主仪表板上单击

    创建新实例
    :

    然后给你的实例起个名字,然后进入下一步:

    然后选择一个可用区:

    最后,查看实例信息,点击右下角的

    创建实例
    :

    就是这样。现在在云中运行了一个

    RabbitMQ
    实例。有关实例的更多信息,请转到您的仪表板并单击
    新创建的实例
    :

    我们可以看到我们可以访问RabbitMQ实例的主机,比如从我们的项目连接所需的用户名和密码:

    我们将在Spring应用程序中使用

    AMQP URL
    连接到这个实例,所以请在某个地方记下它。

    您还可以通过单击左上角的

    RabbitMQ manager
    来查看管理器控制台。这将采用它来管理的您的
    RabbitMQ
    实例。

    Project 配置

    现在我们的设置已经准备好了,让我们创建我们的服务:

  • cloud-stream-producer-rabbitmq: 作为一个发布者,将消息推送到
    RabbitMQ
  • cloud-stream-consumer-rabbitmq: 消费者消费消息

使用

Spring Initializr
创建一个脚手架项目。这将是我们的
producer
项目,我们将使用
REST
端点发布消息。

选择您喜欢的

Spring Boot
版本,添加
Web
Cloud Stream
依赖项,生成
Maven
项目:

注意:

请注意

cloud-stream
依赖项。这也需要像
RabbitMQ
Kafka
等绑定器依赖项才能工作。

由于我们将使用

RabbitMQ
,添加以下
Maven
依赖项:

<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>

或者,我们也可以将两者结合起来使用

spring-cloud-starter-stream-rabbit
:

<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

使用同样的方法,创建消费者项目,但仅使用

spring-cloud-starter-stream-rabbit
依赖项。

4. 创建生产者

如前所述,将消息从发布者传递到队列的整个过程是通过通道完成的。因此,让我们创建一个

HelloBinding
接口,其中包含我们的消息机制
greetingChannel
:

interface HelloBinding {

@Output("greetingChannel")
MessageChannel greeting();
}

因为这将发布消息,所以我们使用

@Output
注解。方法名可以是我们想要的任意名称,当然,我们可以在一个接口中有多个
Channel
(通道)。

现在,让我们创建一个

REST
,它将消息推送到这个
Channel
(通道)

@RestController
public class ProducerController {

private MessageChannel greet;

public ProducerController(HelloBinding binding) {
greet = binding.greeting();
}

@GetMapping("/greet/{name}")
public void publish(@PathVariable String name) {
String greeting = "Hello, " + name + "!";
Message<String> msg = MessageBuilder.withPayload(greeting)
.build();
this.greet.send(msg);
}
}

上面,我们创建了一个

ProducerController
类,它有一个
MessageChannel
类型的属性 
greet
。这是通过我们前面声明的方法在构造函数中初始化的。

注意: 我们可以用简洁的方式做同样的事情,但是我们使用不同的名称来让您更清楚地了解事物是如何连接的。

然后,我们有一个简单的

REST
接口,它接收
PathVariable
name
,并使用
MessageBuilder
创建一个
String
类型的消息。最后,我们使用
MessageChannel
上的
.send()
方法来发布消息。

现在,我们将在的主类中添加

@EnableBinding
注解,传入
HelloBinding
告诉
Spring
加载。

@EnableBinding(HelloBinding.class)
@SpringBootApplication
public class Application {

public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}

最后,我们必须告诉

Spring
如何连接到
RabbitMQ
(通过前面的
AMQP URL
),并将
greetingChannel
连接到一可用的消费者。

这两个都是在

application.properties
中定义的:

spring.rabbitmq.addresses=<amqp url>

spring.cloud.stream.bindings.greetingChannel.destination = greetings

server.port=8080

5. 创建消费者

现在,我们需要监听之前创建的通道

greetingChannel
。让我们为它创建一个绑定:

public interface HelloBinding {

String GREETING = "greetingChannel";

@Input(GREETING)
SubscribableChannel greeting();
}

与生产者绑定的两个非常明显区别。因为我们正在消费消息,所以我们使用

SubscribableChannel
@Input
注解连接到
greetingChannel
,消息数据将被推送这里。

现在,让我们创建处理数据的方法:

@EnableBinding(HelloBinding.class)
public class HelloListener {

@StreamListener(target = HelloBinding.GREETING)
public void processHelloChannelGreeting(String msg) {
System.out.println(msg);
}
}

在这里,我们创建了一个

HelloListener
类,在
processHelloChannelGreeting
方法上添加
@StreamListener
注解。这个方法需要一个字符串作为参数,我们刚刚在控制台打印了这个参数。我们还在类添加
@EnableBinding
启用了
HelloBinding

同样,我们在这里使用

@EnableBinding
,而不是主类,以便告诉我们如何使用。

看看我们的主类,我们没有任何修改:

@SpringBootApplication
public class Application {

public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}

application.properties
配置文件中,我们需要定义与生产者一样的属性,除了修改端口之外

spring.rabbitmq.addresses=<amqp url>
spring.cloud.stream.bindings.greetingChannel.destination=greetings
server.port=9090

6. 全部测试

让我们同时启动生产者和消费者服务。首先,让我们通过点击端点

http://localhost:8080/greet/john
来生产消息。

在消费者日志中看到消息内容:

我们使用以下命令启动另一个消费者服务实例(在另一个端口(9091)上):

$ mvn spring-boot:run -Dserver.port=9091

现在,当我们点击生产者

REST
端点生产消息时,我们看到两个消费者都收到了消息:

这可能是我们在一些用例中想要的。但是,如果我们只想让一个消费者消费一条消息呢?为此,我们需要在

application.properties
中创建一个消费者组。消费者的配置文件:

spring.cloud.stream.bindings.greetingChannel.group = greetings-group

现在,再次在不同的端口上运行消费者的2个实例,并通过生产者生产消息再次查看:

这一切也可以在

RabbitMQ
管理器控制台看到:

7. 结论

在本文中,我们解释了消息传递的主要概念、它在微服务中的角色以及如何使用

Spring Cloud Stream
实现它。我们使用
RabbitMQ
作为消息代理,但是我们也可以使用其他流行的代理,比
如Kafka
,只需更改配置和依赖项。

与往常一样,本文使用的示例代码可以在GitHub获得完整的源代码

原文:https://stackabuse.com/spring-cloud-stream-with-rabbitmq-message-driven-microservices/

作者:Dhananjay Singh

译者:李东

转载于:https://www.cnblogs.com/liululee/p/11149302.html

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: