您的位置:首页 > 数据库 > Redis

Java Jedis操作Redis示例(一)——pub/sub模式实现消息队列

2017-09-07 21:54 1236 查看
转载:http://blog.csdn.net/shaobingj126/article/details/50585035

转载:http://blog.csdn.net/abcd898989/article/details/51697596

一 消息队列

1. 定义

消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题。实现高性能,高可用,可伸缩和最终一致性架构。是大型分布式系统不可缺少的中间件。目前在生产环境,使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ等。

2. 消息队列的适用场景

消息队列的适用场景包括异步处理,应用解耦,流量削锋和消息通讯四个场景

1. 异步处理:异步处理中消息队列保存了当前处理操作,使得动作请求方可以在发出动作请求/写入消息队列后理解返回,异步获取结果,关注点在于请求的友好程度。



2. 应用解耦:应用解耦用于消除请求发起方和请求处理方的耦合,提升系统的健壮性。



3. 流量削锋:流量削峰一般指秒杀或抢购场景,消息队列用于控制活动人数,缓解高访问压力。



4. 日志处理:日志处理是指将消息队列用在日志处理中,比如Kafka的应用,解决大量日志传输的问题。



3. 消息模型

在JMS标准中,有两种消息模型P2P(Point to Point),Publish/Subscribe(Pub/Sub)。

P2P模式:



P2P模式包含三个角色:消息队列(Queue),发送者(Sender),接收者(Receiver)。每个消息都被发送到一个特定的队列,接收者从队列中获取消息。队列保留着消息,直到他们被消费或超时。

P2P的特点

每个消息只有一个消费者(Consumer)(即一旦被消费,消息就不再在消息队列中)
发送者和接收者之间在时间上没有依赖性,也就是说当发送者发送了消息之后,不管接收者有没有正在运行,它不会影响到消息被发送到队列
接收者在成功接收消息之后需向队列应答成功

 如果希望发送的每个消息都会被成功处理的话,那么需要P2P模式。、

Pub/sub模式:



包含三个角色主题(Topic),发布者(Publisher),订阅者(Subscriber) 。多个发布者将消息发送到Topic,系统将这些消息传递给多个订阅者。

Pub/Sub的特点

每个消息可以有多个消费者
发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息。
为了消费消息,订阅者必须保持运行的状态。

为了缓和这样严格的时间相关性,JMS允许订阅者创建一个可持久化的订阅。这样,即使订阅者没有被激活(运行),它也能接收到发布者的消息。

如果希望发送的消息可以不被做任何处理、或者只被一个消息者处理、或者可以被多个消费者处理的话,那么可以采用Pub/Sub模型。

4. 消息消费

在JMS中,消息的产生和消费都是异步的。对于消费来说,JMS的消息者可以通过两种方式来消费消息。

(1)同步:订阅者或接收者通过receive方法来接收消息,receive方法在接收到消息之前(或超时之前)将一直阻塞;

(2)异步:订阅者或接收者可以注册为一个消息监听器。当消息到达之后,系统自动调用监听器的onMessage方法。

二 Redis 发布-订阅模式(pub/sub)

Pub/Sub功能(means Publish, Subscribe)即发布及订阅功能。基于事件的系统中,Pub/Sub是目前广泛使用的通信模型,它采用事件作为基本的通信机制,提供大规模系统所要求的松散耦合的交互模式:

订阅者(如客户端)以事件订阅的方式表达出它有兴趣接收的一个事件或一类事件;

发布者(如服务器)可将订阅者感兴趣的事件随时通知相关订阅者。

1.       时间非耦合:发布者和订阅者不必同时在线,它们不必同时参与交互。

2.       空间非耦合:发布者和订阅者不必相互知道对方所在的位置。发布者通过事件服务发布事件,订阅者通过事件服务间接获得事件。发布者和订阅者不需要拥有直接到对方的引用,也不必知道有多少个订阅者或者是发布者参与交互。

3.       同步非耦合:发布者/订阅者是异步模式。发布者可不断地生产事件,而订阅者(通过一个回调)则可异步地得到产生事件的通知。

分类:

按照订阅方式分为基于主题(topic-based)、基于内容(content-based)、基于类型(type-based)的pub/sub方式。

三 Redis pub/sub的实现(非持久)

Redis通过publishsubscribe命令实现订阅和发布的功能。订阅者可以通过subscribe向redis server订阅自己感兴趣的消息类型。redis将信息类型称为通道(channel)。当发布者通过publish命令向redis server发送特定类型的信息时,订阅该消息类型的全部订阅者都会收到此消息。

1. 导入Redis依赖(以Maven工程为例子):

<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.8.0</version>
</dependency>

2. 增加日志配置文件,这里使用系统输出代替日至

log4j.rootLogger=info,stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%5p [%t] - %m%n

3. 创建消息的发布者Publisher.java

package com.zenhobby.redis_pub_sub;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import redis.clients.jedis.Jedis;

public class Publisher {
private Jedis publisherJedis;
private String channel;

public Publisher(Jedis publishJedis,String channel){
this.publisherJedis=publishJedis;
this.channel=channel;
}
public void startPublish(){
try{
BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
while(true){
System.out.println("请输入message:");
String line = reader.readLine();
if(!"quit".equals(line)){
publisherJedis.publish(channel, line);
}else{
break;
}
}
}catch(Exception e){
e.printStackTrace();
}
}
}


4. 实现消息的接收者Subscriber.java,实现JedisPubSub接口

package com.zenhobby.redis_pub_sub;

import redis.clients.jedis.JedisPubSub;

public class Subscriber extends JedisPubSub {
@Override
public void onMessage(String channel, String message) {
System.out.println("Channel:" + channel + ",Message:" + message);
}

@Override
public void onPMessage(String pattern, String channel, String message) {
System.out.println("Pattern:" + pattern + ",Channel:" + channel + ",Message:" + message);
}

@Override
public void onSubscribe(String channel, int subscribedChannels) {
System.out.println("onSubscribe---channel:"+channel+",subscribedChannels:"+subscribedChannels);
}

@Override
public void onPUnsubscribe(String pattern, int subscribedChannels) {
System.out.println("onPUnsubscribe---pattern:"+pattern+",subscribedChannels:"+subscribedChannels);
}

@Override
public void onPSubscribe(String pattern, int subscribedChannels) {
System.out.println("onPSubscribe---pattern:"+pattern+",subscribedChannels:"+subscribedChannels);
}
}


JedisPubSub是Redis提供的抽象类,继承这个类就完成了对客户端对订阅的监听。

抽象类中存在六个方法。分别表示

监听到订阅模式接受到消息时的回调 (onPMessage)
监听到订阅频道接受到消息时的回调 (onMessage )
订阅频道时的回调( onSubscribe )
取消订阅频道时的回调( onUnsubscribe )
订阅频道模式时的回调 ( onPSubscribe )
取消订阅模式时的回调( onPUnsubscribe )

5. 创建测试Main.java

package com.zenhobby.redis_pub_sub;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

public class TestMain {
public static final String CHANNEL = "mychannel";
public static final String HOST = "127.0.0.1";
public static final int PORT = 6379;

private final static JedisPoolConfig POOL_CONFIG = new JedisPoolConfig();
private final static JedisPool JEDIS_POOL = new JedisPool(POOL_CONFIG, HOST, PORT, 0);

public static void main(String[] args) {
final Jedis subscriberJedis = JEDIS_POOL.getResource();
final Jedis publisherJedis = JEDIS_POOL.getResource();
final Subscriber subscriber = new Subscriber();
new Thread(new Runnable() {
public void run() {
try {
System.out.println("Subscribing to mychannel,this thread will be block");
subscriberJedis.subscribe(subscriber, CHANNEL);
System.out.println("subscription ended");
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
new Publisher(publisherJedis, CHANNEL).startPublish();
publisherJedis.close();

subscriber.unsubscribe();
subscriberJedis.close();
}
}


6.测试方法:首先,启动main方法中所示地址的Redis服务器;然后,运行main方法,观察控制台输出。并且我们是以控制台输入内容作为消息发布的内容,各位看官可以在控制台输入任意内容,点击回车键,观察控制台输出。示例如下(直接把原博的图借过来啦):



注意:此方法实现的发布与订阅功能,消息不会在Redis客户端进行缓存。

四 Redis的pub/sub实现(持久)

Redis的pub/sub的持久主要通过,在非持久化的基础上需要作如下处理:

1. 重新实现Publisher

package com.zenhobby.redis.persistence;
import java.util.Set;
import redis.clients.jedis.Jedis;

public class PPubClient {
private Jedis jedis;
private String CONSTANT_CLIENTSET = "clientSet";
public PPubClient(String host,int port){
jedis = new Jedis(host,port);
}
private void put(String message){
Set<String> subClients = jedis.smembers(CONSTANT);
for(String clientKey:subClients){
jedis.rpush(clientKey, message);
}
}
public void pub(String channel,String message){
Long txid = jedis.incr("MAXID");
String content = txid+"/"+message;
this.put(content);
jedis.publish(channel, message);
}
public void close(String channel){
jedis.publish(channel, "quit");
jedis.del(channel);
}
}

在新实现的Publisher中使用Jedis存储发布的消息。
2. 重新实现SubClient
package com.zenhobby.redis.persistence;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;

public class PPSubClient {
private Jedis jedis;
private JedisPubSub listener;
private String CONSTANT_CLIENTSET="clientSet";
public PPSubClient(String host,int port,String clientId){
jedis = new Jedis(host,port);
listener = new PPrintListener(clientId,new Jedis(host,port));
jedis.sadd(CONSTANT_CLIENTSET, clientId);
}
public void sub(String channel){
jedis.subscribe(listener, channel);
}
public void unsubscribe(String channel){
listener.unsubscribe(channel);
}
}

这个客户端并没有继承JedisPubSub类,转而在如下的输出类进行Listener的处理

3. Listener类用于处理消息

package com.zenhobby.persistence;

import java.util.Date;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;

public class PPrintListener extends JedisPubSub {

private String clientId;
private PSubHandler handler;
private String CONSTANT = "clientSet";
public PPrintListener(String clientId, Jedis jedis) {
this.clientId = clientId;
handler = new PSubHandler(jedis);
}

@Override
public void onMessage(String channel, String message) {
if (message.equalsIgnoreCase("quit")) {
this.unsubscribe(channel);
}
handler.handle(channel, message);
System.out.println("message receive:" + message + ",channel:" + channel);
}

private void message(String channel, String message) {
Date time = new Date();
System.out.println("message receive:" + message + ",channel:" + channel + time.toString());
}

@Override
public void onPMessage(String pattern, String channel, String message) {
System.out.println("message receive:" + message + ",pattern channel:" + channel);
}

@Override
public void onSubscribe(String channel, int subscribedChannels) {
handler.subscribe(channel);
System.out.println("subscribe:" + channel + ",total channels:" + subscribedChannels);
}

@Override
public void onUnsubscribe(String channel, int subscribedChannels) {
handler.unsubscribe(channel);
System.out.println("unsubscribe:" + channel + ",total channels:" + subscribedChannels);
}

@Override
public void onPSubscribe(String pattern, int subscribedChannels) {
System.out.println("subscribe pattern:" + pattern + ",total channels:" + subscribedChannels);
}

@Override
public void unsubscribe(String... channels) {
super.unsubscribe(channels);
for (String channel : channels) {
handler.unsubscribe(channel);
}
}

class PSubHandler {
private Jedis jedis;

PSubHandler(Jedis jedis) {
this.jedis = jedis;
}

public void handle(String channel, String message) {
int index = message.indexOf("/");
if (index < 0) {
return;
}
Long txid = Long.valueOf(message.substring(0, index));
String key = clientId + "/" + channel;
while (true) {
String lm = jedis.lindex(key, 0);
if (lm == null) {
break;
}
int li = lm.indexOf("/");
if(li<0){
String result = jedis.lpop(key);
if(result == null){
break;
}
message(channel, lm);
continue;
}
Long lxid = Long.valueOf(lm.substring(0, li));
if(txid>=lxid){
jedis.lpop(key);
message(channel,lm);
continue;
}else{
break;
}
}
}
public void subscribe(String channel){
String key = clientId+"/"+channel;
boolean exist = jedis.sismember(CONSTANT, key);
if(!exist){
jedis.sadd(CONSTANT, key);
}
}
public void unsubscribe(String channel){
String key = clientId+"/"+channel;
jedis.srem(CONSTANT, key);
jedis.del(key);
}
}
}


其中jedis.sismember(CONSTANT, Key)用于判断当前用户是否存在,如果不存在则添加(和Redis缓存的思路相同)。

4. 创建测试Main方法,具体内容如下:

package com.zenhobby.redis.persistence;

public class PPubSubTestMain {
public static void main(String[] args) throws Exception {
String host = "127.0.0.1";
int port = 6379;
String clientId = "myclient";
PPubClient pubClient = new PPubClient(host, port);
final String channel = "mychannel";
final PPSubClient subClient = new PPSubClient(host, port, clientId);
Thread subThread = new Thread(new Runnable() {
public void run() {
System.out.println("------------sub----start------------");
subClient.sub(channel);
System.out.println("------------sub----end------------");
}
});
subThread.setDaemon(true);
subThread.start();
int i = 0;
while (i < 20) {
String message = "message--" + i;
pubClient.pub(channel, message);
i++;
Thread.sleep(100);
}
subClient.unsubscribe(channel);
}
}
5.测试方法:首先,启动main方法中所示地址的Redis服务器;然后,运行main方法,观察控制台输出。这次我们是以循环调用作为输入内容作为消息发布的内容,各位看官观察控制台输出。示例如下:



然后,打开Redis客户端,观察当前Redis中保留的所有数据:



题外的话:

Redis目前提供的发布与订阅功能,将会完全阻塞订阅者的客户端,在java实现时,即需要保留一个线程来专门处理发布者与订阅者的连接。因此,在实际应用时,更加推荐的做法是使用MQ组件来实现该功能

至此,NoSQL之Redis---PUB/SUB(订阅与发布)---JAVA实现 结束

在此,对以下参考资料的作者表示感谢!:

参考资料:

redis官网:

http://redis.io/topics/pubsub

其他博文:
http://my.oschina.net/itblog/blog/601284?fromerr=FiejlElw http://www.sxrczx.com/pages/shift-alt-ctrl.iteye.com/blog/1867454.html
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: