RabbitMQ-限流
2021-09-12 15:59
176 查看
1.简介
为什么要对消费端进行限流?
其实很好理解,比如我们常能接触到的消费场景:春运期间12306火车票的抢购,双11期间的下单等。这些场景都有一个共同点就是都会导致短暂时间内请求数激增,如果我们的
Consumer最多只支持每秒1000的QPS,而由于请求的激增导致每秒2000甚至更多的并发,此时已经远远超过了服务本身所能处理的阈值。如果不对消息进行限流,很可能会将服务拖垮,那将会是灾难性的。实际应用场景不止于这些,接下来通过RabbitMQ来讲解如果对消费端做限流措施。
2. 如何限流
2.1 引入所需依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit-test</artifactId> <scope>test</scope> </dependency>
2.1 application.yaml
concurrency:并发量,即
Consumer本地起
concurrency个线程去消费。
prefetch:每个线程每次预取几条消息进行消费。
即:
Consumer每次从
Broker取
concurrency * prefetch(unack个数)条消息缓存到本地进行消费。
如果设置unack个数为20,当消费(ack)了4条消息时服务宕机了,那么剩下的16条消息会重新回到Broker中,已确认的消息会从队列中移除掉。
spring: rabbitmq: host: localhost port: 5672 # rabbit 默认的虚拟主机 virtual-host: / # rabbit 用户名密码 username: admin password: admin123 listener: simple: # manual 手动确认 acknowledge-mode: manual # 消费者每次监听消费最小数量 (并发量) concurrency: 3 # 消费者每次监听消费最大数量 (并发量) max-concurrency: 10 # 消费者每次消费的数量(unack 次数:这里感觉用次数会更容易理解) # 即:unacked 数量为 concurrency(最小并发数) * prefetch(可以不确认的次数) = 12(未被接收确认的数量) prefetch: 4
2.2 声明一个简单队列
/** * rabbit 快速开始 * * @author ludangxin * @date 2021/8/23 */ @Configuration public class RabbitSimpleConfig { /** * 设置一个简单的队列 */ @Bean public Queue queue() { return new Queue("helloMQ"); } }
2.3 producer
/** * 生产者 * * @author ludangxin * @date 2021/8/23 */ @Component public class SimpleProducer { @Autowired private RabbitTemplate rabbitTemplate; public void send() { String context = "helloMQ " + System.currentTimeMillis(); rabbitTemplate.convertAndSend("helloMQ", context); } }
2.4 consumer
/** * 消费者 * * @author ludangxin * @date 2021/8/23 */ @Slf4j @Component @RabbitListener(queues = {"helloMQ"}) public class SimpleConsumer { @RabbitHandler public void process(String hello) throws InterruptedException { log.info("Message:{} ", hello); // 设置睡眠时间,方便通过日志信息分析问题 Thread.sleep(3000); } }
2.5 测试代码
@Autowired private SimpleProducer simpleSender; @Test public void hello() throws Exception { for (int i = 0; i < 100; i++) { simpleSender.send(); } // 阻塞进程,使消费者能够正常监听消费。 System.in.read(); }
我们首先分析一下:
我们在测试的时候发送了
100条消息,在项目配置的时候
unack个数设置了3 * 4 = 12,也就是说我们一次只从
Broker拉取12条信息进行消费。
当消费者进行消费的时候我们设置了3秒的延迟,还有很重要一点,我们在消费的时候没有进行ack。也就是说,当我们消费完12条信息后,并没有进行ack,会导致
Consumer并不会从
Broker继续拉取消息,另一方面也能说明,
Consumer确实只从
Broker拉取了12条消息。
输出日志如下,每三秒输出三条信息,输出完12条后再没有输出日志,证实了猜想。
相关文章推荐
- RabbitMQ实现削峰限流demo
- RabbitMQ消费端限流策略(十)
- RabbitMQ高级之消息限流与延时队列
- SpringBoot 整合 RabbitMQ(包含三种消息确认机制以及消费端限流)
- rabbitmq实战系列4--springboot整合rabbitmq实现限流与并发
- RabbitMQ 消费端限流、TTL、死信队列
- RabbitMQ 消费端限流、TTL、死信队列
- RabbitMQ有关限流QOS的理解
- 限流-------------令牌桶算法
- 消息中间件的技术选型心得-RabbitMQ、ActiveMQ和ZeroMQ
- RabbitMQ之消息持久化
- Spring rabbitMq 中 correlationId或CorrelationIdString 消费者获取为null的问题
- RabbitMQ学习二
- RabbitMQ之惰性队列(Lazy Queue)
- 原创 | SpringBoot监听rabbitmq和创建交换器,队列
- rabbitmq 小记
- RabbitMQ系列—RabbitMQ介绍
- CentOS7.3安装Docker及rabbitmq
- 柯南君:看大数据时代下的IT架构(8)消息队列之RabbitMQ--案例(topic起航)
- RabbitMQ应用实现