您的位置:首页 > 数据库 > Redis

redis实现消息队列(发布/订阅模式)

weixin_43113679 2019-05-24 23:38 429 查看
版权声明:如需转载,请写明出处 https://blog.csdn.net/weixin_43113679/article/details/90524617

redis的列表类型天生支持用作消息队列(类似于MQ的队列模型–任何时候都可以消费,一条消息只能消费一次),学习过程借鉴https://www.geek-share.com/detail/2750097560.html

关于redis的list操作https://blog.csdn.net/weixin_43113679/article/details/90080933

java程序实现消息队列

先生产者
本人下面的目录结构参考https://blog.csdn.net/weixin_43113679/article/details/90413124
测试类

@Test
public void putMessage() throws Exception {
ctx = new ClassPathXmlApplicationContext("spring-service.xml");
userService = ctx.getBean(UserService.class);
for(int i=0;i<20;i++) {
userService.putMessage(i);
}

}
@Override
public void putMessage(int message) throws Exception {
//把消息发布
String messageKey = "message:queue";
redisDao.lpush(messageKey, String.valueOf(message));

}
@Override
public void lpush(String messageKey, String message) {
jedisPool.getResource().lpush(messageKey, message);

}

结果

这样信息就发布成功了,这说一下,redis是单线程,如果上面用多个线程实现在缓存中的排列方式还是这样
准备消费者
这我用while循环来看点有意思的结果
先说一句:按以前redis在cmd操作的方式,当value值没有了,那key就会销毁不存在
测试类

@Test
public void getMessage() throws Exception {
ctx = new ClassPathXmlApplicationContext("spring-service.xml");
userService = ctx.getBean(UserService.class);
int i =1;
while(true) {
String message = userService.getMessage();
System.out.println("第"+(i++)+"消息,"+"value="+message);
}

}
@Override
public String getMessage() throws Exception {
//把消息取出来
String messageKey = "message:queue";
return redisDao.rpop(messageKey);
}
@Override
public String rpop(String messageKey) throws Exception {

return jedisPool.getResource().rpop(messageKey);
}

结果太多瞬间过去了(这提醒一下,把最大连接写大点,要不会报错,水池耗尽Pool exhausted)
给大家看一个靠后的

竟然有value是null的情况可能是我没加判定的原因,现在我加上

@Test
public void getMessage() throws Exception {
ctx = new ClassPathXmlApplicationContext("spring-service.xml");
userService = ctx.getBean(UserService.class);
int i =1;
while(true) {
String message = userService.getMessage();
if(message == null){
break;
}
System.out.println("第"+(i++)+"消息,"+"value="+message);

}

}

测试是没问题了,但是不能所有的都这样啊,长久肯定有问题啊,当生产者和消费者一直插入和读取呢,如果读取到还好,读取不到不就造成多余连接,浪费资源啊,除非你Thread.sleep()来让消费者休息一下,这样是不会造成不必要的浪费但是有两个问题

  1. 如果生产者速度大于消费者消费速度,消息队列长度会一直增大,时间久了会占用大量内存空间
  2. 如果睡眠时间过长,这样不能处理一些时效性的消息,睡眠时间过短,也会在连接上造成比较大的开销

下面介绍brpop和blpop实现阻塞读取(非常重要)

命令参数

brpop key [key …] timeout    //右端弹出
blpop key [key …] timeout    //左端弹出

区别就是第一个是从右端弹出一个元素,第二个是从左端弹出一个元素
r和l就应该知道,这不说头部(R)和尾部(L),这样好理解
实例:

127.0.0.1:6379> lpush queue2 3 4
(integer) 2
127.0.0.1:6379> brpop queue2 5
1) "queue2"                     //返回值第一个是要返回的列表的key
2) "3"							//因为是brpop,所以返回的value是 3,这是一个一个返回的
127.0.0.1:6379> brpop queue2 5
1) "queue2"
2) "4"
127.0.0.1:6379> brpop queue2 5
(nil)
(5.04s)

弹出了两次后queue2列表里就没有value了,那就会消失,
  timeout也就是上面命令行的5,代表是当列表里没有此key的时候(当value没有了,可以也就销毁了),会等待5S,这5s里是阻塞的,
  当时间到了,而还没出现此key就会返回nil,相反,如果在等待时间里此key出现了,那么就会弹出指定的key,和value
实例

我稍微定的时间长一些,两个窗口,在timeout内添加上queue2的列表就没问题了
对于命令行 key [key …] 说明可以多个key
当key都存在

127.0.0.1:6379> lpush queue1 1  //添加测试用例
(integer) 1
127.0.0.1:6379> lpush queue2 2
(integer) 1
127.0.0.1:6379> brpop qeueu1 queue2 5  //当brpop,当多个key时
1) "queue2"   //右边的列表queue2弹出
2) "2"
127.0.0.1:6379> lpush queue2 2
(integer) 1
127.0.0.1:6379> blpop queue1 queue2 5  //改成blpop,当多个key时
1) "queue1"     //左边的列表queue1弹出
2) "1"

从上面的例子得出当有多个key(都存在),根据 brpop和blpop来决定哪个列表,再在列表里根据brpop和blpop来决定弹出哪边
当key有不存在的

127.0.0.1:6379> del queue2  //删除列表 queue2
(integer) 1
127.0.0.1:6379> brpop queue2 queue1 5  //第一个可以是不存在的,第二个是存在的
1) "queue1"   			//返回的是第二个的key
2) "4"
127.0.0.1:6379> brpop queue1 queue2 5   //调换位置
1) "queue1"  					//还是列表queue1的key
2) "5"
127.0.0.1:6379> del queue1    //删除列表queue1
(integer) 1
127.0.0.1:6379> brpop queue1 queue2 5  //等待5s,失败
(nil)
(5.04s)

从上面的例子能看出当多个key时,当有不存在时,会把不存在的剔除(或者忽略),之后再存在的列表里决定弹出哪一边,当都不存在时就等待timeout了
借此特性可以区分优先级的任务队列,根据key的顺序读取每一个数据
程序我就不写了,跟据上面的改就行,
在java程序里,timeout写在前面,key写在后面,返回的是一个List集合

发布/订阅模式(一个消息可以被多个订阅者消费)

1 客户端发布/订阅

1.1普通的发布/订阅

除了实现任务队列外,redis还提供了一组命令可以让开发者实现"发布/订阅"(publish/subscribe)模式。"发布/订阅"模式同样可以实现进程间的消息传递,其原理如下:
  "发布/订阅"模式包含两种角色,分别是发布者和订阅者。订阅者可以订阅一个或者多个频道(channel),而发布者可以向指定的频道(channel)发送消息,所有订阅此频道的订阅者都会收到此消息。
(1)发布消息1

127.0.0.1:6379> PUBLISH channel:1 hi   //如向 channel1:1说一声hi
(in
20000
teger) 0

一开始没有订阅者返回的是0,发出去的消息不会被持久化,也就是有客户端订阅channel:1后只能接收到后续发布到该频道的消息,之前的就接收不到了。
(2)订阅频道
  订阅频道的命令是 subscribe,可以同时订阅多个频道,用法是 subscribe channel1 [channel2 …],例如新开一个客户端订阅上面频道:(不会收到消息,因为不会收到订阅之前就发布到该频道的消息)

127.0.0.1:6379> SUBSCRIBE channel:1
Reading messages... (press Ctrl-C to quit)
1) "subscribe"    //表示订阅成功的反馈信息
2) "channel:1"   // 订阅成功的频道名称
3) (integer) 1    //当前客户端订阅的频道数量

订阅后就不能进行新的操作,所以建议开两个cmd窗口,只能接收发布者的消息
(3)发布者(第一个客户端重新往channel:1发送一条消息)

127.0.0.1:6379> PUBLISH channel:1 hi
(integer) 1   //返回的是订阅者的数量

订阅的客户端显示的内容

127.0.0.1:6379> SUBSCRIBE channel:1
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "channel:1"
3) (integer) 1   //上面的是重复的
1) "message"   //表示接收到的消息
2) "channel:1" //示产生消息的频道名称,因为可以接受多个频道
3) "hi"  //消息的内容

取消订阅

redis 127.0.0.1:6379> unsubscribe channel:1
1) "unsubscribe"  //成功取消订阅某个频道
2) "channel:1"  //频道名称
3) (integer) 0  //当前客户端订阅的频道数量

在此过程中,发布消息的客户端可以随意操作,但是订阅消息的客户端只能接收订阅的消息,不能进行其他的操作,再开一个新客户端,端口号一样,那新客户端可以进行任意操作

1.2按照规则发布/订阅

除了可以使用subscribe命令来订阅指定的频道外,还可以使用psubscribe命令来订阅频道
它们两个的区别命令是在开头加了一个p另一个区别就是前面的是制定频道订阅,后面的是模糊频道订阅
如果SQL语句的模糊查询就更好理解了
命令

psubscribe pattern [pattern ...]  //订阅多个模式的频道

通配符中?表示1个占位符,*表示任意个占位符(包括0),?*表示1个以上占位符
下面来看一下这三个通配符的实例吧

127.0.0.1:6379> psubscribe c? b* d?*  //订阅多个频道,并且都有通配符
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"			//下面这些就不说了,和subscribe订阅一样的
2) "c?"
3) (integer) 1
1) "psubscribe"
2) "b*"
3) (integer) 2
1) "psubscribe"
2) "d?*"
3) (integer) 3
1) "pmessage"
2) "c?"

新开个客户端发送频道信息到指定频道

127.0.0.1:6379> publish c m1  //频道是c,但是没有订阅者,上面的c后面跟的?
(integer) 0
127.0.0.1:6379> publish c1 m1 //有订阅者
(integer) 1
127.0.0.1:6379> publish c11 m1 //没有订阅者,因为?代表的是一个占位符,这离的c后面有两个
(integer) 0
127.0.0.1:6379> publish b m1 //有,*代表0到无限的占位符,下同
(integer) 1
127.0.0.1:6379> publish b1 m1
(integer) 1
127.0.0.1:6379> publish b11 m1
(integer) 1
127.0.0.1:6379> publish d m1 //没有订阅者,?*代表至少1和占位符,下面就有了
(integer) 0
127.0.0.1:6379> publish d1 m1
(integer) 1
127.0.0.1:6379> publish d11 m1
(integer) 1

都是发送的m1的消息
上面返回值为1表示被订阅者所接受,可以匹配上面的通配符。
现在看看订阅者的客户端的信息吧


注意:
  (1)使用psubscribe命令可以重复订阅同一个频道,如客户端执行了psubscribe c? c?*。这时向c1发布消息客户端会接受到两条消息,而同时publish命令的返回值是2而不是1。.同样的,如果有另一个客户端执行了subscribe c1 和psubscribe c?*的话,向c1发送一条消息该客户顿也会受到两条消息(但是是两种类型:message和pmessage),同时publish命令也返回2.

(2)punsubscribe命令可以退订指定的规则,用法是: punsubscribe [pattern [pattern …]],如果没有参数则会退订所有规则

(3)使用punsubscribe只能退订通过psubscribe命令订阅的规则不会影响直接通过subscribe命令订阅的频道,同样unsubscribe命令不会影响通过psubscribe命令订阅的规则。
另外需要注意punsubscribe命令退订某个规则时不会将其中的通配符展开,而是进行严格的字符串匹配,所以punsubscribe * 无法退订c规则,而是必须使用punsubscribe c*才可以退订。

2 java程序实现发布者订阅者模式

首先强调一点,订阅者在订阅模式下要一直开着,或者阻塞,因为它的特性决定当你退出后发布者发布什么就和你无关了,要想接收就需要一直开着,或者一直阻塞,在java程序时,消费者需要一直开着等待生产者发送消息,消费者才能接收到
因为Eclipse的Junit只能一个测试,我就分开了,测试用Eclipse,另一个用cmd
目录结构参考:https://blog.csdn.net/weixin_43113679/article/details/90413124
生产者

/**
* 生产者
* */
@Test
public void MessageProducer()throws Exception{
ctx = new ClassPathXmlApplicationContext("spring-service.xml");
userService = ctx.getBean(UserService.class);
for(int i = 0; i < 10; i++) {
userService.publish(i);

}
};

service

@Override
public void publish(int i) throws Exception {
//频道号
String channel = "channel:1";
redisDao.publish(channel, String.valueOf(i));

}

redisDao

@Override
public void publish(String channel, String message) throws Exception {
jedisPool.getResource().publish(channel, message);

}

结果

(1)subscribe实现订阅者

/**
*  用subscribe订阅的订阅者
* */
@Test
public void MessageConsumerSubscribe()throws Exception{
ctx = new ClassPathXmlApplicationContext("spring-service.xml");
userService = ctx.getBean(UserService.class);
userService.subscribe();
}

上面的userService.subscribe(); 不用while循环,你只要订阅了,除非退出,要不会一直阻塞在那等待发布者发布指定频道的消息,有消息就会处理,再等待,所以不需要自己添加while循环
service

@Override
public void subscribe() throws Exception {
String channel = "channel:1";
redisDao.subscribe(channel);
}

redisDao

/**
* 处理订阅/发布的订阅者根据频道接收的信息
* */
private MyJedisPubSub myJedisPubSub = new MyJedisPubSub();

@Override
public void subscribe(String channel) throws Exception {
jedisPool.getResource().subscribe(myJedisPubSub, channel);
//下面就直接交给myJedisPubSub来处理返回的信息了
}

}
/**
* 继承JedisPubSub,重写接收消息的方法
*/
class MyJedisPubSub extends JedisPubSub {
/**
* 结束程序的消息
* */
private static final String EXIT_COMMAND = "exit";
/** JedisPubSub类是一个没有抽象方法的抽象类,里面方法都是一些空实现
* 所以可以选择需要的方法覆盖,这儿使用的是SUBSCRIBE指令,所以覆盖了onMessage
* 如果使用PSUBSCRIBE指令,则覆盖onPMessage方法
* 当然也可以选择BinaryJedisPubSub,同样是抽象类,但方法参数为byte[]
**/
@Override
public void onMessage(String channel, String message) {
//接收到exit消息后退出
if (EXIT_COMMAND.equals(message)) {
System.exit(0);
}
System.out.println("-接收到消息:channel=" + channel + ",message=" + message);

}
}


结果

(2)psubscribe实现订阅者
这个就不写了,和上面一样,把channel 改成模糊频道,再把所有的subscribe改成psubscribe,还有重写JedisPubSub(这个和上边的有点不一样),

/**
1. 继承JedisPubSub,重写接收消息的方法
*/
class MyJedisPubSub extends JedisPubSub {
/**
* 结束程序的消息
* */
private static final String EXIT_COMMAND = "exit";

@Override
public void onPMessage(String pattern, String channel, String message) {
//接收到exit消息后退出,程序停住
if (EXIT_COMMAND.equals(message)) {
System.exit(0);
}
System.out.println(Thread.currentThread().getName()+"-接收到消息:pattern="+pattern+",channel=" + channel + ",message=" + message);

}
}

到此我们实现了两种消息队列,

  1. redis自带的list类型(lpushrpop或者brpoprpushlpop或者blpop),带b的是阻塞读取
  2. 发布/订阅模式发布者(publish channel message),订阅者(subscribe channel [channel …]或者psubscribe pattern [pattern …])前者指定频道后者模糊频道

补充

(1)取消订阅

就像上面一样,订阅了此线程就会在这继续等待,不会停止也不会返回主线程,就在这等待发布者发布消息,接收,除非像上面一样 System.exit(0)结束程序 ,在测试时没问题,但是在做项目是就不行了,要不一直在这等待,怎么返回结果呢,所以就需要取消订阅,调用JedisPubSub的 unsubscribe() 方法
程序和上面的基本没变

@Override
public void subscribe(String channel) throws Exception {
jedisPool.getResource().subscribe(myJedisPubSub, channel);

System.out.println("取消订阅后返回主线程");
}

}
/**
* 继承JedisPubSub,重写接收消息的方法
*/
class MyJedisPubSub extends JedisPubSub {
/**
* 结束程序的消息
* */
private static final String EXIT_COMMAND = "exit";
/** JedisPubSub类是一个没有抽象方法的抽象类,里面方法都是一些空实现
* 所以可以选择需要的方法覆盖,这儿使用的是SUBSCRIBE指令,所以覆盖了onMessage
* 如果使用PSUBSCRIBE指令,则覆盖onPMessage方法
* 当然也可以选择BinaryJedisPubSub,同样是抽象类,但方法参数为byte[]
**/
@Override
public void onMessage(String channel, String message) {
System.out.println("-接收到消息:channel=" + channel + ",message=" + message);
//接收到exit消息后退出返回主线程
if (EXIT_COMMAND.equals(message)) {
unsubscribe(channel);
}
}
@Override
public void unsubscribe(String... channels) {

super.unsubscribe(channels);
}
}
127.0.0.1:6379> PUBLISH channel:1 5
(integer) 1
127.0.0.1:6379> PUBLISH channel:1 6
(integer) 1
127.0.0.1:6379> PUBLISH channel:1 exit
(integer) 1

结果

 返回redisDao里了,这样就成功了,但是一想这还是发布方决定的啊,
 一想邮件什么的不是有取消订阅吗?
 这个就会有概念混淆的问题,邮件里的订阅和取消订阅和这个的订阅还是有区别的,这里的订阅更像是实时动态群发,群接受
看源码

  当客户端为null时你可以这么理解,就是你自己退出时,订阅就结束了,当你登陆后开始新的订阅
  这时候你会想根据redis的订阅的特性,当你不在的时候发布者的发布消息你都接收不到了,那我再登录时这些我还有吗?答案是肯定有的
  现在有可能根据测试例子你都忘了redis到底是干什么的,它是缓存啊,缓存里的数据肯定要在数据库里备份啊,就算你不在,你登陆后完全可以去数据库中查找啊
  psubscribe也有punsubscribe,和上面差不多

(2)BRPOP:当给定列表内没有任何元素可供弹出的时候,连接将被BRPOP命令阻塞,直到等待超时或发现可弹出元素为止。(每次只弹出一个元素,当没有元素的时候处于阻塞,当弹出一个元素之后就会解除阻塞)

@Override
public List<String> brpop(String messageKey) throws Exception {
List<String> list = new ArrayList<String>();
System.out.println("准备接受列表的全部数据");
list = jedisPool.getResource().brpop(5, messageKey);
System.out.println("接收完成,返回");
return list;
}

上面的就是brpop的过程,brpop返回的是一个List<String>集合,
第一个返回的是列表的名称,也可以说是列表的key
第二个就是弹出的信息
这个也是弹出一个就结束返回主线程

当redis中没有此列表时,timeout就要发挥作用了

brpop阻塞也是有时间限制的,不像发布/订阅,只要超过timeout还没有消息就会结束(在此期间还是阻塞等待的过程),返回主线程

标签: