Java Jedis操作Redis示例(一)——pub/sub模式实现消息队列
2017-09-07 21:54
1236 查看
转载:http://blog.csdn.net/shaobingj126/article/details/50585035
转载:http://blog.csdn.net/abcd898989/article/details/51697596
1. 异步处理:异步处理中消息队列保存了当前处理操作,使得动作请求方可以在发出动作请求/写入消息队列后理解返回,异步获取结果,关注点在于请求的友好程度。
2. 应用解耦:应用解耦用于消除请求发起方和请求处理方的耦合,提升系统的健壮性。
3. 流量削锋:流量削峰一般指秒杀或抢购场景,消息队列用于控制活动人数,缓解高访问压力。
4. 日志处理:日志处理是指将消息队列用在日志处理中,比如Kafka的应用,解决大量日志传输的问题。
P2P模式:
P2P模式包含三个角色:消息队列(Queue),发送者(Sender),接收者(Receiver)。每个消息都被发送到一个特定的队列,接收者从队列中获取消息。队列保留着消息,直到他们被消费或超时。
P2P的特点
每个消息只有一个消费者(Consumer)(即一旦被消费,消息就不再在消息队列中)
发送者和接收者之间在时间上没有依赖性,也就是说当发送者发送了消息之后,不管接收者有没有正在运行,它不会影响到消息被发送到队列
接收者在成功接收消息之后需向队列应答成功
如果希望发送的每个消息都会被成功处理的话,那么需要P2P模式。、
Pub/sub模式:
包含三个角色主题(Topic),发布者(Publisher),订阅者(Subscriber) 。多个发布者将消息发送到Topic,系统将这些消息传递给多个订阅者。
Pub/Sub的特点
每个消息可以有多个消费者
发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息。
为了消费消息,订阅者必须保持运行的状态。
为了缓和这样严格的时间相关性,JMS允许订阅者创建一个可持久化的订阅。这样,即使订阅者没有被激活(运行),它也能接收到发布者的消息。
如果希望发送的消息可以不被做任何处理、或者只被一个消息者处理、或者可以被多个消费者处理的话,那么可以采用Pub/Sub模型。
(1)同步:订阅者或接收者通过receive方法来接收消息,receive方法在接收到消息之前(或超时之前)将一直阻塞;
(2)异步:订阅者或接收者可以注册为一个消息监听器。当消息到达之后,系统自动调用监听器的onMessage方法。
订阅者(如客户端)以事件订阅的方式表达出它有兴趣接收的一个事件或一类事件;
发布者(如服务器)可将订阅者感兴趣的事件随时通知相关订阅者。
1. 时间非耦合:发布者和订阅者不必同时在线,它们不必同时参与交互。
2. 空间非耦合:发布者和订阅者不必相互知道对方所在的位置。发布者通过事件服务发布事件,订阅者通过事件服务间接获得事件。发布者和订阅者不需要拥有直接到对方的引用,也不必知道有多少个订阅者或者是发布者参与交互。
3. 同步非耦合:发布者/订阅者是异步模式。发布者可不断地生产事件,而订阅者(通过一个回调)则可异步地得到产生事件的通知。
分类:
按照订阅方式分为基于主题(topic-based)、基于内容(content-based)、基于类型(type-based)的pub/sub方式。
1. 导入Redis依赖(以Maven工程为例子):
2. 增加日志配置文件,这里使用系统输出代替日至
3. 创建消息的发布者Publisher.java
4. 实现消息的接收者Subscriber.java,实现JedisPubSub接口
JedisPubSub是Redis提供的抽象类,继承这个类就完成了对客户端对订阅的监听。
抽象类中存在六个方法。分别表示
监听到订阅模式接受到消息时的回调 (onPMessage)
监听到订阅频道接受到消息时的回调 (onMessage )
订阅频道时的回调( onSubscribe )
取消订阅频道时的回调( onUnsubscribe )
订阅频道模式时的回调 ( onPSubscribe )
取消订阅模式时的回调( onPUnsubscribe )
5. 创建测试Main.java
6.测试方法:首先,启动main方法中所示地址的Redis服务器;然后,运行main方法,观察控制台输出。并且我们是以控制台输入内容作为消息发布的内容,各位看官可以在控制台输入任意内容,点击回车键,观察控制台输出。示例如下(直接把原博的图借过来啦):
注意:此方法实现的发布与订阅功能,消息不会在Redis客户端进行缓存。
1. 重新实现Publisher
在新实现的Publisher中使用Jedis存储发布的消息。
2. 重新实现SubClient
package com.zenhobby.redis.persistence;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;
public class PPSubClient {
private Jedis jedis;
private JedisPubSub listener;
private String CONSTANT_CLIENTSET="clientSet";
public PPSubClient(String host,int port,String clientId){
jedis = new Jedis(host,port);
listener = new PPrintListener(clientId,new Jedis(host,port));
jedis.sadd(CONSTANT_CLIENTSET, clientId);
}
public void sub(String channel){
jedis.subscribe(listener, channel);
}
public void unsubscribe(String channel){
listener.unsubscribe(channel);
}
}
这个客户端并没有继承JedisPubSub类,转而在如下的输出类进行Listener的处理
3. Listener类用于处理消息
其中jedis.sismember(CONSTANT, Key)用于判断当前用户是否存在,如果不存在则添加(和Redis缓存的思路相同)。
4. 创建测试Main方法,具体内容如下:
然后,打开Redis客户端,观察当前Redis中保留的所有数据:
题外的话:
Redis目前提供的发布与订阅功能,将会完全阻塞订阅者的客户端,在java实现时,即需要保留一个线程来专门处理发布者与订阅者的连接。因此,在实际应用时,更加推荐的做法是使用MQ组件来实现该功能。
至此,NoSQL之Redis---PUB/SUB(订阅与发布)---JAVA实现 结束
在此,对以下参考资料的作者表示感谢!:
参考资料:
redis官网:
http://redis.io/topics/pubsub
其他博文:
http://my.oschina.net/itblog/blog/601284?fromerr=FiejlElw http://www.sxrczx.com/pages/shift-alt-ctrl.iteye.com/blog/1867454.html
转载:http://blog.csdn.net/abcd898989/article/details/51697596
一 消息队列
1. 定义
消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题。实现高性能,高可用,可伸缩和最终一致性架构。是大型分布式系统不可缺少的中间件。目前在生产环境,使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ等。2. 消息队列的适用场景
消息队列的适用场景包括异步处理,应用解耦,流量削锋和消息通讯四个场景1. 异步处理:异步处理中消息队列保存了当前处理操作,使得动作请求方可以在发出动作请求/写入消息队列后理解返回,异步获取结果,关注点在于请求的友好程度。
2. 应用解耦:应用解耦用于消除请求发起方和请求处理方的耦合,提升系统的健壮性。
3. 流量削锋:流量削峰一般指秒杀或抢购场景,消息队列用于控制活动人数,缓解高访问压力。
4. 日志处理:日志处理是指将消息队列用在日志处理中,比如Kafka的应用,解决大量日志传输的问题。
3. 消息模型
在JMS标准中,有两种消息模型P2P(Point to Point),Publish/Subscribe(Pub/Sub)。P2P模式:
P2P模式包含三个角色:消息队列(Queue),发送者(Sender),接收者(Receiver)。每个消息都被发送到一个特定的队列,接收者从队列中获取消息。队列保留着消息,直到他们被消费或超时。
P2P的特点
每个消息只有一个消费者(Consumer)(即一旦被消费,消息就不再在消息队列中)
发送者和接收者之间在时间上没有依赖性,也就是说当发送者发送了消息之后,不管接收者有没有正在运行,它不会影响到消息被发送到队列
接收者在成功接收消息之后需向队列应答成功
如果希望发送的每个消息都会被成功处理的话,那么需要P2P模式。、
Pub/sub模式:
包含三个角色主题(Topic),发布者(Publisher),订阅者(Subscriber) 。多个发布者将消息发送到Topic,系统将这些消息传递给多个订阅者。
Pub/Sub的特点
每个消息可以有多个消费者
发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息。
为了消费消息,订阅者必须保持运行的状态。
为了缓和这样严格的时间相关性,JMS允许订阅者创建一个可持久化的订阅。这样,即使订阅者没有被激活(运行),它也能接收到发布者的消息。
如果希望发送的消息可以不被做任何处理、或者只被一个消息者处理、或者可以被多个消费者处理的话,那么可以采用Pub/Sub模型。
4. 消息消费
在JMS中,消息的产生和消费都是异步的。对于消费来说,JMS的消息者可以通过两种方式来消费消息。(1)同步:订阅者或接收者通过receive方法来接收消息,receive方法在接收到消息之前(或超时之前)将一直阻塞;
(2)异步:订阅者或接收者可以注册为一个消息监听器。当消息到达之后,系统自动调用监听器的onMessage方法。
二 Redis 发布-订阅模式(pub/sub)
Pub/Sub功能(means Publish, Subscribe)即发布及订阅功能。基于事件的系统中,Pub/Sub是目前广泛使用的通信模型,它采用事件作为基本的通信机制,提供大规模系统所要求的松散耦合的交互模式:订阅者(如客户端)以事件订阅的方式表达出它有兴趣接收的一个事件或一类事件;
发布者(如服务器)可将订阅者感兴趣的事件随时通知相关订阅者。
1. 时间非耦合:发布者和订阅者不必同时在线,它们不必同时参与交互。
2. 空间非耦合:发布者和订阅者不必相互知道对方所在的位置。发布者通过事件服务发布事件,订阅者通过事件服务间接获得事件。发布者和订阅者不需要拥有直接到对方的引用,也不必知道有多少个订阅者或者是发布者参与交互。
3. 同步非耦合:发布者/订阅者是异步模式。发布者可不断地生产事件,而订阅者(通过一个回调)则可异步地得到产生事件的通知。
分类:
按照订阅方式分为基于主题(topic-based)、基于内容(content-based)、基于类型(type-based)的pub/sub方式。
三 Redis pub/sub的实现(非持久)
Redis通过publish和subscribe命令实现订阅和发布的功能。订阅者可以通过subscribe向redis server订阅自己感兴趣的消息类型。redis将信息类型称为通道(channel)。当发布者通过publish命令向redis server发送特定类型的信息时,订阅该消息类型的全部订阅者都会收到此消息。1. 导入Redis依赖(以Maven工程为例子):
<dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>2.8.0</version> </dependency>
2. 增加日志配置文件,这里使用系统输出代替日至
log4j.rootLogger=info,stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%5p [%t] - %m%n
3. 创建消息的发布者Publisher.java
package com.zenhobby.redis_pub_sub; import java.io.BufferedReader; import java.io.InputStreamReader; import redis.clients.jedis.Jedis; public class Publisher { private Jedis publisherJedis; private String channel; public Publisher(Jedis publishJedis,String channel){ this.publisherJedis=publishJedis; this.channel=channel; } public void startPublish(){ try{ BufferedReader reader = new BufferedReader(new InputStreamReader(System.in)); while(true){ System.out.println("请输入message:"); String line = reader.readLine(); if(!"quit".equals(line)){ publisherJedis.publish(channel, line); }else{ break; } } }catch(Exception e){ e.printStackTrace(); } } }
4. 实现消息的接收者Subscriber.java,实现JedisPubSub接口
package com.zenhobby.redis_pub_sub; import redis.clients.jedis.JedisPubSub; public class Subscriber extends JedisPubSub { @Override public void onMessage(String channel, String message) { System.out.println("Channel:" + channel + ",Message:" + message); } @Override public void onPMessage(String pattern, String channel, String message) { System.out.println("Pattern:" + pattern + ",Channel:" + channel + ",Message:" + message); } @Override public void onSubscribe(String channel, int subscribedChannels) { System.out.println("onSubscribe---channel:"+channel+",subscribedChannels:"+subscribedChannels); } @Override public void onPUnsubscribe(String pattern, int subscribedChannels) { System.out.println("onPUnsubscribe---pattern:"+pattern+",subscribedChannels:"+subscribedChannels); } @Override public void onPSubscribe(String pattern, int subscribedChannels) { System.out.println("onPSubscribe---pattern:"+pattern+",subscribedChannels:"+subscribedChannels); } }
JedisPubSub是Redis提供的抽象类,继承这个类就完成了对客户端对订阅的监听。
抽象类中存在六个方法。分别表示
监听到订阅模式接受到消息时的回调 (onPMessage)
监听到订阅频道接受到消息时的回调 (onMessage )
订阅频道时的回调( onSubscribe )
取消订阅频道时的回调( onUnsubscribe )
订阅频道模式时的回调 ( onPSubscribe )
取消订阅模式时的回调( onPUnsubscribe )
5. 创建测试Main.java
package com.zenhobby.redis_pub_sub; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPoolConfig; public class TestMain { public static final String CHANNEL = "mychannel"; public static final String HOST = "127.0.0.1"; public static final int PORT = 6379; private final static JedisPoolConfig POOL_CONFIG = new JedisPoolConfig(); private final static JedisPool JEDIS_POOL = new JedisPool(POOL_CONFIG, HOST, PORT, 0); public static void main(String[] args) { final Jedis subscriberJedis = JEDIS_POOL.getResource(); final Jedis publisherJedis = JEDIS_POOL.getResource(); final Subscriber subscriber = new Subscriber(); new Thread(new Runnable() { public void run() { try { System.out.println("Subscribing to mychannel,this thread will be block"); subscriberJedis.subscribe(subscriber, CHANNEL); System.out.println("subscription ended"); } catch (Exception e) { e.printStackTrace(); } } }).start(); new Publisher(publisherJedis, CHANNEL).startPublish(); publisherJedis.close(); subscriber.unsubscribe(); subscriberJedis.close(); } }
6.测试方法:首先,启动main方法中所示地址的Redis服务器;然后,运行main方法,观察控制台输出。并且我们是以控制台输入内容作为消息发布的内容,各位看官可以在控制台输入任意内容,点击回车键,观察控制台输出。示例如下(直接把原博的图借过来啦):
注意:此方法实现的发布与订阅功能,消息不会在Redis客户端进行缓存。
四 Redis的pub/sub实现(持久)
Redis的pub/sub的持久主要通过,在非持久化的基础上需要作如下处理:1. 重新实现Publisher
package com.zenhobby.redis.persistence; import java.util.Set; import redis.clients.jedis.Jedis; public class PPubClient { private Jedis jedis; private String CONSTANT_CLIENTSET = "clientSet"; public PPubClient(String host,int port){ jedis = new Jedis(host,port); } private void put(String message){ Set<String> subClients = jedis.smembers(CONSTANT); for(String clientKey:subClients){ jedis.rpush(clientKey, message); } } public void pub(String channel,String message){ Long txid = jedis.incr("MAXID"); String content = txid+"/"+message; this.put(content); jedis.publish(channel, message); } public void close(String channel){ jedis.publish(channel, "quit"); jedis.del(channel); } }
在新实现的Publisher中使用Jedis存储发布的消息。
2. 重新实现SubClient
package com.zenhobby.redis.persistence;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;
public class PPSubClient {
private Jedis jedis;
private JedisPubSub listener;
private String CONSTANT_CLIENTSET="clientSet";
public PPSubClient(String host,int port,String clientId){
jedis = new Jedis(host,port);
listener = new PPrintListener(clientId,new Jedis(host,port));
jedis.sadd(CONSTANT_CLIENTSET, clientId);
}
public void sub(String channel){
jedis.subscribe(listener, channel);
}
public void unsubscribe(String channel){
listener.unsubscribe(channel);
}
}
这个客户端并没有继承JedisPubSub类,转而在如下的输出类进行Listener的处理
3. Listener类用于处理消息
package com.zenhobby.persistence; import java.util.Date; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPubSub; public class PPrintListener extends JedisPubSub { private String clientId; private PSubHandler handler; private String CONSTANT = "clientSet"; public PPrintListener(String clientId, Jedis jedis) { this.clientId = clientId; handler = new PSubHandler(jedis); } @Override public void onMessage(String channel, String message) { if (message.equalsIgnoreCase("quit")) { this.unsubscribe(channel); } handler.handle(channel, message); System.out.println("message receive:" + message + ",channel:" + channel); } private void message(String channel, String message) { Date time = new Date(); System.out.println("message receive:" + message + ",channel:" + channel + time.toString()); } @Override public void onPMessage(String pattern, String channel, String message) { System.out.println("message receive:" + message + ",pattern channel:" + channel); } @Override public void onSubscribe(String channel, int subscribedChannels) { handler.subscribe(channel); System.out.println("subscribe:" + channel + ",total channels:" + subscribedChannels); } @Override public void onUnsubscribe(String channel, int subscribedChannels) { handler.unsubscribe(channel); System.out.println("unsubscribe:" + channel + ",total channels:" + subscribedChannels); } @Override public void onPSubscribe(String pattern, int subscribedChannels) { System.out.println("subscribe pattern:" + pattern + ",total channels:" + subscribedChannels); } @Override public void unsubscribe(String... channels) { super.unsubscribe(channels); for (String channel : channels) { handler.unsubscribe(channel); } } class PSubHandler { private Jedis jedis; PSubHandler(Jedis jedis) { this.jedis = jedis; } public void handle(String channel, String message) { int index = message.indexOf("/"); if (index < 0) { return; } Long txid = Long.valueOf(message.substring(0, index)); String key = clientId + "/" + channel; while (true) { String lm = jedis.lindex(key, 0); if (lm == null) { break; } int li = lm.indexOf("/"); if(li<0){ String result = jedis.lpop(key); if(result == null){ break; } message(channel, lm); continue; } Long lxid = Long.valueOf(lm.substring(0, li)); if(txid>=lxid){ jedis.lpop(key); message(channel,lm); continue; }else{ break; } } } public void subscribe(String channel){ String key = clientId+"/"+channel; boolean exist = jedis.sismember(CONSTANT, key); if(!exist){ jedis.sadd(CONSTANT, key); } } public void unsubscribe(String channel){ String key = clientId+"/"+channel; jedis.srem(CONSTANT, key); jedis.del(key); } } }
其中jedis.sismember(CONSTANT, Key)用于判断当前用户是否存在,如果不存在则添加(和Redis缓存的思路相同)。
4. 创建测试Main方法,具体内容如下:
package com.zenhobby.redis.persistence; public class PPubSubTestMain { public static void main(String[] args) throws Exception { String host = "127.0.0.1"; int port = 6379; String clientId = "myclient"; PPubClient pubClient = new PPubClient(host, port); final String channel = "mychannel"; final PPSubClient subClient = new PPSubClient(host, port, clientId); Thread subThread = new Thread(new Runnable() { public void run() { System.out.println("------------sub----start------------"); subClient.sub(channel); System.out.println("------------sub----end------------"); } }); subThread.setDaemon(true); subThread.start(); int i = 0; while (i < 20) { String message = "message--" + i; pubClient.pub(channel, message); i++; Thread.sleep(100); } subClient.unsubscribe(channel); } }5.测试方法:首先,启动main方法中所示地址的Redis服务器;然后,运行main方法,观察控制台输出。这次我们是以循环调用作为输入内容作为消息发布的内容,各位看官观察控制台输出。示例如下:
然后,打开Redis客户端,观察当前Redis中保留的所有数据:
题外的话:
Redis目前提供的发布与订阅功能,将会完全阻塞订阅者的客户端,在java实现时,即需要保留一个线程来专门处理发布者与订阅者的连接。因此,在实际应用时,更加推荐的做法是使用MQ组件来实现该功能。
至此,NoSQL之Redis---PUB/SUB(订阅与发布)---JAVA实现 结束
在此,对以下参考资料的作者表示感谢!:
参考资料:
redis官网:
http://redis.io/topics/pubsub
其他博文:
http://my.oschina.net/itblog/blog/601284?fromerr=FiejlElw http://www.sxrczx.com/pages/shift-alt-ctrl.iteye.com/blog/1867454.html
相关文章推荐
- Java Jedis操作Redis示例(二)——list 生产者/消费者模式实现消息队列
- java redis使用之利用jedis实现redis消息队列
- 【Redis】Java实现redis消息订阅/发布(PubSub)
- Java利用Redis实现消息队列的示例代码
- java redis使用之利用jedis实现redis消息队列
- java redis使用之利用jedis实现redis消息队列
- Java Jedis操作Redis示例(三)——setnx/getset实现分布式锁
- 使用python的redis 实现消息的pub/sub功能
- jedis实现redis的消息队列、发布对象消息、字节数组与字符串相互转换
- Java利用Redis实现消息队列
- Java利用Redis实现消息队列
- [示例] -- redis作为消息队列实现代码
- Redis缓存系统(一)Java-Jedis操作Redis,基本操作以及 实现对象保存
- Java操作redis实现增删查改功能的方法示例
- 如何使用Jedis操作Redis消息队列
- java线程实现简单 消息队列 1对1模式
- [置顶] Redis应用3-基于Redis消息队列实现的异步操作
- 使用Redis的Pub/Sub来实现类似于JMS的消息持久化
- Java利用Redis实现消息队列
- Redis缓存系统-Java-Jedis操作Redis,基本操作以及 实现对象保存