Spring-data-redis: Distributed Queue
2016-12-30 15:58
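The Redis list data structure behaves like a double-ended queue, and Redis can also persist its data, so a Redis list is a reliable basis for a distributed queue. It resembles a "Queue" in JMS, although its features and reliability (transactional guarantees) are not as strict as those of JMS.
While a blocking queue operation is in progress in Redis, the whole connection cannot perform any other operation, so keep this in mind when designing around a connection pool.
We use spring-data-redis to implement a "synchronous queue" whose design style resembles JMS.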
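The pool concern can be illustrated without Redis at all. The following is a minimal sketch, assuming each blocked consumer pins exactly one pooled connection; a `Semaphore` stands in for the pool, and the class name `BlockingPoolSketch` and the method `connectionsLeft` are hypothetical names used only for this illustration.

```java
import java.util.concurrent.Semaphore;

/** In-memory stand-in for a connection pool; no Redis is involved. */
public class BlockingPoolSketch {

    /**
     * Simulates `consumers` threads that each borrow one connection and then
     * sit in a blocking pop. Returns how many connections remain for everyone
     * else (producers, non-blocking reads).
     */
    public static int connectionsLeft(int poolSize, int consumers) {
        Semaphore pool = new Semaphore(poolSize);
        for (int i = 0; i < consumers; i++) {
            // tryAcquire() returns false instead of waiting when the pool is empty
            pool.tryAcquire();
        }
        return pool.availablePermits();
    }

    public static void main(String[] args) {
        System.out.println(connectionsLeft(32, 4)); // prints 28
        System.out.println(connectionsLeft(4, 4));  // prints 0: nothing left for producers
    }
}
```

With as many blocked consumers as pooled connections, nothing is left for pushes. This is one reason to keep blocking pops on a dedicated connection and leave the pool to the non-blocking operations.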
I. Configuration file:
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd" default-autowire="byName">
    <!-- These pool property names match the commons-pool 1.x based JedisPoolConfig.
         In commons-pool2 based Jedis releases the equivalents are maxTotal,
         maxWaitMillis and blockWhenExhausted. -->
    <bean id="jedisPoolConfig" class="redis.clients.jedis.JedisPoolConfig">
        <property name="maxActive" value="32"></property>
        <property name="maxIdle" value="6"></property>
        <property name="maxWait" value="15000"></property>
        <property name="minEvictableIdleTimeMillis" value="300000"></property>
        <property name="numTestsPerEvictionRun" value="3"></property>
        <property name="timeBetweenEvictionRunsMillis" value="60000"></property>
        <property name="whenExhaustedAction" value="1"></property>
    </bean>
    <bean id="jedisConnectionFactory" class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory" destroy-method="destroy">
        <property name="poolConfig" ref="jedisPoolConfig"></property>
        <property name="hostName" value="127.0.0.1"></property>
        <property name="port" value="6379"></property>
        <property name="password" value="0123456"></property>
        <property name="timeout" value="15000"></property>
        <property name="usePool" value="true"></property>
    </bean>
    <bean id="jedisTemplate" class="org.springframework.data.redis.core.RedisTemplate">
        <property name="connectionFactory" ref="jedisConnectionFactory"></property>
        <property name="defaultSerializer">
            <bean class="org.springframework.data.redis.serializer.StringRedisSerializer"/>
        </property>
    </bean>
    <bean id="jedisQueueListener" class="com.sample.redis.sdr.QueueListener"/>
    <bean id="jedisQueue" class="com.sample.redis.sdr.RedisQueue" destroy-method="destroy">
        <property name="redisTemplate" ref="jedisTemplate"></property>
        <property name="key" value="user:queue"></property>
        <property name="listener" ref="jedisQueueListener"></property>
    </bean>
</beans>
II. Program examples:
1) QueueListener: when data arrives in the queue, a JMS-style callback can be executed.
// Callback invoked for every item taken from the queue.
public interface RedisQueueListener<T> {
    public void onMessage(T value);
}

// Listener for String messages; it simply prints each value.
public class QueueListener implements RedisQueueListener<String> {
    @Override
    public void onMessage(String value) {
        System.out.println(value);
    }
}
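2) RedisQueue: queue operations, wrapping a redisTemplate instance internally. If a "listener" is configured, the queue works in "message callback" mode: listenerThread is a background thread that processes queued messages automatically. If no "listener" is configured, you can inject redisQueue into other Spring beans and "take" data manually.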
import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.BoundListOperations;
import org.springframework.data.redis.core.RedisConnectionUtils;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.util.CollectionUtils;

@SuppressWarnings({"rawtypes", "unchecked"})
public class RedisQueue<T> implements InitializingBean, DisposableBean {
    private RedisTemplate redisTemplate;
    private String key;
    private int cap = Short.MAX_VALUE; // intended maximum capacity (old data would be cleared beyond it); not enforced in this example
    private byte[] rawKey;
    private RedisConnectionFactory factory;
    private RedisConnection connection; // dedicated connection for blocking operations
    private BoundListOperations<String, T> listOperations; // non-blocking operations
    private Lock lock = new ReentrantLock(); // guards the blocking connection, since the underlying I/O blocks
    private RedisQueueListener listener; // asynchronous callback
    private Thread listenerThread;
    private volatile boolean isClosed;

    public void setRedisTemplate(RedisTemplate redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    public void setListener(RedisQueueListener listener) {
        this.listener = listener;
    }

    public void setKey(String key) {
        this.key = key;
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        factory = redisTemplate.getConnectionFactory();
        connection = RedisConnectionUtils.getConnection(factory);
        rawKey = redisTemplate.getKeySerializer().serialize(key);
        listOperations = redisTemplate.boundListOps(key);
        if (listener != null) {
            listenerThread = new ListenerThread();
            listenerThread.setDaemon(true);
            listenerThread.start();
        }
    }

    /**
     * blocking
     * remove and get last item from queue: BRPOP
     * @param timeout seconds to wait; 0 blocks until an item arrives
     * @return the item, or null if the timeout expired
     */
    public T takeFromTail(int timeout) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            // BRPOP replies with [key, value]
            List<byte[]> results = connection.bRPop(timeout, rawKey);
            if (CollectionUtils.isEmpty(results)) {
                return null;
            }
            return (T) redisTemplate.getValueSerializer().deserialize(results.get(1));
        } finally {
            lock.unlock();
        }
    }

    public T takeFromTail() throws InterruptedException {
        return takeFromTail(0);
    }

    /**
     * Insert at the head of the queue.
     */
    public void pushFromHead(T value) {
        listOperations.leftPush(value);
    }

    public void pushFromTail(T value) {
        listOperations.rightPush(value);
    }

    /**
     * non-blocking
     * @return null if no item in queue
     */
    public T removeFromHead() {
        return listOperations.leftPop();
    }

    public T removeFromTail() {
        return listOperations.rightPop();
    }

    /**
     * blocking
     * remove and get first item from queue: BLPOP
     * @param timeout seconds to wait; 0 blocks until an item arrives
     * @return the item, or null if the timeout expired
     */
    public T takeFromHead(int timeout) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            // BLPOP replies with [key, value]
            List<byte[]> results = connection.bLPop(timeout, rawKey);
            if (CollectionUtils.isEmpty(results)) {
                return null;
            }
            return (T) redisTemplate.getValueSerializer().deserialize(results.get(1));
        } finally {
            lock.unlock();
        }
    }

    public T takeFromHead() throws InterruptedException {
        return takeFromHead(0);
    }

    @Override
    public void destroy() throws Exception {
        if (isClosed) {
            return;
        }
        isClosed = true; // repeated calls become no-ops
        shutdown();
        RedisConnectionUtils.releaseConnection(connection, factory);
    }

    private void shutdown() {
        // no thread exists when no listener was configured
        if (listenerThread != null) {
            listenerThread.interrupt();
        }
    }

    class ListenerThread extends Thread {
        @Override
        public void run() {
            try {
                while (!isClosed) {
                    T value = takeFromHead(); // beware of ClassCastException on the value type
                    // process items one at a time
                    if (value != null) {
                        try {
                            listener.onMessage(value);
                        } catch (Exception e) {
                            // ignore listener failures so the loop keeps running
                        }
                    }
                }
            } catch (InterruptedException e) {
                // interrupted during shutdown: leave the loop
            }
        }
    }
}
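One caveat about the listener thread: `Thread.interrupt()` generally does not wake a thread that is parked in a blocking socket read, so with an infinite-timeout BLPOP the loop may only notice shutdown once the next item arrives. A common workaround is to poll with a finite timeout and check a flag between polls. The sketch below shows only that loop shape: a `LinkedBlockingDeque` stands in for the Redis list, and `PollingConsumer`, its nested `Listener` interface and the 100 ms poll interval are assumptions made for illustration, not part of the code above.

```java
import java.util.List;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

public class PollingConsumer<T> {

    public interface Listener<T> {
        void onMessage(T value);
    }

    private final BlockingDeque<T> queue;   // in-memory stand-in for the Redis list
    private final Listener<T> listener;
    private final Thread worker;
    private volatile boolean closed;

    public PollingConsumer(BlockingDeque<T> queue, Listener<T> listener) {
        this.queue = queue;
        this.listener = listener;
        this.worker = new Thread(this::loop, "polling-consumer");
        this.worker.setDaemon(true);
    }

    public void start() {
        worker.start();
    }

    private void loop() {
        while (!closed) {
            try {
                // Finite wait: the flag is re-checked at least every 100 ms.
                T value = queue.pollFirst(100, TimeUnit.MILLISECONDS);
                if (value != null) {
                    listener.onMessage(value);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            } catch (RuntimeException e) {
                // A failing listener must not kill the loop; real code would log this.
            }
        }
    }

    /** Signals the loop to stop and waits up to one second for it to exit. */
    public boolean close() throws InterruptedException {
        closed = true;
        worker.join(1000);
        return !worker.isAlive();
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingDeque<String> queue = new LinkedBlockingDeque<>();
        List<String> seen = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(2);
        PollingConsumer<String> consumer = new PollingConsumer<>(queue, v -> {
            seen.add(v);
            done.countDown();
        });
        consumer.start();
        queue.addLast("a");
        queue.addLast("b");
        done.await(2, TimeUnit.SECONDS);
        System.out.println(seen + " stopped=" + consumer.close());
    }
}
```

Applied to RedisQueue, the same idea would mean calling `takeFromHead` with a small positive timeout inside the loop instead of `takeFromHead()`, at the cost of one extra round trip per idle interval.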
3) Usage and testing:
@SuppressWarnings("unchecked")
public static void main(String[] args) throws Exception {
    ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("classpath:spring-redis-beans.xml");
    RedisQueue<String> redisQueue = (RedisQueue<String>) context.getBean("jedisQueue");
    redisQueue.pushFromHead("test:app");
    Thread.sleep(15000);
    redisQueue.pushFromHead("test:app");
    Thread.sleep(15000);
    // closing the context invokes RedisQueue.destroy() through its destroy-method
    context.close();
}
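While the program is running, you can execute "lpush" from redis-cli (the client window), and you will see that the program's console still prints the queue messages as expected.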
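Note that the listener thread consumes with `takeFromHead` (BLPOP) while the test pushes with `pushFromHead` (LPUSH), so any backlog is delivered newest-first. For first-in, first-out delivery, pair a push at one end with a take at the other. The following minimal sketch shows both orderings; it uses an in-memory `LinkedBlockingDeque` as a stand-in for the Redis list, and `DequeOrderSketch` and `pushHeadThenDrain` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;

public class DequeOrderSketch {

    /** Pushes every item at the head (like LPUSH), then drains from the chosen end. */
    public static List<String> pushHeadThenDrain(boolean popFromHead, String... items) {
        BlockingDeque<String> list = new LinkedBlockingDeque<>();
        for (String item : items) {
            list.addFirst(item); // LPUSH analogue
        }
        List<String> out = new ArrayList<>();
        while (!list.isEmpty()) {
            // pollFirst is the LPOP/BLPOP analogue, pollLast the RPOP/BRPOP analogue
            out.add(popFromHead ? list.pollFirst() : list.pollLast());
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(pushHeadThenDrain(true, "1", "2", "3"));  // [3, 2, 1] (stack order)
        System.out.println(pushHeadThenDrain(false, "1", "2", "3")); // [1, 2, 3] (queue order)
    }
}
```

In terms of the RedisQueue class, queue order corresponds to `pushFromHead` with `takeFromTail`, or `pushFromTail` with `takeFromHead`.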