您的位置:首页 > 数据库 > Redis

Spring-data-redis: 分布式队列

2016-12-30 15:58 507 查看
Redis中list数据结构,具有“双端队列”的特性,同时redis具有持久数据的能力,因此redis实现分布式队列是非常安全可靠的。它类似于JMS中的“Queue”,只不过功能和可靠性(事务性)并没有JMS严格。

Redis中的队列阻塞时,整个connection都无法继续进行其他操作,因此在基于连接池设计是需要注意。

我们通过spring-data-redis,来实现“同步队列”,设计风格类似与JMS。

一.配置文件:

<beans xmlns="http://www.springframework.org/schema/beans"   

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  

xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd" default-autowire="byName">  

    <bean id="jedisPoolConfig" class="redis.clients.jedis.JedisPoolConfig">  

        <property name="maxActive" value="32"></property>  

        <property name="maxIdle" value="6"></property>  

        <property name="maxWait" value="15000"></property>  

        <property name="minEvictableIdleTimeMillis" value="300000"></property>  

        <property name="numTestsPerEvictionRun" value="3"></property>  

        <property name="timeBetweenEvictionRunsMillis" value="60000"></property>  

        <property name="whenExhaustedAction" value="1"></property>  

    </bean>  

    <bean id="jedisConnectionFactory" class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory" destroy-method="destroy">  

        <property name="poolConfig" ref="jedisPoolConfig"></property>  

        <property name="hostName" value="127.0.0.1"></property>  

        <property name="port" value="6379"></property>  

        <property name="password" value="0123456"></property>  

        <property name="timeout" value="15000"></property>  

        <property name="usePool" value="true"></property>  

    </bean>  

    <bean id="jedisTemplate" class="org.springframework.data.redis.core.RedisTemplate">  

        <property name="connectionFactory" ref="jedisConnectionFactory"></property>  

        <property name="defaultSerializer">  

            <bean class="org.springframework.data.redis.serializer.StringRedisSerializer"/>  

        </property>  

    </bean>  

    <bean id="jedisQueueListener" class="com.sample.redis.sdr.QueueListener"/>  

    <bean id="jedisQueue" class="com.sample.redis.sdr.RedisQueue" destroy-method="destroy">  

        <property name="redisTemplate" ref="jedisTemplate"></property>  

        <property name="key" value="user:queue"></property>  

        <property name="listener" ref="jedisQueueListener"></property>  

    </bean>  

</beans>  

二.程序实例:

1) QueueListener:当队列中有数据时,可以执行类似于JMS的回调操作。

public interface RedisQueueListener<T> {  

  

    public void onMessage(T value);  

}  

public class QueueListener<String> implements RedisQueueListener<String> {  

  

    @Override  

    public void onMessage(String value) {  

        System.out.println(value);  

          

    }  

  

}  

2) RedisQueue:队列操作,内部封装redisTemplate实例;如果配置了“listener”,那么queue将采用“消息回调”的方式执行,listenerThread是一个后台线程,用来自动处理“队列信息”。如果不配置“listener”,那么你可以将redisQueue注入到其他spring bean中,手动去“take”数据即可。

public class RedisQueue<T> implements InitializingBean,DisposableBean{  

    private RedisTemplate redisTemplate;  

    private String key;  

    private int cap = Short.MAX_VALUE;//最大阻塞的容量,超过容量将会导致清空旧数据  

    private byte[] rawKey;  

    private RedisConnectionFactory factory;  

    private RedisConnection connection;//for blocking  

    private BoundListOperations<String, T> listOperations;//noblocking  

      

    private Lock lock = new ReentrantLock();//基于底层IO阻塞考虑  

      

    private RedisQueueListener listener;//异步回调  

    private Thread listenerThread;  

      

    private boolean isClosed;  

      

    public void setRedisTemplate(RedisTemplate redisTemplate) {  

        this.redisTemplate = redisTemplate;  

    }  

  

    public void setListener(RedisQueueListener listener) {  

        this.listener = listener;  

    }  

  

    public void setKey(String key) {  

        this.key = key;  

    }  

      

  

    @Override  

    public void afterPropertiesSet() throws Exception {  

        factory = redisTemplate.getConnectionFactory();  

        connection = RedisConnectionUtils.getConnection(factory);  

        rawKey = redisTemplate.getKeySerializer().serialize(key);  

        listOperations = redisTemplate.boundListOps(key);  

        if(listener != null){  

            listenerThread = new ListenerThread();  

            listenerThread.setDaemon(true);  

            listenerThread.start();  

        }  

    }  

      

      

    /** 

     * blocking 

     * remove and get last item from queue:BRPOP 

     * @return 

     */  

    public T takeFromTail(int timeout) throws InterruptedException{   

        lock.lockInterruptibly();  

        try{  

            List<byte[]> results = connection.bRPop(timeout, rawKey);  

            if(CollectionUtils.isEmpty(results)){  

                return null;  

            }  

            return (T)redisTemplate.getValueSerializer().deserialize(results.get(1));  

        }finally{  

            lock.unlock();  

        }  

    }  

      

    public T takeFromTail() throws InterruptedException{  

        return takeFromHead(0);  

    }  

      

    /** 

     * 从队列的头,插入 

     */  

    public void pushFromHead(T value){  

        listOperations.leftPush(value);  

    }  

      

    public void pushFromTail(T value){  

        listOperations.rightPush(value);  

    }  

      

    /** 

     * noblocking 

     * @return null if no item in queue 

     */  

    public T removeFromHead(){  

        return listOperations.leftPop();  

    }  

      

    public T removeFromTail(){  

        return listOperations.rightPop();  

    }  

      

    /** 

     * blocking 

     * remove and get first item from queue:BLPOP 

     * @return 

     */  

    public T takeFromHead(int timeout) throws InterruptedException{  

        lock.lockInterruptibly();  

        try{  

            List<byte[]> results = connection.bLPop(timeout, rawKey);  

            if(CollectionUtils.isEmpty(results)){  

                return null;  

            }  

            return (T)redisTemplate.getValueSerializer().deserialize(results.get(1));  

        }finally{  

            lock.unlock();  

        }  

    }  

      

    public T takeFromHead() throws InterruptedException{  

        return takeFromHead(0);  

    }  

  

    @Override  

    public void destroy() throws Exception {  

        if(isClosed){  

            return;  

        }  

        shutdown();  

        RedisConnectionUtils.releaseConnection(connection, factory);  

    }  

      

    private void shutdown(){  

        try{  

            listenerThread.interrupt();  

        }catch(Exception e){  

            //  

        }  

    }  

      

    class ListenerThread extends Thread {  

          

        @Override  

        public void run(){  

            try{  

                while(true){  

                    T value = takeFromHead();//cast exceptionyou should check.  

                    //逐个执行  

                    if(value != null){  

                        try{  

                            listener.onMessage(value);  

                        }catch(Exception e){  

                            //  

                        }  

                    }  

                }  

            }catch(InterruptedException e){  

                //  

            }  

        }  

    }  

      

}  

3) 使用与测试:

public static void main(String[] args) throws Exception{  

    ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("classpath:spring-redis-beans.xml");  

    RedisQueue<String> redisQueue = (RedisQueue)context.getBean("jedisQueue");  

    redisQueue.pushFromHead("test:app");  

    Thread.sleep(15000);  

    redisQueue.pushFromHead("test:app");  

    Thread.sleep(15000);  

    redisQueue.destroy();  

}  

在程序运行期间,你可以通过redis-cli(客户端窗口)执行“lpush”,你会发现程序的控制台仍然能够正常打印队列信息。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息