您的位置:首页 > 数据库 > Redis

NoSQL之Redis---PUB/SUB(订阅与发布)---JAVA实现

2016-06-17 19:24 1191 查看
[不忘初心]

前文我们介绍了使用Redis基本命令的方式来实现发布与订阅的功能,但是在实际开发中,我们更多会使用高级语言使用Redis。本文我们就来演示如何使用java语言来实现订阅与发布功能。先特别声明:本文所示代码参考自其他博文,详情见篇尾。

准备工作:

a.操作系统:windows 7

b.其他软件:redis 3.2 , jdk 1.7,eclipse mars

--------------------------------------------------------------------------------------------------------------------------------------------------------

一。非持久化的订阅与发布

1.新建Maven工程,工程类型为quickstart。并修改buildpath中的jre版本为1.7.

2.修改pom文件内容,具体如下:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion>

<groupId>com.csdn.ingo</groupId>
<artifactId>redis-pub-sub</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>

<name>redis-pub-sub</name>
<url>http://maven.apache.org</url>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.8.0</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
</dependencies>
</project>
3.读者按需增加日志配置文件,下文我们使用系统输出代替日志。

log4j.rootLogger=info,stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%5p [%t] - %m%n
4.创建Publisher.java文件,作为消息的发布者,具体内容如下:

package com.csdn.ingo.redis_pub_sub;

import java.io.BufferedReader;
import java.io.InputStreamReader;

import redis.clients.jedis.Jedis;

/**
*@author 作者 E-mail:ingo
*@version 创建时间:2016年6月14日上午9:49:41
*类说明
*/
public class Publisher {
private Jedis publisherJedis;
private String channel;

public Publisher(Jedis publishJedis,String channel){
this.publisherJedis=publishJedis;
this.channel=channel;
}
public void startPublish(){
try{
BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
while(true){
System.out.println("请输入message:");
String line = reader.readLine();
if(!"quit".equals(line)){
publisherJedis.publish(channel, line);
}else{
break;
}
}
}catch(Exception e){
e.printStackTrace();
}
}
}
5.创建Subscriber.java文件,作为消息的接收者,具体内容如下:

package com.csdn.ingo.redis_pub_sub;

import redis.clients.jedis.JedisPubSub;

public class Subscriber extends JedisPubSub {

@Override
public void onMessage(String channel, String message) {
System.out.println("Channel:" + channel + ",Message:" + message);
}

@Override
public void onPMessage(String pattern, String channel, String message) {
System.out.println("Pattern:" + pattern + ",Channel:" + channel + ",Message:" + message);
}

@Override
public void onSubscribe(String channel, int subscribedChannels) {
System.out.println("onSubscribe---channel:"+channel+",subscribedChannels:"+subscribedChannels);
}

@Override
public void onPUnsubscribe(String pattern, int subscribedChannels) {
System.out.println("onPUnsubscribe---pattern:"+pattern+",subscribedChannels:"+subscribedChannels);
}

@Override
public void onPSubscribe(String pattern, int subscribedChannels) {
System.out.println("onPSubscribe---pattern:"+pattern+",subscribedChannels:"+subscribedChannels);
}
}
6.创建测试Main.java,具体内容如下:

package com.csdn.ingo.redis_pub_sub;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

/**
* @author 作者 E-mail:ingo
* @version 创建时间:2016年6月14日上午9:55:01 类说明
*/
public class TestMain {
public static final String CHANNEL = "mychannel";
public static final String HOST = "127.0.0.1";
public static final int PORT = 6379;

private final static JedisPoolConfig POOL_CONFIG = new JedisPoolConfig();
private final static JedisPool JEDIS_POOL = new JedisPool(POOL_CONFIG, HOST, PORT, 0);

public static void main(String[] args) {
final Jedis subscriberJedis = JEDIS_POOL.getResource();
final Jedis publisherJedis = JEDIS_POOL.getResource();
final Subscriber subscriber = new Subscriber();
new Thread(new Runnable() {
public void run() {
try {
System.out.println("Subscribing to mychannel,this thread will be block");
subscriberJedis.subscribe(subscriber, CHANNEL);
System.out.println("subscription ended");
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
new Publisher(publisherJedis, CHANNEL).startPublish();
publisherJedis.close();

subscriber.unsubscribe();
subscriberJedis.close();
}
}
7.测试方法:首先,启动main方法中所示地址的Redis服务器;然后,运行main方法,观察控制台输出。并且我们是以控制台输入内容作为消息发布的内容,各位看官可以在控制台输入任意内容,点击回车键,观察控制台输出。示例如下:



注意:此方法实现的发布与订阅功能,消息不会在Redis客户端进行缓存。

-------------------------------------------------------------------------------------------------------------------------------------

二。持久化的订阅与发布。【其本质就是在发布消息之前,将消息先存入到Redis中,在调用发布命令】

1.在上文使用的工程中新建持久化方式的package,或者新建与上文同样的工程。

2.新建PPubClient.java,作为持久化的发布者,具体内容如下:

package com.csdn.ingo.redis.persistence;

import java.util.Set;

import redis.clients.jedis.Jedis;

/**
*@author 作者 E-mail:ingo
*@version 创建时间:2016年6月16日上午8:55:21
*类说明
*/
public class PPubClient {
private Jedis jedis;
private String CONSTANT_CLIENTSET = "clientSet";
public PPubClient(String host,int port){
jedis = new Jedis(host,port);
}
private void put(String message){
Set<String> subClients = jedis.smembers(CONSTANT);
for(String clientKey:subClients){
jedis.rpush(clientKey, message);
}
}
public void pub(String channel,String message){
Long txid = jedis.incr("MAXID");
String content = txid+"/"+message;
this.put(content);
jedis.publish(channel, message);
}
public void close(String channel){
jedis.publish(channel, "quit");
jedis.del(channel);
}
}
2.新建PPSubClient.java,作为持久化的接收者,具体内容如下:

package com.csdn.ingo.redis.persistence;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;

/**
*@author 作者 E-mail:ingo
*@version 创建时间:2016年6月16日上午9:01:45
*类说明
*/
public class PPSubClient {
private Jedis jedis;
private JedisPubSub listener;
private String CONSTANT_CLIENTSET="clientSet";
public PPSubClient(String host,int port,String clientId){
jedis = new Jedis(host,port);
listener = new PPrintListener(clientId,new Jedis(host,port));
jedis.sadd(<span style="font-family: 'Microsoft YaHei';">CONSTANT_CLIENTSET</span><span style="font-family: 'Microsoft YaHei';">, clientId);</span>
}
public void sub(String channel){
jedis.subscribe(listener, channel);
}
public void unsubscribe(String channel){
listener.unsubscribe(channel);
}
}
3.新建PPrintListener.java,作为持久化消息接收者的输出。具体内容如下:

package com.csdn.ingo.redis.persistence;

import java.util.Date;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;

/**
* @author 作者 E-mail:ingo
* @version 创建时间:2016年6月16日上午8:30:00 类说明
*/
public class PPrintListener extends JedisPubSub {

private String clientId;
private PSubHandler handler;
private String CONSTANT = "clientSet";
public PPrintListener(String clientId, Jedis jedis) {
this.clientId = clientId;
handler = new PSubHandler(jedis);
}

@Override
public void onMessage(String channel, String message) {
if (message.equalsIgnoreCase("quit")) {
this.unsubscribe(channel);
}
handler.handle(channel, message);
System.out.println("message receive:" + message + ",channel:" + channel);
}

private void message(String channel, String message) {
Date time = new Date();
System.out.println("message receive:" + message + ",channel:" + channel + time.toString());
}

@Override
public void onPMessage(String pattern, String channel, String message) {
System.out.println("message receive:" + message + ",pattern channel:" + channel);
}

@Override
public void onSubscribe(String channel, int subscribedChannels) {
handler.subscribe(channel);
System.out.println("subscribe:" + channel + ",total channels:" + subscribedChannels);
}

@Override
public void onUnsubscribe(String channel, int subscribedChannels) {
handler.unsubscribe(channel);
System.out.println("unsubscribe:" + channel + ",total channels:" + subscribedChannels);
}

@Override
public void onPSubscribe(String pattern, int subscribedChannels) {
System.out.println("subscribe pattern:" + pattern + ",total channels:" + subscribedChannels);
}

@Override
public void unsubscribe(String... channels) {
super.unsubscribe(channels);
for (String channel : channels) {
handler.unsubscribe(channel);
}
}

class PSubHandler {
private Jedis jedis;

PSubHandler(Jedis jedis) {
this.jedis = jedis;
}

public void handle(String channel, String message) {
int index = message.indexOf("/");
if (index < 0) {
return;
}
Long txid = Long.valueOf(message.substring(0, index));
String key = clientId + "/" + channel;
while (true) {
String lm = jedis.lindex(key, 0);
if (lm == null) {
break;
}
int li = lm.indexOf("/");
if(li<0){
String result = jedis.lpop(key);
if(result == null){
break;
}
message(channel, lm);
continue;
}
Long lxid = Long.valueOf(lm.substring(0, li));
if(txid>=lxid){
jedis.lpop(key);
message(channel,lm);
continue;
}else{
break;
}
}
}
public void subscribe(String channel){
String key = clientId+"/"+channel;
boolean exist = jedis.sismember(CONSTANT, key);
if(!exist){
jedis.sadd(CONSTANT, key);
}
}
public void unsubscribe(String channel){
String key = clientId+"/"+channel;
jedis.srem(CONSTANT, key);
jedis.del(key);
}
}
}
4.创建测试Main方法,具体内容如下:

package com.csdn.ingo.redis.persistence;
/**
* @author 作者 E-mail:ingo
* @version 创建时间:2016年6月16日上午9:07:00 类说明
*/
public class PPubSubTestMain {
public static void main(String[] args) throws Exception {
String host = "127.0.0.1";
int port = 6379;
String clientId = "myclient";
PPubClient pubClient = new PPubClient(host, port);
final String channel = "mychannel";
final PPSubClient subClient = new PPSubClient(host, port, clientId);
Thread subThread = new Thread(new Runnable() {
public void run() {
System.out.println("------------sub----start------------");
subClient.sub(channel);
System.out.println("------------sub----end------------");
}
});
subThread.setDaemon(true);
subThread.start();
int i = 0;
while (i < 20) {
String message = "message--" + i;
pubClient.pub(channel, message);
i++;
Thread.sleep(100);
}
subClient.unsubscribe(channel);
}
}
5.测试方法:首先,启动main方法中所示地址的Redis服务器;然后,运行main方法,观察控制台输出。这次我们是以循环调用作为输入内容作为消息发布的内容,各位看官观察控制台输出。示例如下:



然后,打开Redis客户端,观察当前Redis中保留的所有数据:



题外的话:

Redis目前提供的发布与订阅功能,将会完全阻塞订阅者的客户端,在java实现时,即需要保留一个线程来专门处理发布者与订阅者的连接。因此,在实际应用时,更加推荐的做法是使用MQ组件来实现该功能。

--------------------------------------------------------------------------------------------------------------------------------------------------------

至此,NoSQL之Redis---PUB/SUB(订阅与发布)---JAVA实现 结束

在此,对以下参考资料的作者表示感谢!:

参考资料:

redis官网:

http://redis.io/topics/pubsub

其他博文:
http://my.oschina.net/itblog/blog/601284?fromerr=FiejlElw http://www.sxrczx.com/pages/shift-alt-ctrl.iteye.com/blog/1867454.html
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: