RabbitMQ如何应对Server出现异常以及server分发任务的负载均衡问题
2015-10-25 00:00
495 查看
摘要: 这篇文章主要介绍在RabbitMQServer出错时,应该如何避免task丢失
上一篇文章结尾我留了一个问题,就是在work2中设no_ack=True
出现那样的结果是因为server一次分发多个结果给work2,work2又在执行完了以后没有发送ack确认执行结束,server端是根据connection是否存在来判断work2是否stoped,由于连接存在,所以server认为work2正在执行,其实此时work2已经执行结束,正在等待server发送消息。。。
一:
task是被加载到内存中的,要避免server崩溃导致的task丢失,当然想到的办法就是持久化,将task保存到硬盘上
在定义Queue的时候,设置属性durable=True
与此同时需要告知生产者,queue是被持久化在硬盘上的
二:
如何解决server一次分发多个任务的情况,也可以通过设置属性
new_task.py:
work代码:
work2.py
3运行结果
可以看出server对任务的分发已经变得随机,而不是原来的依次分发
上一篇文章结尾我留了一个问题,就是在work2中设no_ack=True
出现那样的结果是因为server一次分发多个结果给work2,work2又在执行完了以后没有发送ack确认执行结束,server端是根据connection是否存在来判断work2是否stoped,由于连接存在,所以server认为work2正在执行,其实此时work2已经执行结束,正在等待server发送消息。。。
一:
task是被加载到内存中的,要避免server崩溃导致的task丢失,当然想到的办法就是持久化,将task保存到硬盘上
在定义Queue的时候,设置属性durable=True
channel.queue_declare(=,=)
与此同时需要告知生产者,queue是被持久化在硬盘上的
channel.basic_publish(exchange='', routing_key="task_queue", body=message, properties=pika.BasicProperties( delivery_mode = 2, # make message persistent ))
二:
如何解决server一次分发多个任务的情况,也可以通过设置属性
channel.basic_qos(prefetch_count=1)
new_task.py:
# -*- coding: UTF-8 -*- import pika if __name__ == '__main__': connection = pika.BlockingConnection(pika.ConnectionParameters("localhost")) channel = connection.channel() channel.queue_declare(queue="Kadima", durable=True) message = "You are awsome!" for i in range(0, 100): # 循环100次发送消息 channel.basic_publish(exchange="", routing_key='Kadima', body=message + " " + str(i), properties=pika.BasicProperties(delivery_mode=2)) print "sending ", message
work代码:
#-*- coding: UTF-8 -*- import time import pika import sys __author__ = 'Yue' var=0 def callback(ch, method, properties, body): # <pika.adapters.blocking_connection.BlockingChannel object at 0x02973BF0> <Basic. # Deliver(['consumer_tag=ctag1.8b367697d96c4579ba78914d8a4760a8', 'delivery_tag=50 # ', 'exchange=', 'redelivered=False', 'routing_key=Kadima'])> <BasicProperties> Y # ou are awsome! 98 # temp=var+1 #这里有趣的是不能写成var+=1或者var=var+1,要知道为什么,就需要清楚“Python全局变量和局部变量” # global var # var+=1 # if var==20: # print var , body # sys.exit() print "1 received %r" % (body,) # time.sleep(0.1) print "Done" #设置返回ack的标志,method.delivery_tag是MQ分发给Work时的一个标记 ch.basic_ack(delivery_tag = method.delivery_tag) if __name__ == '__main__': connection=pika.BlockingConnection(pika.ConnectionParameters("localhost")) channel=connection.channel() channel.basic_qos(prefetch_count=1) channel.queue_declare(queue="Kadima",durable=True) channel.basic_consume(callback,queue="Kadima") print ' [1] Waiting for messages' channel.start_consuming()
work2.py
import time import pika __author__ = 'Yue' def callback(ch, method, properties, body): print "2 received %r" % (body,) # time.sleep(0.1) print "Done" ch.basic_ack(delivery_tag = method.delivery_tag) if __name__ == '__main__': connection=pika.BlockingConnection(pika.ConnectionParameters("localhost")) channel=connection.channel() channel.basic_qos(prefetch_count=1) channel.queue_declare(queue="Kadima",durable=True) channel.basic_consume(callback,queue="Kadima") print ' [2] Waiting for messages' channel.start_consuming()
3运行结果
可以看出server对任务的分发已经变得随机,而不是原来的依次分发
相关文章推荐
- RabbitMq中的exchange是什么
- 编写不受魔术引号影响的php应用
- 面向对象类-类
- 正则表达式
- Mint17/Ubuntu14.04 samba文件服务器搭建
- 解决PhpStorm不能自动提示父类的方法的问题
- HTTP的请求类型GET,PUT,POST
- MyBatis整合Spring的实现(2)
- MyBatis整合Spring的实现(3)
- MyBatis整合Spring的实现(4)
- MyBatis整合Spring的实现(5)
- MyBatis整合Spring的实现(6)
- Swift引入Header.h文件
- OC17内存管理和自动引用计数
- JAVA动态代理的理解
- 创业的第五十七天
- JavaScript相等操作符
- JavaScript 控制结构
- Ember.js 入门指南——自定义序列化器
- CoreData 持久化数据存储的注意点