rabbitmq 延迟队列的实现(PHP)http://blog.yuhai.win
2017-07-07 14:09
796 查看
更多内容: http://blog.yuhai.win
参考文献: 1、https://wenku.baidu.com/view/0108202e3b3567ec102d8ab0.html
实现原理: 1、rabbitmq 可以针对 Queue和Message 设置 x-message-ttl 来控制消息的生存时间,如果超时,消息变为 dead letter 2、rabbitmq 的queue 可以配置 x-dead-letter-exchange 和 x-dead-letter-routing(可选) 两个参数,来控制队列出现 dead letter 的时候,重新发送消息的目的地 注意事项: 1、设置了 x-dead-letter-exchange 和 x-dead-letter-routing 后的队列是根据 队列入队的顺序进行消费,即使到了过期时间也不会触发x-dead-letter-exchange 因为过期时间是在消息出队列的时候进行判断的 2、所以当队列没有设过期时间时,插入一个没有过期时间的消息会导致 x-dead-letter-exchange 队列永远不会被消费
方案一: 对message设置TTL
1、对queue设置最大过期时间 2、对发送的每个Message 设置过期时间
优点 1、发送时可以自定义延迟时间 缺点 1、需要升级客户端,对客户端不透明 2、需要针对建立不同的延迟每个队列队列 带延迟参数的send方法容易误用,很难发现 存在问题: 如果插入一个过期时间很长的队列可能会照成队列堵住,过期了的消息不能被消费。
send.php
use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; use PhpAmqpLib\Wire\AMQPTable; $connection = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest'); $channel = $connection->channel(); $channel->exchange_declare('delay_exchange', 'direct',false,false,false); $channel->exchange_declare('cache_exchange', 'direct',false,false,false); $tale = new AMQPTable(); $tale->set('x-dead-letter-exchange', 'delay_exchange'); $tale->set('x-dead-letter-routing-key','delay_exchange'); $tale->set('x-message-ttl',10000); $channel->queue_declare('cache_queue',false,true,false,false,false,$tale); $channel->queue_bind('cache_queue', 'cache_exchange','cache_exchange'); $channel->queue_declare('delay_queue',false,true,false,false,false); $channel->queue_bind('delay_queue', 'delay_exchange','delay_exchange'); $msg = new AMQPMessage('Hello World'.$argv[1],array( 'expiration' => intval($argv[1]), 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT )); $channel->basic_publish($msg,'cache_exchange','cache_exchange'); echo date('Y-m-d H:i:s')." [x] Sent 'Hello World!' ".PHP_EOL; $channel->close(); $connection->close();
receive.php
use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Wire\AMQPTable; $connection = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest'); $channel = $connection->channel(); $channel->exchange_declare('delay_exchange', 'direct',false,false,false); $channel->exchange_declare('cache_exchange', 'direct',false,false,false); $channel->queue_declare('delay_queue',false,true,false,false,false); $channel->queue_bind('delay_queue', 'delay_exchange','delay_exchange'); echo ' [*] Waiting for message. To exit press CTRL+C '.PHP_EOL; $callback = function ($msg){ echo date('Y-m-d H:i:s')." [x] Received",$msg->body,PHP_EOL; $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); }; //只有consumer已经处理并确认了上一条message时queue才分派新的message给它 $channel->basic_qos(null, 1, null); $channel->basic_consume('delay_queue','',false,false,false,false,$callback); while (count($channel->callbacks)) { $channel->wait(); } $channel->close(); $connection->close();
方案二 对queque 设置过期时间
优点: 1、维护简单 2、客户端完全透明 3、针对建立一个延迟队每个延迟时间列 缺点: 1、发送方无法自定义延迟时间 2、延迟时间在建Queue时确定,修改不便 修改延迟时间需要在MQ集群重新进行配置
send.php
use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; use PhpAmqpLib\Wire\AMQPTable; $connection = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest'); $channel = $connection->channel(); if (isset($argv[1])){ $expiration = intval($argv[1]); }else{ $expiration = 1000; } $cache_exchange_name = 'cache_exchange'.$expiration; $cache_queue_name = 'cache_queue'.$expiration; $channel->exchange_declare('delay_exchange', 'direct',false,false,false); $channel->exchange_declare($cache_exchange_name, 'direct',false,false,false); $tale = new AMQPTable(); $tale->set('x-dead-letter-exchange', 'delay_exchange'); $tale->set('x-dead-letter-routing-key','delay_exchange'); $tale->set('x-message-ttl',$expiration); $channel->queue_declare($cache_queue_name,false,true,false,false,false,$tale); $channel->queue_bind($cache_queue_name, $cache_exchange_name,''); $channel->queue_declare('delay_queue',false,true,false,false,false); $channel->queue_bind('delay_queue', 'delay_exchange','delay_exchange'); $msg = new AMQPMessage('Hello World'.$argv[1],array( 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT, )); $channel->basic_publish($msg,$cache_exchange_name,''); echo date('Y-m-d H:i:s')." [x] Sent 'Hello World!' ".PHP_EOL; $channel->close(); $connection->close();
receive.php
use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Wire\AMQPTable; $connection = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest'); $channel = $connection->channel(); $channel->exchange_declare('delay_exchange', 'direct',false,false,false); $channel->exchange_declare('cache_exchange', 'direct',false,false,false); $channel->queue_declare('delay_queue',false,true,false,false,false); $channel->queue_bind('delay_queue', 'delay_exchange','delay_exchange'); echo ' [*] Waiting for message. To exit press CTRL+C '.PHP_EOL; $callback = function ($msg){ echo date('Y-m-d H:i:s')." [x] Received",$msg->body,PHP_EOL; $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); }; //只有consumer已经处理并确认了上一条message时queue才分派新的message给它 $channel->basic_qos(null, 1, null); $channel->basic_consume('delay_queue','',false,false,false,false,$callback); while (count($channel->callbacks)) { $channel->wait(); } $channel->close(); $connection->close();
相关文章推荐
- 基于PHP使用rabbitmq实现消息队列
- Spring集成RabbitMQ并实现延迟队列
- 如何才能让Spring Boot与RabbitMQ结合实现延迟队列
- 基于PHP使用rabbitmq实现消息队列
- 基于PHP使用rabbitmq实现消息队列
- RabbitMQ如何实现延迟队列?
- Spring Boot与RabbitMQ结合实现延迟队列的示例
- 基于 rabbitmq 实现延迟队列
- php rabbitmq延迟队列示例
- C#实现rabbitmq 延迟队列功能
- Rabbitmq延迟队列实现定时任务的方法
- rabbitmq 实现延迟队列(ttl+DLX)
- C#实现rabbitmq 延迟队列功能
- C#实现rabbitmq 延迟队列功能
- Spring Boot 实现 RabbitMQ 延迟消费和延迟重试队列
- RabbitMQ 延迟队列实现订单支付结果异步阶梯性通知
- Spring MVC整合RabbitMQ实现延迟队列
- C#实现rabbitmq 延迟队列功能实例代码
- dubbo2.5-spring4-mybastis3.2-springmvc4-mongodb3.4-redis3.2整合(十二)之 spring中RabbitMQ延迟队列的实现
- RabbitMQ如何实现延迟队列?