您的位置:首页 > 数据库 > Redis

springboot使用redis队列作为后台任务处理队列

2018-03-06 17:43 609 查看

springboot使用redis队列作为后台任务处理队列

1.maven依赖

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>


2. java config

@Bean
public JedisPoolConfig getJedisPoolConfig(){
JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
//最大空闲连接数, 默认8个
jedisPoolConfig.setMaxIdle(4);
//最大连接数, 默认8个
jedisPoolConfig.setMaxTotal(8);
//最小空闲连接数, 默认0
jedisPoolConfig.setMinIdle(1);
//获取连接时的最大等待毫秒数(如果设置为阻塞时BlockWhenExhausted),如果超时就抛异常, 小于零:阻塞不确定的时间,  默认-1
jedisPoolConfig.setMaxWaitMillis(15000);
//逐出连接的最小空闲时间 默认1800000毫秒(30分钟)
jedisPoolConfig.setMinEvictableIdleTimeMillis(300000);
//每次逐出检查时 逐出的最大数目 如果为负数就是 : 1/abs(n), 默认3
jedisPoolConfig.setNumTestsPerEvictionRun(3);
//一个连接在池中最小生存的时间
jedisPoolConfig.setTimeBetweenEvictionRunsMillis(60000);
//连接超时时是否阻塞,false时报异常,ture阻塞直到超时, 默认true
jedisPoolConfig.setBlockWhenExhausted(true);
return jedisPoolConfig;
}

@Bean(name = "jedisConnectionFactory")
public JedisConnectionFactory getJedisConnectionFactory(JedisPoolConfig jedisPoolConfig){
JedisConnectionFactory jedisConnectionFactory = new JedisConnectionFactory();
jedisConnectionFactory.setPoolConfig(jedisPoolConfig);
jedisConnectionFactory.setHostName(redisHost);
jedisConnectionFactory.setPort(redisPort);
jedisConnectionFactory.setPassword(redisPassword);
jedisConnectionFactory.setTimeout(redisProperties.getTimeout());
return jedisConnectionFactory;
}

@Bean
public RedisTemplate getRedisTemplate(@Qualifier("jedisConnectionFactory") JedisConnectionFactory jedisConnectionFactory){
RedisTemplate redisTemplate = new RedisTemplate();
redisTemplate.setConnectionFactory(jedisConnectionFactory);
redisTemplate.setDefaultSerializer(new StringRedisSerializer());
return redisTemplate;
}


3.建立一个队列

public class RedisQueue<T> {

private BoundListOperations<String, T> listOperations;//noblocking

private static Lock lock = new ReentrantLock();//基于底层IO阻塞考虑

private RedisTemplate redisTemplate;

private byte[] rawKey;

public RedisQueue(RedisTemplate redisTemplate, String key){
this.redisTemplate = redisTemplate;
rawKey = redisTemplate.getKeySerializer().serialize(key);
listOperations = redisTemplate.boundListOps(key);
}

/**
* blocking 一直阻塞直到队列里边有数据
* remove and get last item from queue:BRPOP
* @return
*/
public T takeFromTail(int timeout) throws InterruptedException{
lock.lockInterruptibly();
RedisConnectionFactory connectionFactory = redisTemplate.getConnectionFactory();
RedisConnection connection = connectionFactory.getConnection();
try{
List<byte[]> results = connection.bRPop(timeout, rawKey);
if(CollectionUtils.isEmpty(results)){
return null;
}
return (T)redisTemplate.getValueSerializer().deserialize(results.get(1));
}finally{
lock.unlock();
RedisConnectionUtils.releaseConnection(connection, connectionFactory);
}
}

public T takeFromTail() throws InterruptedException{
return takeFromTail(0);
}

/**
* 从队列的头,插入
*/
public void pushFromHead(T value){
listOperations.leftPush(value);
}

public void pushFromTail(T value){
listOperations.rightPush(value);
}

/**
* noblocking
* @return null if no item in queue
*/
public T removeFromHead(){
return listOperations.leftPop();
}

public T removeFromTail(){
return listOperations.rightPop();
}

/**
* blocking 一直阻塞直到队列里边有数据
* remove and get first item from queue:BLPOP
* @return
*/
public T takeFromHead(int timeout) throws InterruptedException{
lock.lockInterruptibly();
RedisConnectionFactory connectionFactory = redisTemplate.getConnectionFactory();
RedisConnection connection = connectionFactory.getConnection();
try{
List<byte[]> results = connection.bLPop(timeout, rawKey);
if(CollectionUtils.isEmpty(results)){
return null;
}
return (T)redisTemplate.getValueSerializer().deserialize(results.get(1));
}finally{
lock.unlock();
RedisConnectionUtils.releaseConnection(connection, connectionFactory);
}
}

public T takeFromHead() throws InterruptedException{
return takeFromHead(0);
}

}


4.容器类

一般容器类的设计最为重要, 作用是作为spring bean 拿到spring管理的相关bean, 同时可以管理多个队列等.


@Component
public class RedisTaskContainer {

private Logger logger = LoggerFactory.getLogger(this.getClass());

private static int runTaskThreadNum = 2;//Runtime.getRuntime().availableProcessors()

//使用一个统一维护的线程池来管理隔离线程
private static ExecutorService es = Executors.newFixedThreadPool(runTaskThreadNum);

@Resource
private RedisTemplate redisTemplate;

public static String ORDER_SEND_REDIS_QUEQUE = "order:send:redis:queue";

//队列里边的数据泛型可以根据实际情况调整, 可以定义多个类似的队列
private RedisQueue<Map<String, List<OrderSendBO>>> redisQueue = null;

@PostConstruct
private void init(){
redisQueue = new RedisQueue(redisTemplate, ORDER_SEND_REDIS_QUEQUE);

Consumer<Map<String, List<OrderSendBO>>> consumer = (data) -> {
// do something
}

//提交线程
for (int i = 0; i < runTaskThreadNum; i++) {
es.execute(
new OrderSendRedisConsumer(this, consumer)
);
}
}

public RedisQueue<Map<String, List<OrderSendBO>>> getRedisQueue() {
return redisQueue;
}

}


5.消费线程类

public class OrderSendRedisConsumer extends Thread {

private Logger logger = LoggerFactory.getLogger(this.getClass());

private RedisTaskContainer container;

private Consumer<Map<String, List<OrderSendBO>>> consumer;

public OrderSendRedisConsumer(RedisTaskContainer container, Consumer<Map<String, List<OrderSendBO>>> consumer){
this.container = container;
this.consumer = consumer;
}

@Override
public void run() {
try{
while(true){
Map<String, List<OrderSendBO>> value = container.getRedisQueue().takeFromTail();//cast exception? you should check.
//逐个执行
if(value != null){
try{
consumer.accept(value);
}catch(Exception e){
logger.error("调用失败", e);
}
}
}
}catch(Exception e){
logger.error("轮循线程异常退出", e);
}
}
}


​ 如果需要给前端页面以视觉反馈, 可以给一个任务一个标识, 存入队列之前, 也放入redis; 任务被队列消费之后, 删除这个标识; 前端页面定时轮询这个标识是否存在即可;
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: