您的位置:首页 > 数据库 > Redis

利用redis实现消息队列之topic模式

2018-01-11 14:33 676 查看
redis同样可以实现消息队列的发布订阅功能,发布消息者使用比较简单,订阅消息者则需要手动继承 redis.clients.jedis.JedisPubSub 这个抽象类,消费者有动作时就会回调这个实现类的方法。

新建两个maven工程,生产者和消费者。

生产者的pom文件如下<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion>
<groupId>com.tansun</groupId>
<artifactId>ProducerTest</artifactId>
<version>0.0.1-SNAPSHOT</version>

<dependencies>
<!-- https://mvnrepository.com/artifact/redis.clients/jedis -->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
</dependencies>

</project>
生产者的main方法package com.tansun;

import java.util.Date;

import redis.clients.jedis.Jedis;

public class ProducerTest {

@SuppressWarnings("resource")
public static void main(String[] args) {
Jedis jedis = new Jedis("192.168.229.128", 6379);
// 向“channel1”的频道发送消息, 返回订阅者的数量
Long publishCount = jedis.publish("channel1", new Date() + ": hello redis channel1");
jedis.publish("channel1","close channel");
System.out.println("发送成功,该频道有" +publishCount + "个订阅者");
}

}

消费者的pom文件与生产者相同

消费者需要手动继承 redis.clients.jedis.JedisPubSub 这个抽象类,重写部分需要的方法:package com.tansun;

import redis.clients.jedis.JedisPubSub;

public class MessageHandler extends JedisPubSub {

/*
* channel频道接收到新消息后,执行的逻辑
*/
@Override
public void onMessage(String channel, String message) {
// 执行逻辑
System.out.println(channel + "频道发来消息:" + message);
// 如果消息为 close channel, 则取消此频道的订阅
if("close channel".equals(message)){
this.unsubscribe(channel);
}
}

/*
* channel频道有新的订阅者时执行的逻辑
*/
@Override
public void onSubscribe(String channel, int subscribedChannels) {
System.out.println(channel + "频道新增了"+ subscribedChannels +"个订阅者");
}

/*
* channel频道有订阅者退订时执行的逻辑
*/
@Override
public void onUnsubscribe(String channel, int subscribedChannels) {
System.out.println(channel + "频道退订成功");
}

}

消费者的main方法package com.tansun;

import redis.clients.jedis.Jedis;

public class ConsumerTest {

@SuppressWarnings("resource")
public static void main(String[] args) {
Jedis jedis = new Jedis("192.168.229.128", 6379);
MessageHandler handler = new MessageHandler();
jedis.subscribe(handler, "channel1");
}
}

注意需要先启动消费者的main方法开启订阅,然后再启动生产者发布消息。

这里的订阅频道采用的是完全匹配的规则,另外,redis还支持按规则订阅,这里不再赘述。
957b

如果想了解redis实现消息队列的queue模式,可以参考我的另一篇文章 http://blog.csdn.net/jia_costa/article/details/79030621
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: