您的位置:首页 > 产品设计 > UI/UE

Rabbitmq 学习笔记(二)队列

2016-07-11 11:54 609 查看

队列

队列(queue)就是在生产者和消费者之间传递消息的对象:
具有以下特性:
- 同一个队列可以有多个生产者和多个消费者;
- 队列的创建具有幂等性,同一个名称的队列只会创建一次;
- 队列中的消息只会被其中一个消费者读取并处理,也就是说接收完之后就会被删除;
- 多个消费者轮流读取队列中的消息;
- 如果没有消费者,消息会保存在队列中;


代码演示,普通队列

演示环境介绍:centos 7, python 2.7, 使用rabbitmq的python版client库,pika。
想运行示例代码,需要事先安装rabbitmq和python环境。后面不再复述。


1.receiver.py 消费者

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))             #我的环境中rabbitmq和示例代码在同一台服务器
channel = connection.channel()

channel.queue_declare(queue='hello')   #创建队列

def callback(ch, method, properties, body):     #消费者接受消息后处理回调函数
print(" [x] Received %r" % body)

channel.basic_consume(callback,
queue='hello',
no_ack=True)             #接收消息函数参数定义

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()


1.sender.py 生产者

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')

channel.basic_publish(exchange='',
routing_key='hello',
body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()


当没有消费者时,消息会保留在队列中。



消费者上线后,会读取队列中的消息。



提问: 如果队列中有没有被读取的消息时,rabbitmq服务重启,消息会丢吗?

答案:消息会丢失。



代码演示,消息可靠性,防饿死机制

场景1: 如果消费者在接收消息过程中,出现意外终止的情况(网络不通、进程被杀死等),
队列中的消息会丢失,怎么处理?
消息可靠性通过 `no_ack` 参数设置:
true:表示不管消费者是否接收完整消息,都会丢失消息。
false:需要等消息确认消息接收完整,才会删除队列中的消息缓存。
默认值:false

场景2: 默认情况下,消费者会预读取队列中所有已经存在的消息,新加入的消费者会读取不到队列中这些消息。
也就是新加入的消费者被饿死了。怎么处理?
防饿死机制通过 basic_qos(prefetch_count=3) 函数设置预读取消息数量。

场景3:队列可靠性
durable=True 机制 ,持久化队列,重启服务也不会丢失


2.sender.py 生产者

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
routing_key='task_queue',
body=message,
properties=pika.BasicProperties(
delivery_mode = 2, # make message persistent
))
print(" [x] Sent %r" % message)
connection.close()


2.receiver.py 消费者

#!/usr/bin/env python
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')

def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
time.sleep(body.count(b'.'))      #让每条消息的接收时间变长一些,方便测试
print(" [x] Done")
ch.basic_ack(delivery_tag = method.delivery_tag)  #告诉队列接收消息完成

channel.basic_qos(prefetch_count=3)   #预读取3条消息
channel.basic_consume(callback,
queue='task_queue')

channel.start_consuming()


消费者接收完消息之后,打印done,再接收下一条消息。

第一个消费者预读取3条消息,第二个消费者从第4条消息开始接收。

后面消息轮流读取。

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  rabbitmq queue