您的位置:首页 > 数据库 > Redis

基于Redis的消息订阅/发布

2016-09-08 12:16 585 查看
在工业生产设计中,我们往往需要实现一个基于消息订阅的模式,用来对非定时的的消息进行监听订阅。

这种设计模式在总线设计模式中得到体现。微软以前的WCF中实现了服务总线ServiceBus的设计模式。然并卵。WCF已经好像是上个世纪的产物................

基于事件订阅的模式,比如EventBus类的组件产品。但是往往设计比较复杂。

如果依赖于Redis做事件消息推送。那就大大简化了这种设计模式,而且性能也比较客观。

Redis在2.0之后的版本中实现了事件推送的pub/sub命令

PSUBSCRIBE订阅一个或多个符合给定模式的频道
PUBLISH将信息message发送到指定的频道channel
PUBSUB是一个查看订阅与发布系统状态的内省命令
PUBSUBCHANNELSpattern列出当前的活跃频道
PUBSUBNUMSUBchannel-1channel-N返回给定频道的订阅者数量
PUBSUBNUMPAT返回订阅模式的数量
PUNSUBSCRIBE指示客户端退订所有给定模式
SUBSCRIBE订阅给定的一个或多个频道的信息
UNSUBSCRIBE指示客户端退订给定的频道

P开头的(pattern)支持通配符模式。

简单例子(来自:http://www.yiibai.com/redis/redis_pub_sub.html)

以下举例说明如何发布用户的概念工作。在下面的例子给出一个客户端订阅一个通道名为redisChat

redis127.0.0.1:6379>SUBSCRIBEredisChat

Readingmessages...(pressCtrl-Ctoquit)
1)"subscribe"
2)"redisChat"
3)(integer)1

现在,两个客户端都发布在同一个通道名redisChat消息及以上的订阅客户端接收消息。

redis127.0.0.1:6379>PUBLISHredisChat"Redisisagreatcachingtechnique"

(integer)1

redis127.0.0.1:6379>PUBLISHredisChat"Learnredisbytutorialspoint"

(integer)1

1)"message"
2)"redisChat"
3)"Redisisagreatcachingtechnique"
1)"message"
2)"redisChat"
3)"Learnredisbytutorialspoint"

上面的代码简单的演示了,订阅信道,向指定的信道发布消息。然后消息推送到订阅者。

在redis-cli客户端中推送消息的时候,返回成功发送到订阅者的数目。

如:(integer)1

原理:

RedisServer包含两个重要的结构:
1.channels:实际上就是一个key-value的Map结构,key为订阅地频道,value为Client的List
2.patterns:存放模式+client地址的列表




流程:从pubsub_channels中找出跟publish中channel相符的clients-list,然后再去pubsub_patterns中找出每一个相符的pattern和client。向这些客户端发送publish的消息。

订阅信道



消息推送



在C#中的实现

基于

ServiceStack.Redis

Task.Factory.StartNew(()=>
{
varclient=newRedisClient("192.168.1.100",6379,"你的密码");
try
{
varisOpen=client.Ping();
if(isOpen==false)
{
return;
}
varsub1=client.CreateSubscription();

//接受消息的委托
sub1.OnMessage=(chanel,message)=>
{
Console.WriteLine("chanelis:{0}",chanel);
Console.WriteLine("messageis:{0}",message);
};
sub1.SubscribeToChannels(newstring[]{"testchat"});//注意:订阅信道的时候会开启阻塞模式,所以,需要将监听放到单独的线程里
}
catch(Exception)
{

throw;
}

});


注意:在程序终止或者类的实例被销毁的时候,请将订阅者实例注销掉,否则,在redis中一直存在这个订阅者。

1使用idispose显示释放

2使用析构函数CLR回收的时候释放

sub1.Dispose();

例如:

publicvoidDispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}

protectedvirtualvoidDispose(booldisposing)
{
if(disposing)
{
if(redisClient!=null)
redisClient.Dispose();
if(subscription!=null)
subscription.Dispose();
if(log!=null)
log.Dispose();
}
}

~RedisEx()
{
Dispose(false);
}


官方推荐这种写法

varclientsManager=newPooledRedisClientManager(newstring[]{"密码@192.168.1.200:6379"});
varredisPubSub=newRedisPubSubServer(clientsManager,newstring[]{"testchat"})
{
OnMessage=(channel,msg)=>{
Console.WriteLine("方式2订阅演示.............");
Console.WriteLine("Received'{0}'from'{1}'",msg,channel);
}
}.Start();


Touse
RedisPubSubServer
,initializeitwiththechannelsyouwanttosubscribetoandassignhandlersforeachoftheeventsyouwanttohandle.

Ataminimumyou'llwanttohandle
OnMessage
:

Calling
Start()
afterit'sinitializedwillgetittostartlisteningandprocessinganymessagespublishedtothesubscribedchannels.

官方文档:https://github.com/ServiceStack/ServiceStack.Redis

附加文档

redisPUB/SUB(发布/订阅)

PSUBSCRIBE订阅一个或多个符合给定模式的频道

PUBLISH将信息message发送到指定的频道channel

PUBSUB是一个查看订阅与发布系统状态的内省命令

PUBSUBCHANNELSpattern列出当前的活跃频道

PUBSUBNUMSUBchannel-1channel-N返回给定频道的订阅者数量

PUBSUBNUMPAT返回订阅模式的数量

PUNSUBSCRIBE指示客户端退订所有给定模式

SUBSCRIBE订阅给定的一个或多个频道的信息

UNSUBSCRIBE指示客户端退订给定的频道

PSUBSCRIBE(订阅一个或多个符合给定模式的频道)

PSUBSCRIBEpattern[pattern…]
订阅一个或多个符合给定模式的频道。
每个模式以*作为匹配符,比如it*匹配所有以it开头的频道(it.news、it.blog、it.tweets等等),
news.*匹配所有以news.开头的频道(news.it、news.global.today等等),诸如此类。
可用版本:>=2.0.0
时间复杂度:O(N),N是订阅的模式的数量。
返回值:接收到的信息(请参见下面的代码说明)。

redis>psubscribenews.*
Readingmessages...(pressCtrl-Ctoquit)
1)"psubscribe"#返回值的类型:显示订阅成功
2)"news.*"#订阅的模式
3)(integer)1#目前已订阅的模式的数量

1)"pmessage"#返回值的类型:信息
2)"news.*"#信息匹配的模式
3)"news.it"#信息本身的目标频道
4)"GooglebuyMotorola"#信息的内容


PUBLISH(将信息message发送到指定的频道channel)

PUBLISHchannelmessage
将信息message发送到指定的频道channel。
可用版本:>=2.0.0
时间复杂度:O(N+M),其中N是频道channel的订阅者数量,而M则是使用模式订阅(subscribed
patterns)的客户端的数量。
返回值:接收到信息message的订阅者数量。

#对没有订阅者的频道发送信息
redis>publishbad_channel"cananybodyhearme?"
(integer)0
#向有一个订阅者的频道发送信息
redis>publishmsg"goodmorning"
(integer)1
#向有多个订阅者的频道发送信息
redis>publishchat_room"hello~everyone"
(integer)3


PUBSUB(是一个查看订阅与发布系统状态的内省命令)

PUBSUB[argument[argument…]]
PUBSUB是一个查看订阅与发布系统状态的内省命令,它由数个不同格式的子命令组成,以下将分别对这
些子命令进行介绍。
可用版本:>=2.8.0

PUBSUBCHANNELS[pattern](列出当前的活跃频道)

列出当前的活跃频道。
活跃频道指的是那些至少有一个订阅者的频道,订阅模式的客户端不计算在内。
pattern参数是可选的:
•如果不给出pattern参数,那么列出订阅与发布系统中的所有活跃频道。
•如果给出pattern参数,那么只列出和给定模式pattern相匹配的那些活跃频道。
复杂度:O(N),N为活跃频道的数量(对于长度较短的频道和模式来说,将进行模式匹配的复杂度视为常
数)。
返回值:一个由活跃频道组成的列表。

#client-1订阅news.it和news.sport两个频道
client-1>SUBSCRIBEnews.itnews.sport
Readingmessages...(pressCtrl-Ctoquit)
1)"subscribe"
2)"news.it"
3)(integer)1
1)"subscribe"
2)"news.sport"
3)(integer)2
#client-2订阅news.it和news.internet两个频道
client-2>SUBSCRIBEnews.itnews.internet
Readingmessages...(pressCtrl-Ctoquit)
1)"subscribe"
2)"news.it"
3)(integer)1
1)"subscribe"
2)"news.internet"
3)(integer)2
#首先,client-3打印所有活跃频道
#注意,即使一个频道有多个订阅者,它也只输出一次,比如news.it
client-3>PUBSUBCHANNELS
1)"news.sport"
2)"news.internet"
3)"news.it"
#接下来,client-3打印那些与模式news.i*相匹配的活跃频道
#因为news.sport不匹配news.i*,所以它没有被打印
redis>PUBSUBCHANNELSnews.i*
1)"news.internet"
2)"news.it"


PUBSUBNUMSUB[channel-1…channel-N](返回给定频道的订阅者数量)

返回给定频道的订阅者数量,订阅模式的客户端不计算在内。
复杂度:O(N),N为给定频道的数量。
返回值:一个多条批量回复(Multi-bulkreply),回复中包含给定的频道,以及频道的订阅者数量。格式为:频道channel-1,channel-1的订阅者数量,频道channel-2,channel-2的订阅者数量,诸如此类。
回复中频道的排列顺序和执行命令时给定频道的排列顺序一致。不给定任何频道而直接调用这个命令也是可以的,在这种情况下,命令只返回一个空列表。

#client-1订阅news.it和news.sport两个频道
client-1>SUBSCRIBEnews.itnews.sport
Readingmessages...(pressCtrl-Ctoquit)
1)"subscribe"
2)"news.it"
3)(integer)1
1)"subscribe"
2)"news.sport"
3)(integer)2
#client-2订阅news.it和news.internet两个频道
client-2>SUBSCRIBEnews.itnews.internet
Readingmessages...(pressCtrl-Ctoquit)
1)"subscribe"
2)"news.it"
3)(integer)1
1)"subscribe"
2)"news.internet"
3)(integer)2
#client-3打印各个频道的订阅者数量
client-3>PUBSUBNUMSUBnews.itnews.internetnews.sportnews.music
1)"news.it"#频道
2)"2"#订阅该频道的客户端数量
3)"news.internet"
4)"1"
5)"news.sport"
6)"1"
7)"news.music"#没有任何订阅者
8)"0"


PUBSUBNUMPAT(返回订阅模式的数量)

注意,这个命令返回的不是订阅模式的客户端的数量,而是客户端订阅的所有模式的数量总和。
复杂度:O(1)。
返回值:一个整数回复(Integerreply)。

#client-1订阅news.*和discount.*两个模式
client-1>PSUBSCRIBEnews.*discount.*
Readingmessages...(pressCtrl-Ctoquit)
1)"psubscribe"
2)"news.*"
3)(integer)1
1)"psubscribe"
2)"discount.*"
3)(integer)2
#client-2订阅tweet.*一个模式
client-2>PSUBSCRIBEtweet.*
Readingmessages...(pressCtrl-Ctoquit)
1)"psubscribe"
2)"tweet.*"
3)(integer)1
#client-3返回当前订阅模式的数量为3
client-3>PUBSUBNUMPAT
(integer)3
#注意,当有多个客户端订阅相同的模式时,相同的订阅也被计算在PUBSUBNUMPAT之内
#比如说,再新建一个客户端client-4,让它也订阅news.*频道
client-4>PSUBSCRIBEnews.*
Readingmessages...(pressCtrl-Ctoquit)
1)"psubscribe"
2)"news.*"
3)(integer)1
#这时再计算被订阅模式的数量,就会得到数量为4
client-3>PUBSUBNUMPAT
(integer)4


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

PUNSUBSCRIBE(指示客户端退订所有给定模式)

PUNSUBSCRIBE[pattern[pattern…]]
指示客户端退订所有给定模式。
如果没有模式被指定,也即是,一个无参数的PUNSUBSCRIBE调用被执行,那么客户端使用PSUBSCRIBE
命令订阅的所有模式都会被退订。在这种情况下,命令会返回一个信息,告知客户端所有被退订的模式。
可用版本:>=2.0.0
时间复杂度:O(N+M),其中N是客户端已订阅的模式的数量,M则是系统中所有客户端订阅的模式的数量。
返回值:这个命令在不同的客户端中有不同的表现。

SUBSCRIBE(订阅给定的一个或多个频道的信息)

订阅给定的一个或多个频道的信息。
可用版本:>=2.0.0
时间复杂度:O(N),其中N是订阅的频道的数量。
返回值:接收到的信息(请参见下面的代码说明)。

#订阅msg和chat_room两个频道
#1-6行是执行subscribe之后的反馈信息
#第7-9行才是接收到的第一条信息
#第10-12行是第二条
redis>subscribemsgchat_room
Readingmessages...(pressCtrl-Ctoquit)
1)"subscribe"#返回值的类型:显示订阅成功
2)"msg"#订阅的频道名字
3)(integer)1#目前已订阅的频道数量
1)"subscribe"
2)"chat_room"
3)(integer)2
1)"message"#返回值的类型:信息
2)"msg"#来源(从那个频道发送过来)
3)"hellomoto"#信息内容
1)"message"
2)"chat_room"
3)"testing...haha"


UNSUBSCRIBE(指示客户端退订给定的频道)

UNSUBSCRIBE[channel[channel…]]
指示客户端退订给定的频道。
如果没有频道被指定,也即是,一个无参数的UNSUBSCRIBE调用被执行,那么客户端使用SUBSCRIBE命令订阅的所有频道都会被退订。在这种情况下,命令会返回一个信息,告知客户端所有被退订的频道。
可用版本:>=2.0.0
时间复杂度:O(N),N是客户端已订阅的频道的数量。
返回值:这个命令在不同的客户端中有不同的表现。

参考文档:
http://blog.csdn.net/u011506468/article/details/47337839http://my.oschina.net/itblog/blog/601284http://www.oschina.net/code/snippet_584165_52231http://redis.io/topics/pubsub
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: