您的位置:首页 > 数据库 > Redis

.Net Redis实现发布/订阅(RedisPubSubServer)

2016-07-13 11:43 776 查看
Redis是一个开源的使用ANSI C语言编写、支持网络、可基于内存亦可持久化的日志型、Key-Value数据库,并提供多种语言的API。本示例演示了怎么样通过Redis服务,来实现发布/订阅服务。借助于ServiceStack.Redis客户端,我们可以轻松来实现这些功能。在这里,依然采用控制台的方式,简单的为大家演示。如果想了解更多,大家可以访问:https://github.com/ServiceStack/ServiceStack.Text。


代码片段(3)[全屏查看所有代码]

1. [图片] 通过NuGet获取对ServiceStack.Redis的引用.png    




2. [代码]实现发布/订阅服务     跳至 [2] [全屏预览]

?
1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

198

199

200

201

202

203

204

205

206

207

208
using
ServiceStack.Messaging;

using
ServiceStack.Redis;

using
System;

using
System.Collections.Generic;

using
System.Linq;

using
System.Text;

using
System.Threading;

using
System.Threading.Tasks;

/******************************************************************************************************************

 
*

 
*

 
*
说 明: Redis发布订阅服务(版本:Version1.0.0)

 
*
作 者:李朝强

 
*
日 期:2015/05/19

 
*
修 改:

 
*
参 考:http://my.oschina.net/lichaoqiang/

 
*
备 注:暂无...

 
*

 
*

 
*
***************************************************************************************************************/

namespace
Lcq

{

    
class 
Program

    
{

        
///
<summary>

        
///

        
///
</summary>

        
///
<param name="args"></param>

        
static 
void 
Main(
string
[]
args)

        
{

            
Console.Title
= 
"Redis发布、订阅服务"
;

 
            
//发布服务

            
//Pub();

            
//Console.ReadLine();

 
 
            
//订阅

            
Subscription();

 
            
RedisClient
client = 
new
RedisClient(
"172.16.16.65"
,
6379);

            
while
(
true
)

            
{

                
string
input
= Console.ReadLine();

                
client.PublishMessage(
"channel-1"
,
input);

            
}

        
}

 
        
#region
发布/订阅·

        
///
发布

        
///
</summary>

        
public 
static 
void
Pub()

        
{

            
//PooledRedisClientManager

            
ServiceStack.Redis.IRedisClientsManager
RedisClientManager = 
new
ServiceStack.Redis.PooledRedisClientManager(
"172.16.16.65:6379"
);

 
            
//发布、订阅服务
IRedisPubSubServer

            
ServiceStack.Redis.RedisPubSubServer
pubSubServer = 
new
ServiceStack.Redis.RedisPubSubServer(RedisClientManager, 
"channel-1"
"channel-2"
);

 
            
//接收消息

            
pubSubServer.OnMessage
= (channel, msg) =>

            
{

                
Console.WriteLine(
string
.Format(
"服务端:频道{0}接收消息:{1}   
时间:{2}"
, channel, msg, DateTime.Now.ToString(
"yyyy/MM/dd
HH:mm:ss"
)));

                
Console.WriteLine(
"___________________________________________________________________"
);

            
};

            
pubSubServer.OnStart
= () =>

            
{

                
Console.WriteLine(
"发布服务已启动"
);

                
Console.WriteLine(
"___________________________________________________________________"
);

            
};

            
pubSubServer.OnStop
= () => { Console.WriteLine(
"服务停止"
);
};

            
pubSubServer.OnUnSubscribe
= (channel) => { Console.WriteLine(channel); };

            
pubSubServer.OnError
= (e) => { Console.WriteLine(e.Message); };

            
pubSubServer.OnFailover
= (s) => { Console.WriteLine(s); };

 
            
//pubSubServer.OnHeartbeatReceived
= () => { Console.WriteLine("OnHeartbeatReceived"); };

            
//pubSubServer.OnHeartbeatSent
= () => { Console.WriteLine("OnHeartbeatSent"); };

            
//pubSubServer.IsSentinelSubscription
= true;

            
pubSubServer.Start();

 
 
        
}

 
 
        
///
<summary>

        
///
发送消息

        
///
</summary>

        
public 
static 
Task
Send()

        
{

            
return
Task.Run(()
=>

            
{

                
RedisClient
client = 
new
RedisClient(
"172.16.16.65"
,
6379);

                
client.PublishMessage(
"channel-1"
"这是我发送的消息!"
);

            
});

        
}

 
        
///
<summary>

        
///
订阅

        
///
</summary>

        
public 
static 
void 
Subscription()

        
{

            
using
(ServiceStack.Redis.RedisClient
consumer = 
new
RedisClient(
"172.16.16.65"
,
6379))

            
{

                
//创建订阅

                
ServiceStack.Redis.IRedisSubscription
subscription = consumer.CreateSubscription();

 
                
//接收消息处理Action

                
subscription.OnMessage
= (channel, msg) =>

                
{

                    
Console.WriteLine(
"频道【"
+
channel + 
"】订阅客户端接收消息:"
":"
+
msg + 
"         ["
+
DateTime.Now.ToString(
"yyyy/MM/dd HH:mm:ss"
)
+ 
"]"
);

                    
Console.WriteLine(
"订阅数:"
+
subscription.SubscriptionCount);

                    
Console.WriteLine(
"___________________________________________________________________"
);

 
                
};

 
                
//订阅事件处理

                
subscription.OnSubscribe
= (channel) =>

                
{

                    
Console.WriteLine(
"订阅客户端:开始订阅"
+
channel);

                
};

 
                
//取消订阅事件处理

                
subscription.OnUnSubscribe
= (a) => { Console.WriteLine(
"订阅客户端:取消订阅"
);
};

 
                
//模拟:发送100条消息给服务

                
Task.Run(()
=>

                
{

                    
using
(ServiceStack.Redis.IRedisClient
publisher = 
new
ServiceStack.Redis.RedisClient(
"172.16.16.65"
,
6379))

                    
{

                        
for
(
int
i
= 1; i <= 100; i++)

                        
{

                            
publisher.PublishMessage(
"channel-1"
string
.Format(
"这是我发送的第{0}消息!"
,
i));

                            
System.Threading.Thread.Sleep(200);

                        
}

                    
}

                    
subscription.UnSubscribeFromAllChannels();

                
});

 
                
//订阅频道

                
subscription.SubscribeToChannels(
"channel-1"
);

            
}

 
        
}

 
 
        
#endregion

 
        
///
<summary>

        
///
Example

        
///
</summary>

        
public 
static 
void 
Example()

        
{

            
var
messagesReceived
= 0;

            
var
PublishMessageCount
= 10;

            
string
MessagePrefix
= 
"LCQ:"
,
ChannelName = 
"channel-4"
;

            
using
(
var
redisConsumer
= 
new
RedisClient(
"172.16.16.65"
,
6379))

            
using
(
var
subscription
= redisConsumer.CreateSubscription())

            
{

                
//订阅

                
subscription.OnSubscribe
= channel =>

                
{

                    
Console.WriteLine(
"订阅频道'{0}'"
,
channel);

                    
Console.WriteLine();

                
};

                
//取消订阅

                
subscription.OnUnSubscribe
= channel =>

                
{

                    
Console.WriteLine(
"取消订阅
'{0}'"
, channel);

                    
Console.WriteLine();

                
};

 
                
//接收消息

                
subscription.OnMessage
= (channel, msg) =>

                
{

                    
Console.WriteLine(
"接收消息
'{0}' from channel '{1}'"
, msg, channel);

                    
Console.WriteLine();

                    
//As
soon as we've received all 5 messages, disconnect by unsubscribing to all channels

                    
if
(++messagesReceived
== PublishMessageCount)

                    
{

                        
subscription.UnSubscribeFromAllChannels();

                    
}

                
};

 
                
ThreadPool.QueueUserWorkItem(x
=>

                
{

                    
Thread.Sleep(200);

                    
Console.WriteLine(
"开始发送消息..."
);

 
                    
using
(
var
redisPublisher
= 
new
RedisClient(
"172.16.16.65"
,
6379))

                    
{

                        
for
(
var
i
= 1; i <= PublishMessageCount; i++)

                        
{

                            
var
message
= MessagePrefix + i;

                            
Console.WriteLine(
"正在发布消息
'{0}' 到频道 '{1}'"
, message, ChannelName);

                            
Console.WriteLine();

                            
redisPublisher.PublishMessage(ChannelName,
message);

                        
}

                    
}

                
});

 
                
Console.WriteLine(
"开始启动监听
'{0}'"
, ChannelName);

                
subscription.SubscribeToChannels(ChannelName); 
//blocking

            
}

 
            
Console.WriteLine(
"EOF"
);

        
}

    
}

}


?




3. [图片] 示例控制台效果.png 
  


http://www.oschina.net/code/snippet_584165_52231
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: