Redis 学习笔记十 发布者订阅者模式与生产者消费者模式
2017-01-09 14:35
405 查看
消息队列有两种场景
生产者消费者:一个消息只能有一个消费者发布者订阅者:一个消息可以被多个消费者收到
redis从2.0版本开始支持pub/sub。
而Producer/Consumer是借助于redis的list结构来实现的。
生产者/消费者
Producer调用redis的lpush往特定key里放消息,Consumer调用brpop去不断监听key。代码(未实际测试)
Producer
String key="demo:mq:test"; String msg="hello world"; redisDao.lpush(key,msg);
consumer
String key="demo:mq:test"; while(true){ List<String> msgs=redisDao.brpop(BLOCK_TIMEOUT,listKey); if(msgs==null)continue; String jobMsg=msgs.get(1); processMsg(jobMsg); }
实际使用中,redisDao.bropop建议不要设置为BLOCK_TIMEOUT,可以设置为30(单位秒)
发布/订阅
类似设计模式中的观察者模式。redis可以作为pub/sub的服务端。订阅得通过subscribe和psubscribe命令向redis server订阅消息类型,redis将消息类型称为channel。
当发布者通过publish命令向redis server发送特定消息时,访问该消息类型全部client都会收到此消息。
一个client可以订阅多个channel,也可以向多个channel发送消息。
测试
client0 运行:
redis-cli -h localhost -a 密码 publish tv1 program1 publish tv2 programe2
client1
subscribe tv1
client2
subscribe tv2
client3
subscribe tv1 tv2
psubscribe tv*
批量订阅以tv开头的channel内容。取消订阅 unsubscribe tv1
批量取消订阅 punsubscribe tv*
使用java
直接使用Tcp socket订阅
import java.net.*; import java.io.*; public class PubSubTest { public static void main(String[] args) { String cmd = "subscribe goods.add_msg\r\n"; try { Socket socket = new Socket("127.0.0.1",6379); InputStream in = socket.getInputStream(); OutputStream out = socket.getOutputStream(); out.write(cmd.getBytes()); //发送订阅命令 byte[] buffer = new byte[1024]; while (true) { int readCount = in.read(buffer); System.out.write(buffer, 0, readCount); } } catch (Exception e) { } } }
jedis发布
/** * 发布一个消息 * @param channel * @param message */ public static void publishMsg(String channel,String message){ Jedis jedis = null; try { jedis = getResource(); jedis.publish(channel, message); logger.debug("publishMsg {} = {}", channel, message); } catch (Exception e) { logger.warn("publishMsg {} = {}", channel, message, e); } finally { returnResource(jedis); } } /** * 发布一个消息 * @param channel * @param message */ public static void publishMsg(byte[] channel,byte[] message){ Jedis jedis = null; try { jedis = getResource(); jedis.publish(channel, message); logger.debug("publishMsg {} = {}", channel, message); } catch (Exception e) { logger.warn("publishMsg {} = {}", channel, message, e); } finally { returnResource(jedis); } }
jedis订阅
需要继承JedisPubSub,并实现其中的onMessage、onSubscribe等接口。jedis.subscribe(listener, "channel1");
JedisPubSub函数功能:
package demo; import redis.clients.jedis.JedisPubSub; public class MyListener extends JedisPubSub { // 取得订阅的消息后的处理 public void onMessage(String channel, String message) { System.out.println(channel + "=" + message); } // 初始化订阅时候的处理 public void onSubscribe(String channel, int subscribedChannels) { // System.out.println(channel + "=" + subscribedChannels); } // 取消订阅时候的处理 public void onUnsubscribe(String channel, int subscribedChannels) { // System.out.println(channel + "=" + subscribedChannels); } // 初始化按表达式的方式订阅时候的处理 public void onPSubscribe(String pattern, int subscribedChannels) { // System.out.println(pattern + "=" + subscribedChannels); } // 取消按表达式的方式订阅时候的处理 public void onPUnsubscribe(String pattern, int subscribedChannels) { // System.out.println(pattern + "=" + subscribedChannels); } // 取得按表达式的方式订阅的消息后的处理 public void onPMessage(String pattern, String channel, String message) { System.out.println(pattern + "=" + channel + "=" + message); } }
订阅示例:
final Jedis jedis = ru.getConnection(); final MyListener listener = new MyListener(); //可以订阅多个频道 //订阅得到信息在lister的onMessage(...)方法中进行处理 //jedis.subscribe(listener, "channel1", "channel2"); //也用数组的方式设置多个频道 //jedis.subscribe(listener, new String[]{"channel3","channel4"}); //这里启动了订阅监听,线程将在这里被阻塞 //订阅得到信息在lister的onPMessage(...)方法中进行处理 jedis.psubscribe(listener, new String[]{"channel*"});//使用模式匹配的方式设置频道
本文参考:
http://www.360doc.com/content/15/0512/15/1073512_469935043.shtml
相关文章推荐
- redis 学习笔记(3)-master/slave(主/从模式)
- redis 学习笔记(3)-master/slave(主/从模式)
- redis 学习笔记(3)-master/slave(主/从模式)
- redis 学习笔记(3)-master/slave(主/从模式)
- Redis的主从模式-Redis学习笔记五
- C#面向对象设计模式纵横谈学习笔记(1)
- 设计模式学习笔记(1)
- 设计模式学习笔记(5)
- 设计模式学习笔记(三)
- IOC模式的学习笔记
- 设计模式的学习笔记!(一)
- 学习笔记之ORM设计中用到的模式
- 设计模式学习笔记(一)
- [设计模式学习笔记之一]面向对象是什么?
- [设计模式学习笔记之一]面向对象是什么?
- 设计模式学习笔记2:说说Mediator模式
- C#设计模式学习笔记---简单工厂模式
- 设计模式学习笔记1:对设计模式学习的一点想法兼谈Facade模式
- JAVA与模式 学习笔记(一) 统一的建模语言UML介绍(2)
- 设计模式学习笔记(2)