您的位置:首页 > 其它

消息队列Rabbit的使用及一些问题的处理

2019-07-04 15:53 627 查看
版权声明:本文为博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。 本文链接:https://blog.csdn.net/weixin_44800330/article/details/94596963

1.Rabbit是神马

  Rabbit是一个在微服务模块中,模块与模块之间进行相互通信,相互调用的一个消息中间件。

2.为什么要使用RabbitMQ

解耦:

       我们在做微服务开发中,会涉及到多个模块之间的相互调用。我们以ABCD四个独立运行的模块举个例子。假设现在A系统产生了一条数据,BCD三个系统都需要,这个时候,又新加入了E系统,E系统也需要这个数据,这个时候,C系统又不需要这个数据了。这样一来,对开发和维护A系统的人员是一种折磨啊!!这时候,就采用了RabbitMQ消息队列来解决。A系统产生一条数据,发送到MQ里面去,哪个系统需要数据自己去MQ里面消费。如果新系统需要数据,直接从MQ里面消费即可;如果某个系统不需要这条数据了,就取消对MQ消息的消费即可。这样下来,A系统就不要考虑那些谁要,谁不要的问题了。这就是MQ的第一个好处:解耦

       总结:通过一个MQ,Pub/Sub发布订阅消息这么一个模型,A系统就跟其他系统彻底解耦了。

异步:

       第二个场景:A系统接收一条请求,需要在自己本地写库,还需要在BCD三个系统写库,自己本地写库要3 ms,BCD三个系统分别写库要300ms、450ms、200ms,最终的请求就是953ms,这样也好请求一个东西,会感觉很慢。

       如果使用MQ,那么A系统连续发送3条消息到MQ队列中,假如耗时5ms,A系统从接受一个请求到返回响应给用户花费8ms,对用户的体验大大提升。

削峰:

       假设在一个时间段内,A系统按照正常的效率处理请求(假设是每秒并发请求数量50个),结果在一个时间段内,出现了访问的高峰期,每秒并发请求数量突然暴增到5k+。但是呢,A系统是基于Mysql的,你想想,这么多请求去访问Mysql,一般的Mysql,每秒并发请求数量最多2k,结果就是,Mysql崩溃了,系统也无法正常运行了,用户也没法再使用A系统了。

       如果使用MQ队列,每秒5k个请求写入MQ队列中,A系统每秒钟最多处理2k个请求,因为Mysql每秒钟最多处理2k个。A系统按照这个处理效率慢慢从消息队列中拉取消息进行处理,每秒钟2k个请求,不要超过自己每秒能处理的最大请求数量就好。这样就可能导致在高峰期的时间内,会有几十万甚至几百万的请求挤压在MQ中。

      当高峰期一过,A系统按照每秒2k个请求处理效率,很快就可以解决挤压在MQ中的消息。

任何事物,有利就有弊。

缺点:

  • 系统可用性降低
    系统引入的外部依赖越多,越容易挂掉。本来你就是 A 系统调用 BCD 三个系统的接口就好了,人 ABCD 四个系统好好的,没啥问题,你偏加个 MQ 进来,万一 MQ 挂了咋整,MQ 一挂,整套系统崩溃的,你不就完了?如何保证消息队列的高可用?
  • 系统复杂度提高
    硬生生加个 MQ 进来,你怎么保证消息没有重复消费?怎么处理消息丢失的情况?怎么保证消息传递的顺序性?
  • 一致性问题
    A 系统处理完了直接返回成功了,人都以为你这个请求就成功了;但是问题是,要是 BCD 三个系统那里,BD 两个系统写库成功了,结果 C 系统写库失败了,咋整?你这数据就不一致了。

下面就针对性的根据MQ的缺点做处理和优化!!

3.如何保证消息队列的高可用

   在上面我们看到了消息队列为什么不能保证高可用,现在我们来解决。

    RabbitMq是基于做主从做高可用的。在rabbitMQ的主从模式当中,有三种模式:单机模式、普通集群模式、镜像集群模式。

 1) 单机模式

     这种模式就是我们平时自己用来玩的。在实际开发中,不会食欲这种模式。

 2) 普通集群模式

      在普通集群模式中,在多台机器上启动多个rabbitmq实例,每个机器启动一个。但是你创建的queue队列(每一个rabbitMQ实例中,都会有多个queue队列),只会放在一个rabbitmq实例上,但是每个实例都同步queue的元数据。完了你消费的时候,实际上如果连接到了另外一个实例,那么那个实例会从queue所在实例上拉取数据过来。

       这种方式确实很麻烦,也不怎么好,没做到所谓的分布式,就是个普通集群。因为这导致你要么消费者每次随机连接一个实例然后拉取数据,要么固定连接那个queue所在实例消费数据,前者有数据拉取的开销,后者导致单实例性能瓶颈。

       而且如果那个放queue的实例宕机了,会导致接下来其他实例就无法从那个实例拉取,如果你开启了消息持久化,让rabbitmq落地存储消息的话,消息不一定会丢,得等这个实例恢复了,然后才可以继续从这个queue拉取数据。

       所以这个事儿就比较尴尬了,这就没有什么所谓的高可用性可言了,这方案主要是提高吞吐量的,就是说让集群中多个节点来服务某个queue的读写操作。

 3) 镜像集群模式

       这种模式,才是所谓的rabbitmq的高可用模式,跟普通集群模式不一样的是,你创建的queue,无论元数据还是queue里的消息都会存在于多个实例上,然后每次你写消息到queue的时候,都会自动把消息到多个实例的queue里进行消息同步。

       这样的话,好处在于,你任何一个机器宕机了,没事儿,别的机器都可以用。坏处在于,第一,这个性能开销也太大了吧,消息同步所有机器,导致网络带宽压力和消耗很重!第二,这么玩儿,就没有扩展性可言了,如果某个queue负载很重,你加机器,新增的机器也包含了这个queue的所有数据,并没有办法线性扩展你的queue。

       那么怎么开启这个镜像集群模式呢?我这里简单说一下,避免面试人家问你你不知道,其实很简单rabbitmq有很好的管理控制台,就是在后台新增一个策略,这个策略是镜像集群模式的策略,指定的时候可以要求数据同步到所有节点的,也可以要求就同步到指定数量的节点,然后你再次创建queue的时候,应用这个策略,就会自动将数据同步到其他的节点上去了。

4.如何保证消息的可靠性传输(如果消息丢失如何处理)

   消息为什么会丢失?从哪里丢失?

   消息在传输的过程中,由于网络故障,或者系统异常等原因都会导致消息丢失。

   丢失主要从三个地方丢失:1.生产者弄丢了数据;2.RabbitMQ弄丢了数据;3.消费端弄丢了数据

   1.生产者弄丢了数据

 生产者将数据发送到RabbitMQ的过程中,数据在半路就丢失了。由于网络原因。

  解决:两种方案

     1) 开启RabbitMQ提供的事务功能。就是生产者发送数据之前开启RabbitMQ事务,然后发送消息,如果消息没有成功被RabbitMQ接收到,那么生产者会收到异常报错,此时就可以回滚事务,然后重试发送消息;如果收到了消息,那么可以提交事务。

     2) 开启confirm模式。在生产者那里设置开启confirm模式之后,你每次写的消息都会分配一个唯一的id,然后如果写入了RabbitMq中,RabbitMQ会给你回传一个ack消息,告诉你这个消息已经收到。如果RabbitMQ没有接收到这个消息,就会回调你一个那抽空接口,告诉你这个消息接收失败,你可以重试。而且你可以结合这个机制自己在内存里维护每个消息id的状态,如果超过一定时间还没有收到这个消息的回调(也就是没有收到ack消息),那么生产者可以尝试重新发送消息。

总结:事务机制与confirm机制最大的不同在于,事务机制是同步的,你提交一个事务之后会阻塞在那儿,只有当前提交的消息被RabbitMQ收到,并回传一个ack消息之后,下一条消息才会继续存入RabbitMQ;但是confirm机制是异步的,你发送这个消息之后就可以发送下一条消息,然后上一条消息被RabbitMQ接收到了,会异步的回调你一个借口通知你消息收到了。

2.RabbitMQ弄丢了数据

解决:开启RabbitMQ的持久化

  RabbitMQ在接收消息的时候,对消息进行持久化,即使是RabbitMQ宕机了,等回复之后,就可以去磁盘当中恢复。

  设置持久化有两个步骤:第一就是在创建queue的时候,将其设置为持久化的,这样可以保证RabbitMQ持久化queue中的元数据(所谓元数据,就是queue里面的一些配置信息),但是不会持久化queue里的数据。

    第二就是发送消息的时候,将消息的deliveryMode()设置为2,就是将消息设置为持久化的,此时RabbitMQ就会将消息持久化到磁盘中。

   最后,可以结合生产者的confirm机制,只有消息被持久化到磁盘中之后,才会通过生产者ack消息,否则,在持久化之前发生故障,都可以让生产者重新发送。

3.消费端弄丢了数据

解决:采用RabbitMQ通过的ack机制。

    所谓消费者丢失消息,就是消费者在刚获取到这条消息,还没对消息中的内容做分析和处理,这个时候,消费端宕机了,等恢复过来的时候,消息已经不存在了,就导致了消息丢失。

    这个时候,我们首先关闭RabbitMQ中的自动ack机制(通过一个API接口调用即可),然后在自己写的代码里,添加ack机制,就是只有当消费端获取到消息并且成功处理了这个消息,才会给RabbitMQ一个ack消息,所以,即使你宕机了,RabbitMQ没有收到ack消息,就会把这个消息分配给其他的consumer去处理,消息是不会丢失的。

5.如何保证消息的顺序性

  主要思路有两种:1、单线程消费来保证消息的顺序性;2、消费者处理时根据编号判断顺序。

   此图来源于一篇博客:消息队列保证顺序性

 

       如图,data1和data2 是有顺序的,必须data1先执行,data2后执行;这两个数据被不同的消费者消费到了,可能data2先执行,data2后执行,这样运来的顺序就错乱了(比如说,生产者同时发送了两条消息data1和data2,data1是先进行对数据库的访问,做一些判断用的,data2是用来更新数据的,必须是先进行判断之后,才决定要不要更新数据)

      解决:在MQ里面创建多个queue,同一个规则的数据(假如都是进行更新数据库的),对唯一标识进行hash(这个唯一标识可以是id,可以是数字,可以是其他数组类型的),有顺序的放入MQ的queue里面,消费者只取一个queue里面的数据进行消费,这样执行的顺序是有序的。或者只有一个queue,对应一个消费者,在这个消费者内部用内存队列做排队,分给底层不同的worker来处理。

6.如何保证消息不被重复消费(如何保证消息消费时的幂特性)

一条数据重复出现多次,数据库里就只有一条数据,这就保证了消息的消费幂特性。

其实要保证消息不被重复消费,还是得根据自己的业务来处理。

  这里给一些思路:

  • 比如你拿个数据要写库,你先根据主键查一下,如果这数据都有了,你就别插入了,update 一下好吧。
  • 比如你是写 Redis,那没问题了,反正每次都是 set,天然幂等性。
  • 比如你不是上面两个场景,那做的稍微复杂一点,你需要让生产者发送每条数据的时候,里面加一个全局唯一的 id,类似订单 id 之类的东西,然后你这里消费到了之后,先根据这个 id 去比如 Redis 里查一下,之前消费过吗?如果没有消费过,你就处理,然后这个 id 写 Redis。如果消费过了,那你就别处理了,保证别重复处理相同的消息即可。
  • 比如基于数据库的唯一键来保证重复数据不会重复插入多条。因为有唯一键约束了,重复数据插入只会报错,不会导致数据库中出现脏数据。

   原文地址:https://www.javazhiyin.com/22910.html

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐