您的位置:首页 > 编程语言 > Python开发

python使用amqp的例子

2012-06-11 12:37 621 查看
web程序需要给IPHONE手机推送消息,移动端同事写好了一个LIB,但是这个LIB使用的是阻塞IO,在APPLE服务器返回前程序是阻塞的,他们用了多线程来解决这个问题,但是webpy运行在apache里,无法进行线程管理,所以就迫切需要一个异步的机制来解决这个问题,希望做到需要发送消息时。调用一个函数,把数据扔进管道或者消息队列后就立即返回,也不管数据是否真的出去了,它相当于生产者,再有一个程序从管道或者队列中读取,进行实际的发送,相当于消费者

但是python似乎只封装了IPC(UNIX/LINUX平台进程通信规范)的匿名管道,命名管道的API,并无消息队列,而且管道似乎不易操作,因此想使用第三方的消息队列工具,那么我老大进行了技术选型,最终选择了amqp,这个消息队列组件使用erlang编写,启动了一个socket服务器,程序通过socket进行入队和出队的操作,不过这个组件应该是提供了大量的库隐藏了通信的部分,使代码看起来就像调用函数进行队列操作

client.py 测试客户端 向队列写入消息

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#client.py

import sys
import time

import json
from amqplib import client_0_8 as amqp

conn = amqp.Connection(
host="localhost:5672",
userid="guest",
password="guest",
virtual_host="/",
insist=False)
chan = conn.channel()

i = 0
while 1:
#msg = amqp.Message('Message %d' % i)

#笔记更新
'''
data = {
"noteId" : 1,
"recvUserId" : 2,
"title" : "test",
"updateUserName":"你好",
"remindCount":10,
"projectName":"11",
"token":"b258f5e3809017e371009f32eba7d72fcf51165406ce011951811b53db15b414",
"isPushed":0,
"messageType":"NoteUpdated",
}
'''

'''
data = {
"projectId": 1,
"recvUserId": 2,
"reqUserName": "testUser",
"reqUserStatus": "pending",
"remindCount": 10,
"projectName":"11",
"token":"b258f5e3809017e371009f32eba7d72fcf51165406ce011951811b53db15b414",
"messageType":"Apply",
}
'''

data = {
"projectId": 1,
"recvUserId": 2,
"reqUserName": "testUser",
"reqUserStatus": "",
"remindCount": 10,
"projectName": "11",
"token":"b258f5e3809017e371009f32eba7d72fcf51165406ce011951811b53db15b414",
"messageType":"Review",
}

s = json.dumps(data,ensure_ascii=False)
print s
msg = amqp.Message(s)

msg.properties["delivery_mode"] = 2

chan.basic_publish(msg,
exchange="sorting_room",
routing_key="testkey")
i += 1
time.sleep(1)
break

chan.close()
conn.close()


server.py 读取队列

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from amqplib import client_0_8 as amqp
import process

conn = amqp.Connection(
host="localhost:5672",
userid="guest",
password="guest",
virtual_host="/",
insist=False)
chan = conn.channel()

chan.queue_declare(
queue="po_box",
durable=True,
exclusive=False,
auto_delete=False)
chan.exchange_declare(
exchange="sorting_room",
type="direct",
durable=True,
auto_delete=False,)

chan.queue_bind(
queue="po_box",
exchange="sorting_room",
routing_key="testkey")

def recv_callback(msg):
#TODO
#print msg.body
process.starup(msg.body)

chan.basic_consume(
queue='po_box',
no_ack=True,
callback=recv_callback,
consumer_tag="testtag")

while True:
chan.wait()

#chan.basic_cancel("testtag")
#chan.close()
#conn.close()


process.py 业务逻辑模块

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import json
import time
import thread
from APNSWrapper import *

#---------------------------------------------------------------------------------------------
'''从外部的数据转换成为标准数据'''
def convert(args):
print args
data = json.loads(args)
return data

'''线程入口'''
def threadMain(args):
data = convert(args)
msg = messagesFactory(data)
msg.fillMessages(data)
msg.push()

#---------------------------------------------------------------------------------------------
'''业务入口'''
def startup(args):
thread.start_new_thread(threadMain,(args,))

#---------------------------------------------------------------------------------------------
'''消息工厂'''
def messagesFactory(data):
if data["messageType"] == "NoteUpdated":
return NoteUpdateMessages()

if data["messageType"] == "Apply":
return ApplyMessages()

if data["messageType"] == "Review":
return ReviewMessages()

#---------------------------------------------------------------------------------------------
'''消息基类'''
class Messages:
def __init__(self):
self.Messages = []

def makeMessage(self,data):
pass

def fillMessages(self,data):
message = self.makeMessage(data)
self.Messages.append(message)

def push(self):
try:
wrapper = APNSNotificationWrapper('/etc/ck.pem', True)

for message in self.Messages:
wrapper.append(message)

wrapper.notify()
print 'send success'
except Exception, e:
raise

#---------------------------------------------------------------------------------------------
'''笔记更新消息类'''
class NoteUpdateMessages(Messages):
def makeMessage(self,data):
token = data["token"]
deviceToken = token.decode('hex')
message = APNSNotification()
message.token(deviceToken)
for key in ['updateUserName','title']:
if type(data[key]) is unicode:
data[key] = data[key].encode('utf8')
alert = '共%d篇更新 %s编辑了“%s”' % (int(data["remindCount"]), data["updateUserName"], data["title"])

if data["isPushed"] == 0:
message.alert(alert)
message.badge(int(data["remindCount"]))
message.sound('qing.caf')

property1 = APNSProperty("NOTE_ID", int(data["noteId"]) )
message.appendProperty(property1)
property2 = APNSProperty("USER_ID", int(data["recvUserId"]) )
message.appendProperty(property2)
property3 = APNSProperty("REMIND_TIME", int(time.time()) )
message.appendProperty(property3)
message.appendProperty(APNSProperty("REMIND_TYPE", 1 ))
return message

#---------------------------------------------------------------------------------------------
'''申请加入项目消息类'''
class ApplyMessages(Messages):
def makeMessage(self,data):
token = data["token"]
deviceToken = token.decode('hex')
message = APNSNotification()
message.token(deviceToken)
alertMsg = ''
status = data["reqUserStatus"]
if status == 'pending':
alertMsg = '%s申请加入%s群组,请审批' % (data["reqUserName"].encode('utf-8'), data["projectName"].encode('utf-8'),)
elif status == 'active':
alertMsg = '%s已加入%s群组' % (data["reqUserName"].encode('utf-8'), data["projectName"].encode('utf-8'),)
elif status == 'deny':
alertMsg = '%s被拒绝加入%s群组' % (data["reqUserName"].encode('utf-8'), data["projectName"].encode('utf-8'),)
elif status == 'removed':
alertMsg = '%s已从%s群组移除' % (data["reqUserName"].encode('utf-8'), data["projectName"].encode('utf-8'),)

message.alert(alertMsg)
message.badge(int(data["remindCount"]))
message.sound('qing.caf')

message.appendProperty(APNSProperty("PROJECT_ID", int(data["projectId"]) ))
message.appendProperty(APNSProperty("USER_ID", int(data["recvUserId"]) ))
message.appendProperty(APNSProperty("REMIND_TIME", int(time.time()) ))
message.appendProperty(APNSProperty("REMIND_TYPE", 3 ))
message.appendProperty(APNSProperty("REMIND_MSG", alertMsg ))
return message

#---------------------------------------------------------------------------------------------
'''审核结果消息类'''
class ReviewMessages(Messages):
def makeMessage(self,data):
token = data["token"]
deviceToken = token.decode('hex')

message = APNSNotification()
message.token(deviceToken)

alertMsg = ''
status = data["reqUserStatus"]
if status == 'active':
alertMsg = '您已加入%s群组' % (data["projectName"].encode('utf-8'),)
elif status == 'deny':
alertMsg = '您被拒绝加入%s群组' % (data["projectName"].encode('utf-8'),)
elif status == 'removed':
alertMsg = '您已从%s群组移除' % (data["projectName"].encode('utf-8'),)

message.alert(alertMsg)
message.badge(int(data["remindCount"]))
message.sound('qing.caf')

message.appendProperty(APNSProperty("PROJECT_ID", int(data["projectId"]) ))
message.appendProperty(APNSProperty("USER_ID", int(data["recvUserId"]) ))
message.appendProperty(APNSProperty("REMIND_TIME", int(time.time()) ))
message.appendProperty(APNSProperty("REMIND_TYPE", 2 ))
message.appendProperty(APNSProperty("REMIND_MSG", alertMsg ))
return message
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: