深入浅出 RabbitMQ
2020-07-18 13:24
99 查看
什么是 RabbitMQ
简介(优点)
- 基于
ErLang
语言开发有高可用高并发的优点,适合集群。 - 开源、稳定、易用、跨平台、支持多种语言、文档齐全。
- 有消息确认机制和持久化机制,可靠性高。
概念
生产者和消费者
Producer
:消息的生产者Consumer
:消息的消费者
Queue
- 消息队列提供了
FIFO
的处理机制,具有缓存消息的能力。在RabbitMQ
中,队列消息可以设置为持久化,临时或者自动删除。 - 如果是持久化的队列,
Queue
中的消息会在Server
本地硬盘存储一份,防止系统Crash
数据丢失。 - 如果是临时的队列,
Queue
中的数据在系统重启之后就会丢失。 - 如实是自动删除的队列,当不存在用户连接到
Server
,队列中的数据会被自动删除。
ExChange
ExChange类似于数据通信网络中的交换机,提供消息路由策略。
在
RabbitMQ中,生产者不是将消息直接发送给
Queue,而是先发送给
ExChange,
ExChange根据生产者传递的
key按照特定的路由算法将消息给指定的
Queue。一个
ExChange可以绑定多个
Queue。和
Queue一样,
ExChange也可以设置为持久化、临时或者自动删除。
Binding
所谓绑定就是将一个特定的
ExChange和一个特定的
Queue绑定起来。
ExChange和
Queue的绑定可以是多对多的关系。
Virtual Host
在
RabbitMQ Server上可以创建多个虚拟的
Message Broker(又叫做
Virtual Hosts)。每一个
vhost本质上是一个迷你的
RabbitMQ Server,分别管理各自的
ExChange和
binding。生产者和消费者连接
RabbitMQ Server需要指定一个
Virtual Host。
使用过程
- 客户端连接到消息队列服务器,打开一个
Channel
。 - 客户端声明一个
ExChange
,并设置相关属性。 - 客户端声明一个
Queue
,并设置相关属性。 - 客户端使用
Routing Key
,在ExChange
和Queue
之间建立好绑定关系。 - 客户端投递消息到
ExChange
。 ExChange
接收到消息后,就根据消息的key
和已经设置的bingding
,进行消息路由,将消息投递到一个或多个队列里。
部署 RabbitMQ
使用 Docker Compose 部署
创建
docker-compose.yml
version: '3.1' services: rabbitmq: restart: always image: rabbitmq:management container_name: rabbitmq ports: - 5672:5672 - 15672:15672 environment: TZ: Asia/Shanghai RABBITMQ_DEFAULT_USER: rabbit RABBITMQ_DEFAULT_PASS: 123456 volumes: - ./data:/var/lib/rabbitmq
RabbitMQ WebUI 界面
-
访问地址:http://{ip}:15672
-
首页
-
Global counts
页 -
交换机页
-
队列页
Name
:消息队列的名称,这里是通过程序创建的 -
Features
:消息队列的类型,durable:true 为会持久化消息 -
Ready
:准备好的消息 -
Unacked
:未确认的消息 -
Total
:全部消息如果都为 0 则说明全部消息处理完成
使用 RabbitMQ
创建生产者
创建一个名为
spring-boot-amqp-provider的生产者项目。
相关配置
-
创建 application.yml 文件
spring: application: name: spring-boot-amqp rabbitmq: host: 192.168.75.133 port: 5672 username: rabbit password: 123456
-
创建队列
import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * 队列配置 */ @Configuration public class RabbitMQConfiguration { @Bean public Queuequeue() { return new Queue("helloRabbitMQ"); } }
-
创建消息提供者
import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.Date; /** * 消息提供者 */ @Component public class RabbitMQProvider { @Autowired private AmqpTemplate amqpTemplate; public void send() { String context = "hello" + new Date(); System.out.println("Provider: " + context); amqpTemplate.convertAndSend("helloRabbitMQ", context); } }
发送消息
创建测试用例
import com.lusifer.spring.boot.amqp.Application; import com.lusifer.spring.boot.amqp.provider.HelloRabbitProvider; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; @RunWith(SpringRunner.class) @SpringBootTest(classes = Application.class) public class AmqpTest { @Autowired private HelloRabbitProvider helloRabbitProvider; @Test public void testSender() { for (int i = 0; i < 10; i++) { RabbitMQProvider.send(); } } }
创建消费者
创建一个名为
spring-boot-amqp-consumer的消费者项目。
相关配置
创建
application.yml文件
spring: application: name: spring-boot-amqp-consumer rabbitmq: host: 192.168.75.133 port: 5672 userna 148e me: rabbit password: 123456
接收消息
创建消息的消费监听组件
import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component @RabbitListener(queues = "helloRabbitMQ") public class HelloRabbitConsumer { @RabbitHandler public void process(String message) { System.out.println("Consumer: " + message); } }
文章作者:彭超
本文首发于个人博客:[https://antoniopeng.com/2020/07/18/mq/深入浅出 RabbitMQ/](https://antoniopeng.com/2020/07/18/mq/深入浅出 RabbitMQ/)
版权声明:本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 彭超 | Blog!
相关文章推荐
- 深入浅出Mybatis系列(九)---强大的动态SQL
- Rabbitmq RabConsumer2类
- 深入浅出Attribute (上)——Attribute初体验
- 接口的深入浅出
- 如何安装rabbitmq
- 深入浅出 Java 8 Lambda 表达式
- rabbitmq集群搭载负载均衡Haproxy
- 深入浅出.NET泛型编程
- 深入浅出 Cocoa 多线程编程之 block 与 dispatch quene
- windows安装rabbitmq报错Error: unable to perform an operation on node ''. Please see diagnostics...
- spring与RabbitMQ整合 消费者消费不到消息 重启才能消费到的问题解决
- RabbitMq
- 深入浅出URL编码
- HTTP深入浅出 http请求
- RabbitMQ,ActiveMQ,ZeroMQ,Kafka几种MQ的比较
- 深入浅出异步I/O模型
- RabbitMQ Java官方教程(一)----Hello World
- 转帖: 深入浅出URL编码
- 深入浅出Android makefile(2)--LOCAL_PATH
- RabbitMQ-主题模式Topic