Rabbitmq 学习笔记(二)队列
2016-07-11 11:54
609 查看
队列
队列(queue)就是在生产者和消费者之间传递消息的对象: 具有以下特性: - 同一个队列可以有多个生产者和多个消费者; - 队列的创建具有幂等性,同一个名称的队列只会创建一次; - 队列中的消息只会被其中一个消费者读取并处理,也就是说接收完之后就会被删除; - 多个消费者轮流读取队列中的消息; - 如果没有消费者,消息会保存在队列中;
代码演示,普通队列
演示环境介绍:centos 7, python 2.7, 使用rabbitmq的python版client库,pika。 想运行示例代码,需要事先安装rabbitmq和python环境。后面不再复述。
1.receiver.py 消费者
#!/usr/bin/env python import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) #我的环境中rabbitmq和示例代码在同一台服务器 channel = connection.channel() channel.queue_declare(queue='hello') #创建队列 def callback(ch, method, properties, body): #消费者接受消息后处理回调函数 print(" [x] Received %r" % body) channel.basic_consume(callback, queue='hello', no_ack=True) #接收消息函数参数定义 print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
1.sender.py 生产者
#!/usr/bin/env python import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.queue_declare(queue='hello') channel.basic_publish(exchange='', routing_key='hello', body='Hello World!') print(" [x] Sent 'Hello World!'") connection.close()
当没有消费者时,消息会保留在队列中。
消费者上线后,会读取队列中的消息。
提问: 如果队列中有没有被读取的消息时,rabbitmq服务重启,消息会丢吗?
答案:消息会丢失。
代码演示,消息可靠性,防饿死机制
场景1: 如果消费者在接收消息过程中,出现意外终止的情况(网络不通、进程被杀死等), 队列中的消息会丢失,怎么处理? 消息可靠性通过 `no_ack` 参数设置: true:表示不管消费者是否接收完整消息,都会丢失消息。 false:需要等消息确认消息接收完整,才会删除队列中的消息缓存。 默认值:false 场景2: 默认情况下,消费者会预读取队列中所有已经存在的消息,新加入的消费者会读取不到队列中这些消息。 也就是新加入的消费者被饿死了。怎么处理? 防饿死机制通过 basic_qos(prefetch_count=3) 函数设置预读取消息数量。 场景3:队列可靠性 durable=True 机制 ,持久化队列,重启服务也不会丢失
2.sender.py 生产者
#!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) message = ' '.join(sys.argv[1:]) or "Hello World!" channel.basic_publish(exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties( delivery_mode = 2, # make message persistent )) print(" [x] Sent %r" % message) connection.close()
2.receiver.py 消费者
#!/usr/bin/env python import pika import time connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) print(' [*] Waiting for messages. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] Received %r" % body) time.sleep(body.count(b'.')) #让每条消息的接收时间变长一些,方便测试 print(" [x] Done") ch.basic_ack(delivery_tag = method.delivery_tag) #告诉队列接收消息完成 channel.basic_qos(prefetch_count=3) #预读取3条消息 channel.basic_consume(callback, queue='task_queue') channel.start_consuming()
消费者接收完消息之后,打印done,再接收下一条消息。
第一个消费者预读取3条消息,第二个消费者从第4条消息开始接收。
后面消息轮流读取。
相关文章推荐
- Rabbitmq集群搭建笔记
- C#队列Queue用法实例分析
- 用PHP写的基于Memcache的Queue实现代码
- C#队列Queue多线程用法实例
- linux中编写自己的并发队列类(Queue 并发阻塞队列)
- vector,map,list,queue的区别详细解析
- Laravel 4.2 中队列服务(queue)使用感受
- jQuery中队列queue()函数的实例教程
- Python通过RabbitMQ服务器实现交换机功能的实例教程
- Python+Pika+RabbitMQ环境部署及实现工作队列的实例教程
- Python Queue模块详解
- 利用Python学习RabbitMQ消息队列
- Python多进程通信Queue、Pipe、Value、Array实例
- python使用Queue在多个子进程间交换数据的方法
- 详解Python操作RabbitMQ服务器消息队列的远程结果返回
- python使用rabbitmq实现网络爬虫示例
- Python操作RabbitMQ服务器实现消息队列的路由功能
- HAZELCAST 客户端命令 可用于简单调试
- Linux下PHP扩展amqp安装
- EJB3.0 JBoss的JMS实例