基于Redis的消息订阅/发布
2016-09-08 12:16
585 查看
在工业生产设计中,我们往往需要实现一个基于消息订阅的模式,用来对非定时的的消息进行监听订阅。
这种设计模式在总线设计模式中得到体现。微软以前的WCF中实现了服务总线ServiceBus的设计模式。然并卵。WCF已经好像是上个世纪的产物................
基于事件订阅的模式,比如EventBus类的组件产品。但是往往设计比较复杂。
如果依赖于Redis做事件消息推送。那就大大简化了这种设计模式,而且性能也比较客观。
Redis在2.0之后的版本中实现了事件推送的pub/sub命令
PSUBSCRIBE订阅一个或多个符合给定模式的频道
PUBLISH将信息message发送到指定的频道channel
PUBSUB是一个查看订阅与发布系统状态的内省命令
PUBSUBCHANNELSpattern列出当前的活跃频道
PUBSUBNUMSUBchannel-1channel-N返回给定频道的订阅者数量
PUBSUBNUMPAT返回订阅模式的数量
PUNSUBSCRIBE指示客户端退订所有给定模式
SUBSCRIBE订阅给定的一个或多个频道的信息
UNSUBSCRIBE指示客户端退订给定的频道
P开头的(pattern)支持通配符模式。
现在,两个客户端都发布在同一个通道名redisChat消息及以上的订阅客户端接收消息。
RedisServer包含两个重要的结构:
1.channels:实际上就是一个key-value的Map结构,key为订阅地频道,value为Client的List
2.patterns:存放模式+client地址的列表
流程:从pubsub_channels中找出跟publish中channel相符的clients-list,然后再去pubsub_patterns中找出每一个相符的pattern和client。向这些客户端发送publish的消息。
订阅信道
消息推送
在C#中的实现
基于
注意:在程序终止或者类的实例被销毁的时候,请将订阅者实例注销掉,否则,在redis中一直存在这个订阅者。
1使用idispose显示释放
2使用析构函数CLR回收的时候释放
sub1.Dispose();
例如:
官方推荐这种写法
Touse
Ataminimumyou'llwanttohandle
Calling
官方文档:https://github.com/ServiceStack/ServiceStack.Redis
附加文档
PSUBSCRIBE订阅一个或多个符合给定模式的频道
PUBLISH将信息message发送到指定的频道channel
PUBSUB是一个查看订阅与发布系统状态的内省命令
PUBSUBCHANNELSpattern列出当前的活跃频道
PUBSUBNUMSUBchannel-1channel-N返回给定频道的订阅者数量
PUBSUBNUMPAT返回订阅模式的数量
PUNSUBSCRIBE指示客户端退订所有给定模式
SUBSCRIBE订阅给定的一个或多个频道的信息
UNSUBSCRIBE指示客户端退订给定的频道
订阅一个或多个符合给定模式的频道。
每个模式以*作为匹配符,比如it*匹配所有以it开头的频道(it.news、it.blog、it.tweets等等),
news.*匹配所有以news.开头的频道(news.it、news.global.today等等),诸如此类。
可用版本:>=2.0.0
时间复杂度:O(N),N是订阅的模式的数量。
返回值:接收到的信息(请参见下面的代码说明)。
将信息message发送到指定的频道channel。
可用版本:>=2.0.0
时间复杂度:O(N+M),其中N是频道channel的订阅者数量,而M则是使用模式订阅(subscribed
patterns)的客户端的数量。
返回值:接收到信息message的订阅者数量。
PUBSUB是一个查看订阅与发布系统状态的内省命令,它由数个不同格式的子命令组成,以下将分别对这
些子命令进行介绍。
可用版本:>=2.8.0
活跃频道指的是那些至少有一个订阅者的频道,订阅模式的客户端不计算在内。
pattern参数是可选的:
•如果不给出pattern参数,那么列出订阅与发布系统中的所有活跃频道。
•如果给出pattern参数,那么只列出和给定模式pattern相匹配的那些活跃频道。
复杂度:O(N),N为活跃频道的数量(对于长度较短的频道和模式来说,将进行模式匹配的复杂度视为常
数)。
返回值:一个由活跃频道组成的列表。
复杂度:O(N),N为给定频道的数量。
返回值:一个多条批量回复(Multi-bulkreply),回复中包含给定的频道,以及频道的订阅者数量。格式为:频道channel-1,channel-1的订阅者数量,频道channel-2,channel-2的订阅者数量,诸如此类。
回复中频道的排列顺序和执行命令时给定频道的排列顺序一致。不给定任何频道而直接调用这个命令也是可以的,在这种情况下,命令只返回一个空列表。
复杂度:O(1)。
返回值:一个整数回复(Integerreply)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
指示客户端退订所有给定模式。
如果没有模式被指定,也即是,一个无参数的PUNSUBSCRIBE调用被执行,那么客户端使用PSUBSCRIBE
命令订阅的所有模式都会被退订。在这种情况下,命令会返回一个信息,告知客户端所有被退订的模式。
可用版本:>=2.0.0
时间复杂度:O(N+M),其中N是客户端已订阅的模式的数量,M则是系统中所有客户端订阅的模式的数量。
返回值:这个命令在不同的客户端中有不同的表现。
可用版本:>=2.0.0
时间复杂度:O(N),其中N是订阅的频道的数量。
返回值:接收到的信息(请参见下面的代码说明)。
指示客户端退订给定的频道。
如果没有频道被指定,也即是,一个无参数的UNSUBSCRIBE调用被执行,那么客户端使用SUBSCRIBE命令订阅的所有频道都会被退订。在这种情况下,命令会返回一个信息,告知客户端所有被退订的频道。
可用版本:>=2.0.0
时间复杂度:O(N),N是客户端已订阅的频道的数量。
返回值:这个命令在不同的客户端中有不同的表现。
参考文档:
http://blog.csdn.net/u011506468/article/details/47337839http://my.oschina.net/itblog/blog/601284http://www.oschina.net/code/snippet_584165_52231http://redis.io/topics/pubsub
这种设计模式在总线设计模式中得到体现。微软以前的WCF中实现了服务总线ServiceBus的设计模式。然并卵。WCF已经好像是上个世纪的产物................
基于事件订阅的模式,比如EventBus类的组件产品。但是往往设计比较复杂。
如果依赖于Redis做事件消息推送。那就大大简化了这种设计模式,而且性能也比较客观。
Redis在2.0之后的版本中实现了事件推送的pub/sub命令
PSUBSCRIBE订阅一个或多个符合给定模式的频道
PUBLISH将信息message发送到指定的频道channel
PUBSUB是一个查看订阅与发布系统状态的内省命令
PUBSUBCHANNELSpattern列出当前的活跃频道
PUBSUBNUMSUBchannel-1channel-N返回给定频道的订阅者数量
PUBSUBNUMPAT返回订阅模式的数量
PUNSUBSCRIBE指示客户端退订所有给定模式
SUBSCRIBE订阅给定的一个或多个频道的信息
UNSUBSCRIBE指示客户端退订给定的频道
P开头的(pattern)支持通配符模式。
简单例子(来自:http://www.yiibai.com/redis/redis_pub_sub.html)
以下举例说明如何发布用户的概念工作。在下面的例子给出一个客户端订阅一个通道名为redisChatredis127.0.0.1:6379>SUBSCRIBEredisChat Readingmessages...(pressCtrl-Ctoquit) 1)"subscribe" 2)"redisChat" 3)(integer)1
现在,两个客户端都发布在同一个通道名redisChat消息及以上的订阅客户端接收消息。
redis127.0.0.1:6379>PUBLISHredisChat"Redisisagreatcachingtechnique" (integer)1 redis127.0.0.1:6379>PUBLISHredisChat"Learnredisbytutorialspoint" (integer)1 1)"message" 2)"redisChat" 3)"Redisisagreatcachingtechnique" 1)"message" 2)"redisChat" 3)"Learnredisbytutorialspoint"
上面的代码简单的演示了,订阅信道,向指定的信道发布消息。然后消息推送到订阅者。
在redis-cli客户端中推送消息的时候,返回成功发送到订阅者的数目。如:(integer)1 原理:
RedisServer包含两个重要的结构:
1.channels:实际上就是一个key-value的Map结构,key为订阅地频道,value为Client的List
2.patterns:存放模式+client地址的列表
流程:从pubsub_channels中找出跟publish中channel相符的clients-list,然后再去pubsub_patterns中找出每一个相符的pattern和client。向这些客户端发送publish的消息。
订阅信道
消息推送
在C#中的实现
基于
ServiceStack.Redis
Task.Factory.StartNew(()=>
{
varclient=newRedisClient("192.168.1.100",6379,"你的密码");
try
{
varisOpen=client.Ping();
if(isOpen==false)
{
return;
}
varsub1=client.CreateSubscription();
//接受消息的委托
sub1.OnMessage=(chanel,message)=>
{
Console.WriteLine("chanelis:{0}",chanel);
Console.WriteLine("messageis:{0}",message);
};
sub1.SubscribeToChannels(newstring[]{"testchat"});//注意:订阅信道的时候会开启阻塞模式,所以,需要将监听放到单独的线程里
}
catch(Exception)
{
throw;
}
});
注意:在程序终止或者类的实例被销毁的时候,请将订阅者实例注销掉,否则,在redis中一直存在这个订阅者。
1使用idispose显示释放
2使用析构函数CLR回收的时候释放
sub1.Dispose();
例如:
publicvoidDispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
protectedvirtualvoidDispose(booldisposing)
{
if(disposing)
{
if(redisClient!=null)
redisClient.Dispose();
if(subscription!=null)
subscription.Dispose();
if(log!=null)
log.Dispose();
}
}
~RedisEx()
{
Dispose(false);
}
官方推荐这种写法
varclientsManager=newPooledRedisClientManager(newstring[]{"密码@192.168.1.200:6379"});
varredisPubSub=newRedisPubSubServer(clientsManager,newstring[]{"testchat"})
{
OnMessage=(channel,msg)=>{
Console.WriteLine("方式2订阅演示.............");
Console.WriteLine("Received'{0}'from'{1}'",msg,channel);
}
}.Start();
Touse
RedisPubSubServer,initializeitwiththechannelsyouwanttosubscribetoandassignhandlersforeachoftheeventsyouwanttohandle.
Ataminimumyou'llwanttohandle
OnMessage:
Calling
Start()afterit'sinitializedwillgetittostartlisteningandprocessinganymessagespublishedtothesubscribedchannels.
官方文档:https://github.com/ServiceStack/ServiceStack.Redis
附加文档
redisPUB/SUB(发布/订阅)
PSUBSCRIBE(订阅一个或多个符合给定模式的频道)
PSUBSCRIBEpattern[pattern…]订阅一个或多个符合给定模式的频道。
每个模式以*作为匹配符,比如it*匹配所有以it开头的频道(it.news、it.blog、it.tweets等等),
news.*匹配所有以news.开头的频道(news.it、news.global.today等等),诸如此类。
可用版本:>=2.0.0
时间复杂度:O(N),N是订阅的模式的数量。
返回值:接收到的信息(请参见下面的代码说明)。
redis>psubscribenews.*
Readingmessages...(pressCtrl-Ctoquit)
1)"psubscribe"#返回值的类型:显示订阅成功
2)"news.*"#订阅的模式
3)(integer)1#目前已订阅的模式的数量
1)"pmessage"#返回值的类型:信息
2)"news.*"#信息匹配的模式
3)"news.it"#信息本身的目标频道
4)"GooglebuyMotorola"#信息的内容
PUBLISH(将信息message发送到指定的频道channel)
PUBLISHchannelmessage将信息message发送到指定的频道channel。
可用版本:>=2.0.0
时间复杂度:O(N+M),其中N是频道channel的订阅者数量,而M则是使用模式订阅(subscribed
patterns)的客户端的数量。
返回值:接收到信息message的订阅者数量。
#对没有订阅者的频道发送信息
redis>publishbad_channel"cananybodyhearme?"
(integer)0
#向有一个订阅者的频道发送信息
redis>publishmsg"goodmorning"
(integer)1
#向有多个订阅者的频道发送信息
redis>publishchat_room"hello~everyone"
(integer)3
PUBSUB(是一个查看订阅与发布系统状态的内省命令)
PUBSUB[argument[argument…]]PUBSUB是一个查看订阅与发布系统状态的内省命令,它由数个不同格式的子命令组成,以下将分别对这
些子命令进行介绍。
可用版本:>=2.8.0
PUBSUBCHANNELS[pattern](列出当前的活跃频道)
列出当前的活跃频道。活跃频道指的是那些至少有一个订阅者的频道,订阅模式的客户端不计算在内。
pattern参数是可选的:
•如果不给出pattern参数,那么列出订阅与发布系统中的所有活跃频道。
•如果给出pattern参数,那么只列出和给定模式pattern相匹配的那些活跃频道。
复杂度:O(N),N为活跃频道的数量(对于长度较短的频道和模式来说,将进行模式匹配的复杂度视为常
数)。
返回值:一个由活跃频道组成的列表。
#client-1订阅news.it和news.sport两个频道
client-1>SUBSCRIBEnews.itnews.sport
Readingmessages...(pressCtrl-Ctoquit)
1)"subscribe"
2)"news.it"
3)(integer)1
1)"subscribe"
2)"news.sport"
3)(integer)2
#client-2订阅news.it和news.internet两个频道
client-2>SUBSCRIBEnews.itnews.internet
Readingmessages...(pressCtrl-Ctoquit)
1)"subscribe"
2)"news.it"
3)(integer)1
1)"subscribe"
2)"news.internet"
3)(integer)2
#首先,client-3打印所有活跃频道
#注意,即使一个频道有多个订阅者,它也只输出一次,比如news.it
client-3>PUBSUBCHANNELS
1)"news.sport"
2)"news.internet"
3)"news.it"
#接下来,client-3打印那些与模式news.i*相匹配的活跃频道
#因为news.sport不匹配news.i*,所以它没有被打印
redis>PUBSUBCHANNELSnews.i*
1)"news.internet"
2)"news.it"
PUBSUBNUMSUB[channel-1…channel-N](返回给定频道的订阅者数量)
返回给定频道的订阅者数量,订阅模式的客户端不计算在内。复杂度:O(N),N为给定频道的数量。
返回值:一个多条批量回复(Multi-bulkreply),回复中包含给定的频道,以及频道的订阅者数量。格式为:频道channel-1,channel-1的订阅者数量,频道channel-2,channel-2的订阅者数量,诸如此类。
回复中频道的排列顺序和执行命令时给定频道的排列顺序一致。不给定任何频道而直接调用这个命令也是可以的,在这种情况下,命令只返回一个空列表。
#client-1订阅news.it和news.sport两个频道
client-1>SUBSCRIBEnews.itnews.sport
Readingmessages...(pressCtrl-Ctoquit)
1)"subscribe"
2)"news.it"
3)(integer)1
1)"subscribe"
2)"news.sport"
3)(integer)2
#client-2订阅news.it和news.internet两个频道
client-2>SUBSCRIBEnews.itnews.internet
Readingmessages...(pressCtrl-Ctoquit)
1)"subscribe"
2)"news.it"
3)(integer)1
1)"subscribe"
2)"news.internet"
3)(integer)2
#client-3打印各个频道的订阅者数量
client-3>PUBSUBNUMSUBnews.itnews.internetnews.sportnews.music
1)"news.it"#频道
2)"2"#订阅该频道的客户端数量
3)"news.internet"
4)"1"
5)"news.sport"
6)"1"
7)"news.music"#没有任何订阅者
8)"0"
PUBSUBNUMPAT(返回订阅模式的数量)
注意,这个命令返回的不是订阅模式的客户端的数量,而是客户端订阅的所有模式的数量总和。复杂度:O(1)。
返回值:一个整数回复(Integerreply)。
#client-1订阅news.*和discount.*两个模式
client-1>PSUBSCRIBEnews.*discount.*
Readingmessages...(pressCtrl-Ctoquit)
1)"psubscribe"
2)"news.*"
3)(integer)1
1)"psubscribe"
2)"discount.*"
3)(integer)2
#client-2订阅tweet.*一个模式
client-2>PSUBSCRIBEtweet.*
Readingmessages...(pressCtrl-Ctoquit)
1)"psubscribe"
2)"tweet.*"
3)(integer)1
#client-3返回当前订阅模式的数量为3
client-3>PUBSUBNUMPAT
(integer)3
#注意,当有多个客户端订阅相同的模式时,相同的订阅也被计算在PUBSUBNUMPAT之内
#比如说,再新建一个客户端client-4,让它也订阅news.*频道
client-4>PSUBSCRIBEnews.*
Readingmessages...(pressCtrl-Ctoquit)
1)"psubscribe"
2)"news.*"
3)(integer)1
#这时再计算被订阅模式的数量,就会得到数量为4
client-3>PUBSUBNUMPAT
(integer)4
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
PUNSUBSCRIBE(指示客户端退订所有给定模式)
PUNSUBSCRIBE[pattern[pattern…]]指示客户端退订所有给定模式。
如果没有模式被指定,也即是,一个无参数的PUNSUBSCRIBE调用被执行,那么客户端使用PSUBSCRIBE
命令订阅的所有模式都会被退订。在这种情况下,命令会返回一个信息,告知客户端所有被退订的模式。
可用版本:>=2.0.0
时间复杂度:O(N+M),其中N是客户端已订阅的模式的数量,M则是系统中所有客户端订阅的模式的数量。
返回值:这个命令在不同的客户端中有不同的表现。
SUBSCRIBE(订阅给定的一个或多个频道的信息)
订阅给定的一个或多个频道的信息。可用版本:>=2.0.0
时间复杂度:O(N),其中N是订阅的频道的数量。
返回值:接收到的信息(请参见下面的代码说明)。
#订阅msg和chat_room两个频道
#1-6行是执行subscribe之后的反馈信息
#第7-9行才是接收到的第一条信息
#第10-12行是第二条
redis>subscribemsgchat_room
Readingmessages...(pressCtrl-Ctoquit)
1)"subscribe"#返回值的类型:显示订阅成功
2)"msg"#订阅的频道名字
3)(integer)1#目前已订阅的频道数量
1)"subscribe"
2)"chat_room"
3)(integer)2
1)"message"#返回值的类型:信息
2)"msg"#来源(从那个频道发送过来)
3)"hellomoto"#信息内容
1)"message"
2)"chat_room"
3)"testing...haha"
UNSUBSCRIBE(指示客户端退订给定的频道)
UNSUBSCRIBE[channel[channel…]]指示客户端退订给定的频道。
如果没有频道被指定,也即是,一个无参数的UNSUBSCRIBE调用被执行,那么客户端使用SUBSCRIBE命令订阅的所有频道都会被退订。在这种情况下,命令会返回一个信息,告知客户端所有被退订的频道。
可用版本:>=2.0.0
时间复杂度:O(N),N是客户端已订阅的频道的数量。
返回值:这个命令在不同的客户端中有不同的表现。
参考文档:
相关文章推荐
- 初步试用Faye——基于发布和订阅模型的消息系统
- 分享一个分布式消息总线,基于.NET Socket Tcp的发布-订阅框架,附代码下载 推荐
- redis 消息订阅和发布
- 【Redis系列】Redis频道发布与消息订阅
- Spring Boot使用Redis进行消息的发布订阅
- NoSQL之Redis高级应用--发布及订阅消息
- Redis中的简单事物以及消息订阅发布
- Java基于IBMMQ消息的发布与订阅
- redis 消息队列(发布订阅)、持久化(RDB、AOF)、集群(cluster)
- Java实现Redis的消息订阅和发布
- Redis实战(12)订阅和发布消息
- 【Redis】jedis客户端实现redis消息的发布订阅(实时消息中间件)
- redis 高级应用之二(Redis的持久化 和 消息的[pub/sub]发布和订阅)
- 发布与订阅消息--Redis学习笔记八
- 基于WCF和MSMQ构建发布/订阅消息总线(Pub/Sub Message Bus)
- 分享一个分布式消息总线,基于.NET Socket Tcp的发布-订阅框架,附代码下载
- 使用Redis构建消息队列和发布订阅系统
- Redis发布及订阅消息(pub/sub)
- Spring Boot使用Redis进行消息的发布订阅
- spring---消息订阅发布之redis