您的位置:首页 > 数据库 > Redis

redis(4)、基于redis 构建异步消息系统

2016-07-15 00:00 489 查看
一般消息队列有两种场景

生产者消费者模式 :多个生产者生产消息放在消息队列里,多个消费者同时监听消息队列,谁先抢到消息,谁先处理。每个消息只能被消费一次。

发布者订阅者模式:发布者发布消息到消息队列里,多个监听者同时监听该消息队列,都会同时收到同一份消息。即每个消息被每个监听者消费一次。

一、构建生产者消费者模式

(1)构建生产者消费者模式,可以使用list去实现

主要使用LPUSH或者RPUSH插入数据,使用LPOP或者RPOP取出数据并且删除。



生产者伪代码为:

string key = "like_list";
string msg = "user_1";
redis.lpush(key,msg);

消费者伪代码为:

string key = "like_list";
while(true){
string msg = redis.brpop(BLOCK_TIMEOUT, key );
if (msg == null) continue;
processMsg(msg );
}

(2)构建具有优先级的生产者消费者模式,使用sorted sets实现

使用ZADD插入带有SCORE的数据,也就是优先级参数。使用ZREVRANGE 取出数据,并且zrem删除数据,只有在取出成功,并且删除成功的情况下,认为该消息被正确取出。

sorted sets 没有像list中的pop命令一样,取出并删除。

二、构建发布者订阅者模式

使用PUBSUB构建发布者订阅者模式

PUBLISH channel message 往channel中发布消息

SUBSCRIBE channel [channel ...] 客户端订阅某个channgel,可以同时订阅多个channgel

UNSUBSCRIBE [channel [channel ...]] 客户端取消订阅channel

发布者伪代码:

string channel = "order_mq";
string msg = "user_1_1001";
redis.publish(channel,msg);


订阅者伪代码:

string channel = "order_mq";
redis.subscribe(channel);
while(true){
redis.onmessage(new MessageListsner(){
public void onMessage(string channel,string msg){
doProcessMsg(msg);
}
});
}


三、结束语

(1)、基于redis实现的消息系统,不同于activeMQ或者其他同类的消息中间件产品,没有提供持久化等一些特性,所以消息一旦丢失,就不能重现。所在监听者要启动早于生产者。

(2)、基于redis实现的消息系统,也没有提供配置多个任务去同时处理消息,在程序中可以采用线程池,多线程的方式去处理消息。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  redis pubsub 消息系统