Spring-data-redis: pub/sub消息订阅
2017-06-19 00:59
507 查看
Redis中pub/sub特性,可以用来实现类似与JMS的“topic”功能,只不过这些消息无法被持久化而已。spring-data-redis组件中对pub/sub提供了类似JMS的编程模式,我们通过实例来展示如何使用。
需要注意的是,在redis中消息的订阅端(subscribe)需要独占链接,那么消息接收将是阻塞的。
代码实例中,使用了“连接池”/“消息异步接受”“消息并发处理”,请根据需要调整相关参数。
1) Redis中"pub/sub"的消息,为"即发即失",server不会保存消息,如果publish的消息,没有任何client处于"subscribe"状态,消息将会被丢弃.如果client在subcribe时,链接断开后重连,那么此期间的消息也将丢失.Redis server将会"尽力"将消息发送给处于subscribe状态的client,但是仍不会保证每条消息都能被正确接收.
2) 如果期望pub/sub的消息时持久的,那么需要借助额外的功能.参见"pub/sub持久化订阅"
一.配置文件:
Java代码
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd" default-autowire="byName">
<bean id="jedisPoolConfig" class="redis.clients.jedis.JedisPoolConfig">
<property name="maxActive" value="32"></property>
<property name="maxIdle" value="6"></property>
<property name="maxWait" value="15000"></property>
<property name="minEvictableIdleTimeMillis" value="300000"></property>
<property name="numTestsPerEvictionRun" value="3"></property>
<property name="timeBetweenEvictionRunsMillis" value="60000"></property>
<property name="whenExhaustedAction" value="1"></property>
</bean>
<bean id="jedisConnectionFactory" class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory" destroy-method="destroy">
<property name="poolConfig" ref="jedisPoolConfig"></property>
<property name="hostName" value="127.0.0.1"></property>
<property name="port" value="6379"></property>
<property name="password" value="0123456"></property>
<property name="timeout" value="15000"></property>
<property name="usePool" value="true"></property>
</bean>
<bean id="jedisTemplate" class="org.springframework.data.redis.core.RedisTemplate">
<property name="connectionFactory" ref="jedisConnectionFactory"></property>
<property name="defaultSerializer">
<bean class="org.springframework.data.redis.serializer.StringRedisSerializer"/>
</property>
</bean>
<bean id="topicMessageListener" class="com.sample.redis.sdr.TopicMessageListener">
<property name="redisTemplate" ref="jedisTemplate"></property>
</bean>
<bean id="topicContainer" class="org.springframework.data.redis.listener.RedisMessageListenerContainer" destroy-method="destroy">
<property name="connectionFactory" ref="jedisConnectionFactory"/>
<property name="taskExecutor"><!-- 此处有个奇怪的问题,无法正确使用其他类型的Executor -->
<bean class="org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler">
<property name="poolSize" value="3"></property>
</bean>
</property>
<property name="messageListeners">
<map>
<entry key-ref="topicMessageListener">
<bean class="org.springframework.data.redis.listener.ChannelTopic">
<constructor-arg value="user:topic"/>
</bean>
</entry>
</map>
</property>
</bean>
</beans>
二.消息发布(pub):
Java代码
String channel = "user:topic";
//其中channel必须为string,而且“序列化”策略也是StringSerializer
//消息内容,将会根据配置文件中指定的valueSerializer进行序列化
//本例中,默认全部采用StringSerializer
//那么在消息的subscribe端也要对“发序列化”保持一致。
redisTemplate.convertAndSend(channel, "from app 1");
三.消息接收(subscribe):
1) TopicMessageListener类:
Java代码
public class TopicMessageListener implements MessageListener {
private RedisTemplate redisTemplate;
public void setRedisTemplate(RedisTemplate redisTemplate) {
this.redisTemplate = redisTemplate;
}
@Override
public void onMessage(Message message, byte[] pattern) {
byte[] body = message.getBody();//请使用valueSerializer
byte[] channel = message.getChannel();
//请参考配置文件,本例中key,value的序列化方式均为string。
//其中key必须为stringSerializer。和redisTemplate.convertAndSend对应
String itemValue = (String)redisTemplate.getValueSerializer().deserialize(body);
String topic = (String)redisTemplate.getStringSerializer().deserialize(channel);
//...
}
}
2) 你会发现上述编程风格非常像JMS。需要注意的是消息体的反序列化。
需要注意的是,在redis中消息的订阅端(subscribe)需要独占链接,那么消息接收将是阻塞的。
代码实例中,使用了“连接池”/“消息异步接受”“消息并发处理”,请根据需要调整相关参数。
1) Redis中"pub/sub"的消息,为"即发即失",server不会保存消息,如果publish的消息,没有任何client处于"subscribe"状态,消息将会被丢弃.如果client在subcribe时,链接断开后重连,那么此期间的消息也将丢失.Redis server将会"尽力"将消息发送给处于subscribe状态的client,但是仍不会保证每条消息都能被正确接收.
2) 如果期望pub/sub的消息时持久的,那么需要借助额外的功能.参见"pub/sub持久化订阅"
一.配置文件:
Java代码
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd" default-autowire="byName">
<bean id="jedisPoolConfig" class="redis.clients.jedis.JedisPoolConfig">
<property name="maxActive" value="32"></property>
<property name="maxIdle" value="6"></property>
<property name="maxWait" value="15000"></property>
<property name="minEvictableIdleTimeMillis" value="300000"></property>
<property name="numTestsPerEvictionRun" value="3"></property>
<property name="timeBetweenEvictionRunsMillis" value="60000"></property>
<property name="whenExhaustedAction" value="1"></property>
</bean>
<bean id="jedisConnectionFactory" class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory" destroy-method="destroy">
<property name="poolConfig" ref="jedisPoolConfig"></property>
<property name="hostName" value="127.0.0.1"></property>
<property name="port" value="6379"></property>
<property name="password" value="0123456"></property>
<property name="timeout" value="15000"></property>
<property name="usePool" value="true"></property>
</bean>
<bean id="jedisTemplate" class="org.springframework.data.redis.core.RedisTemplate">
<property name="connectionFactory" ref="jedisConnectionFactory"></property>
<property name="defaultSerializer">
<bean class="org.springframework.data.redis.serializer.StringRedisSerializer"/>
</property>
</bean>
<bean id="topicMessageListener" class="com.sample.redis.sdr.TopicMessageListener">
<property name="redisTemplate" ref="jedisTemplate"></property>
</bean>
<bean id="topicContainer" class="org.springframework.data.redis.listener.RedisMessageListenerContainer" destroy-method="destroy">
<property name="connectionFactory" ref="jedisConnectionFactory"/>
<property name="taskExecutor"><!-- 此处有个奇怪的问题,无法正确使用其他类型的Executor -->
<bean class="org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler">
<property name="poolSize" value="3"></property>
</bean>
</property>
<property name="messageListeners">
<map>
<entry key-ref="topicMessageListener">
<bean class="org.springframework.data.redis.listener.ChannelTopic">
<constructor-arg value="user:topic"/>
</bean>
</entry>
</map>
</property>
</bean>
</beans>
二.消息发布(pub):
Java代码
String channel = "user:topic";
//其中channel必须为string,而且“序列化”策略也是StringSerializer
//消息内容,将会根据配置文件中指定的valueSerializer进行序列化
//本例中,默认全部采用StringSerializer
//那么在消息的subscribe端也要对“发序列化”保持一致。
redisTemplate.convertAndSend(channel, "from app 1");
三.消息接收(subscribe):
1) TopicMessageListener类:
Java代码
public class TopicMessageListener implements MessageListener {
private RedisTemplate redisTemplate;
public void setRedisTemplate(RedisTemplate redisTemplate) {
this.redisTemplate = redisTemplate;
}
@Override
public void onMessage(Message message, byte[] pattern) {
byte[] body = message.getBody();//请使用valueSerializer
byte[] channel = message.getChannel();
//请参考配置文件,本例中key,value的序列化方式均为string。
//其中key必须为stringSerializer。和redisTemplate.convertAndSend对应
String itemValue = (String)redisTemplate.getValueSerializer().deserialize(body);
String topic = (String)redisTemplate.getStringSerializer().deserialize(channel);
//...
}
}
2) 你会发现上述编程风格非常像JMS。需要注意的是消息体的反序列化。
相关文章推荐
- Spring-data-redis: pub/sub消息订阅
- Spring Data Redis—Pub/Sub(附Web项目源码)
- Spring Data Redis—Pub/Sub(附Web项目源码)
- Spring Data Redis—Pub/Sub(附Web项目源码) (转)
- Spring Data Redis 2 之消息订阅
- redis 高级应用之二(Redis的持久化 和 消息的[pub/sub]发布和订阅)
- Spring mvc Data Redis—Pub/Sub(附Web项目源码)
- Redis pub/sub机制在实际运用场景的缺陷&&模拟JMS消息发布订阅的持久化特性
- linux下使用hiredis异步API实现sub/pub消息订阅和发布的功能 标签: hiredishiredis异步APIhiredis事件处理redis消息订阅发布redis c接口 2016-
- Redis发布及订阅消息(pub/sub)
- 【Redis】Java实现redis消息订阅/发布(PubSub)
- redis 高级应用之二(Redis的持久化 和 消息的[pub/sub]发布和订阅)
- Spring Data Redis实现消息队列——发布/订阅模式
- Spring Data Redis(Redis Messaging/PubSub)
- Spring Data Redis—Pub/Sub(附Web项目源码)
- Spring data redis pubsub 简单接入
- Redis教程03——Redis 发布/订阅(Pub/Sub)
- Spring Data Redis实现一个订阅/发布系统
- redis 学习手册之发布和订阅pubsub操作
- linux下使用hiredis异步API实现sub/pub消息订阅和发布的功能