您的位置:首页 > 其它

rabbitmq plugins rabbitmq_delayed_message_exchange消息队列延迟消息插件

2017-09-08 09:43 1731 查看
官方文档:点这里

rabbitmq版本必须是3.5.8以上才支持该插件

使用方法

credentials = pika.PlainCredentials(username='mq', password='654321')
connection = pika.BlockingConnection(
pika.ConnectionParameters(
host='localhost', port=5672, virtual_host='/',
credentials=credentials
)
)
channel = connection.channel()
channel.exchange_declare(
exchange='test',
exchange_type='x-delayed-message',
durable=True,
arguments={"x-delayed-type": "fanout"}
)
channel.queue_declare(queue='test', durable=True)
channel.queue_bind(exchange='test', queue='test', routing_key='t')
channel.basic_publish(
routing_key='t', body=json.dumps({'test': 'test'}),
properties=pika.BasicProperties(delivery_mode=2, headers={'x-delay': 8000}),
exchange='test'
)


关键点

创建exchange时指定
exchange_type
x-delayed-message


添加参数,这里指定exchange类型
arguments={"x-delayed-type": "fanout"}


添加消息到队列时添加

properties=pika.BasicProperties(delivery_mode=2, headers={'x-delay': 8000})


delivery_mode 持久化消息

headers={‘x-delay’: 8000} 消息延迟时间毫秒级

和死信的区别

在rabbitmq_delayed_message_exchange插件没出来之前都是使用死信方式来达到延迟队列的效果。

死信是在创建queue是声明这是一个死信队列,里面的消息到一定时间没被消费就会变成死信转发到死信相应的exchange或queue中。

# 死信演示
arguments = {
'x-message-ttl': 1000 * 10,  # 延迟时间 (毫秒)
'x-dead-letter-exchange': exchange,  # 延迟结束后指向交换机(死信收容交换机)
'x-dead-letter-routing-key': queue,  # 延迟结束后指向队列(死信收容队列)
}
channel.queue_declare(queue=delay_queue, durable=True, arguments=arguments)


exchange是没有消费者的概念的,所以,延迟消息是exchange到queue或其它exchange的延迟。

如果消息延迟到期后不能分配到其它的exchange或queue时消息会被丢弃。

比如设置x-delay为10000,要等10秒后才会出现在exchange的routing_key中。

设置了x-delay后这条消息会一直携带该参数,消费者在处理是可以根据该参数判断是不是延迟消息。

延迟消息可以更灵活的设置消息被消费时间。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: