项目学习第五天 RabbitMQ消息中间件
项目学习第五天 RabbitMQ消息中间件
RabbitMQ简介:
RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而群集和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。
RabbitMQ 特点:
1.可靠性(Reliability)
RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认。
2.灵活的路由(Flexible Routing)
在消息进入队列之前,通过 Exchange 来路由消息的。对于典型的路由功能,RabbitMQ 已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,可以将多个 Exchange 绑定在一起,也通过插件机制实现自己的 Exchange 。
3.消息集群(Clustering)
多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker 。
4.高可用(Highly Available Queues)
队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。
典型应用场景:
异步处理。把消息放入消息中间件中,等到需要的时候再去处理。
流量削峰。例如秒杀活动,在短时间内访问量急剧增加,使用消息队列,当消息队列满了就拒绝响应,跳转到错误页面,这样就可以使得系统不会因为超负载而崩溃。
日志处理
应用解耦。假设某个服务A需要给许多个服务(B、C、D)发送消息,当某个服务(例如B)不需要发送消息了,服务A需要改代码再次部署;当新加入一个服务(服务E)需要服务A的消息的时候,也需要改代码重新部署;另外服务A也要考虑其他服务挂掉,没有收到消息怎么办?要不要重新发送呢?是不是很麻烦,使用MQ发布订阅模式,服务A只生产消息发送到MQ,B、C、D从MQ中读取消息,需要A的消息就订阅,不需要了就取消订阅,服务A不再操心其他的事情,使用这种方式可以降低服务或者系统之间的耦合。
AMQP协议中间的几个重要概念:
Server:接收客户端的连接,实现AMQP实体服务。
Connection:连接,应用程序与Server的网络连接,TCP连接。
Channel:信道,消息读写等操作在信道中进行。客户端可以建立多个信道,每个信道代表一个会话任务。
Message:消息,应用程序和服务器之间传送的数据,消息可以非常简单,也可以很复杂。有Properties和Body组成。Properties为外包装,可以对消息进行修饰,比如消息的优先级、延迟等高级特性;Body就是消息体内容。
Virtual Host:虚拟主机,用于逻辑隔离。一个虚拟主机里面可以有若干个Exchange和Queue,同一个虚拟主机里面不能有相同名称的Exchange或Queue。
Exchange:交换器,接收消息,按照路由规则将消息路由到一个或者多个队列。如果路由不到,或者返回给生产者,或者直接丢弃。RabbitMQ常用的交换器常用类型有direct、topic、fanout、headers四种,后面详细介绍。
Binding:绑定,交换器和消息队列之间的虚拟连接,绑定中可以包含一个或者多个RoutingKey。
RoutingKey:路由键,生产者将消息发送给交换器的时候,会发送一个RoutingKey,用来指定路由规则,这样交换器就知道把消息发送到哪个队列。路由键通常为一个“.”分割的字符串,例如“com.rabbitmq”。
Queue:消息队列,用来保存消息,供消费者消费。
docker环境下的安装
此项目为微服务架构项目,将所有服务均制作成容器,故在此介绍docker环境下的安装 。
下载镜像:
docker pull rabbitmq:management
创建容器:
rabbitmq需要有映射以下端口: 5671 5672 4369 15671 15672 25672
15672 (if management plugin is enabled)
15671 management监听端口
5672, 5671 (AMQP 0-9-1 without and with TLS)
4369 (epmd) epmd 代表 Erlang 端口映射守护进程
25672 (Erlang distribution)
docker run ‐di ‐‐name=tensquare_rabbitmq ‐p 5671:5617 ‐p 5672:5672 ‐p 4369:4369 ‐p 15671:15671 ‐p 15672:15672 ‐p 25672:25672 rabbitmq:management
浏览器访问 http://192.168.x.x.:15672/#/
地址为docker地址,端口号为:15672,输入用户名和密码,都为guest
代码实现-消息生产者
创建工程rabbitmq_demo,引入amqp起步依赖 ,pom.xml
<groupId>org.springframework.boot</groupId> <artifactId>spring‐boot‐starter‐parent</artifactId> <version>2.0.1.RELEASE</version> <relativePath/> </parent> <properties> <project.build.sourceEncoding>UTF8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF8</project.reporting.outputEncoding> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring‐boot‐starter‐amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring‐boot‐starter‐test</artifactId> <scope>test</scope> </dependency> </dependencies>
编写配置文件application.yml
spring: rabbitmq: host: 192.168.184.134
编写启动类
@SpringBootApplication public class Application { public static void main(String[] args) { SpringApplication.run(Application.class); } }
编写测试类
@RunWith(SpringRunner.class) @SpringBootTest(classes=Application.class) public class MqTest { @Autowired private RabbitTemplate rabbitTemplate; @Test public void testSend(){ rabbitTemplate.convertAndSend("shenhaiyu","学习项目"); } }
代码实现-消息消费者
编写消息消费者类
@Component @RabbitListener(queues="shenhaiyu" ) public class Customer1 { @RabbitHandler public void showMessage(String message){ System.out.println("shenhaiyu接收到消息:"+message); } }
运行启动类,可以在控制台看到刚才发送的消息
创建队列与绑定
在RabbitMQ客户端,新建一个交换器 ,类型选择topic,点击新建的交换器,填写To queue以及Routing Key,添加匹配规则。
编写测试类方法:
@Test public void testSendTopic1(){ rabbitTemplate.convertAndSend("topictest","goods.aaa","主题模式"); }
输出结果:接收到消息:主题模式
几个经典RabbitMQ面试问题
1、使用RabbitMQ有什么好处?解耦,系统A在代码中直接调用系统B和系统C的代码,如果将来D系统接入,系统A还需要修改代码,比较繁琐。
异步,将消息写入消息队列,非必要的业务逻辑以异步的方式运行,加快响应速度,增加效率。
削峰,当并发量大的时候,防止所有的请求直接访问数据库,造成数据库异常或宕机。
2、RabbitMQ 中的 broker 是指什么?cluster 又是指什么?broker 是指一个或多个 erlang node 的逻辑分组,且 node 上运行着 RabbitMQ 应用程序。cluster 是在 broker 的基础之上,增加了 node 之间共享元数据的约束。
3、RabbitMQ会丢数据嘛,如何解决?1.生产者丢数据
生产者的消息没有投递到MQ中怎么办?从生产者弄丢数据这个角度来看,RabbitMQ提供transaction和confirm模式来确保生产者不丢消息。
transaction机制就是说,发送消息前,开启事物(channel.txSelect()),然后发送消息,如果发送过程中出现什么异常,事物就会回滚(channel.txRollback()),如果发送成功则提交事物(channel.txCommit())。
然而缺点就是吞吐量下降了。因此,按照博主的经验,生产上用confirm模式的居多。一旦channel进入confirm模式,所有在该信道上面发布的消息都将会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,rabbitMQ就会发送一个Ack给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了.如果rabiitMQ没能处理该消息,则会发送一个Nack消息给你,你可以进行重试操作。
2.消息队列丢数据
处理消息队列丢数据的情况,一般是开启持久化磁盘的配置。这个持久化配置可以和confirm机制配合使用,你可以在消息持久化磁盘后,再给生产者发送一个Ack信号。这样,如果消息持久化磁盘之前,rabbitMQ阵亡了,那么生产者收不到Ack信号,生产者会自动重发。
那么如何持久化呢,这里顺便说一下吧,其实也很容易,就下面两步
①、将queue的持久化标识durable设置为true,则代表是一个持久的队列
②、发送消息的时候将deliveryMode=2
这样设置以后,rabbitMQ就算挂了,重启后也能恢复数据。在消息还没有持久化到硬盘时,可能服务已经死掉,这种情况可以通过引入mirrored-queue即镜像队列,但也不能保证消息百分百不丢失(整个集群都挂掉)
3.消费者丢数据
启用手动确认模式可以解决这个问题
①自动确认模式,消费者挂掉,待ack的消息回归到队列中。消费者抛出异常,消息会不断的被重发,直到处理成功。不会丢失消息,即便服务挂掉,没有处理完成的消息会重回队列,但是异常会让消息不断重试。
②手动确认模式,如果消费者来不及处理就死掉时,没有响应ack时会重复发送一条信息给其他消费者;如果监听程序处理异常了,且未对异常进行捕获,会一直重复接收消息,然后一直抛异常;如果对异常进行了捕获,但是没有在finally里ack,也会一直重复发送消息(重试机制)。
③不确认模式,acknowledge=“none” 不使用确认机制,只要消息发送完成会立即在队列移除,无论客户端异常还是断开,只要发送完就移除,不会重发。
3号深海鱼,强势悍跳预言家 原创文章 7获赞 4访问量 1464 关注 私信- 十次方项目第五天(消息中间件RabbitMQ)
- SpringBoot整合RabbitMQ消息中间件(菜鸟学习)
- SpringCloud入门学习笔记7--消息中间件RabbitMQ
- RabbitMQ消息中间件学习资料收录
- 【消息中间件】Kafka、RabbitMQ、RocketMQ等消息中间件 学习笔记
- 【消息中间件之RabbitMQ学习】-开篇-001
- 消息中间件RabbitMQ 的学习
- 消息中间件的技术选型心得-RabbitMQ、ActiveMQ和ZeroMQ
- Kafka、RabbitMQ、RocketMQ消息中间件的对比
- rabbitmq学习9:使用spring-amqp发送消息及同步接受消息
- 消息中间件的技术选型心得-RabbitMQ、ActiveMQ和ZeroMQ
- RabbitMQ五种消息队列学习(三)--Work模式
- 17.12.25,web学习第三十二天,还有一年,努力吧青年 商城项目第五天 我的订单回显和文件上传
- 消息中间件 activeMQ 及 JMS 学习
- 消息中间件RabbitMQ(一)
- 消息中间件的技术选型心得-RabbitMQ、ActiveMQ和ZeroMQ
- 深入理解消息中间件技术之RabbitMQ服务
- RabbitMQ学习(十二)之spring整合发送异步消息(注解实现)
- 消息中间件——RabbitMQ(八)高级特性全在这里!(下)
- 【中间件】消息队列(一):RabbitMQ、ActiveMQ、Kafka和Redis