springboot使用redis队列作为后台任务处理队列
2018-03-06 17:43
609 查看
springboot使用redis队列作为后台任务处理队列
1.maven依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency>
2. java config
@Bean public JedisPoolConfig getJedisPoolConfig(){ JedisPoolConfig jedisPoolConfig = new JedisPoolConfig(); //最大空闲连接数, 默认8个 jedisPoolConfig.setMaxIdle(4); //最大连接数, 默认8个 jedisPoolConfig.setMaxTotal(8); //最小空闲连接数, 默认0 jedisPoolConfig.setMinIdle(1); //获取连接时的最大等待毫秒数(如果设置为阻塞时BlockWhenExhausted),如果超时就抛异常, 小于零:阻塞不确定的时间, 默认-1 jedisPoolConfig.setMaxWaitMillis(15000); //逐出连接的最小空闲时间 默认1800000毫秒(30分钟) jedisPoolConfig.setMinEvictableIdleTimeMillis(300000); //每次逐出检查时 逐出的最大数目 如果为负数就是 : 1/abs(n), 默认3 jedisPoolConfig.setNumTestsPerEvictionRun(3); //一个连接在池中最小生存的时间 jedisPoolConfig.setTimeBetweenEvictionRunsMillis(60000); //连接超时时是否阻塞,false时报异常,ture阻塞直到超时, 默认true jedisPoolConfig.setBlockWhenExhausted(true); return jedisPoolConfig; } @Bean(name = "jedisConnectionFactory") public JedisConnectionFactory getJedisConnectionFactory(JedisPoolConfig jedisPoolConfig){ JedisConnectionFactory jedisConnectionFactory = new JedisConnectionFactory(); jedisConnectionFactory.setPoolConfig(jedisPoolConfig); jedisConnectionFactory.setHostName(redisHost); jedisConnectionFactory.setPort(redisPort); jedisConnectionFactory.setPassword(redisPassword); jedisConnectionFactory.setTimeout(redisProperties.getTimeout()); return jedisConnectionFactory; } @Bean public RedisTemplate getRedisTemplate(@Qualifier("jedisConnectionFactory") JedisConnectionFactory jedisConnectionFactory){ RedisTemplate redisTemplate = new RedisTemplate(); redisTemplate.setConnectionFactory(jedisConnectionFactory); redisTemplate.setDefaultSerializer(new StringRedisSerializer()); return redisTemplate; }
3.建立一个队列
public class RedisQueue<T> { private BoundListOperations<String, T> listOperations;//noblocking private static Lock lock = new ReentrantLock();//基于底层IO阻塞考虑 private RedisTemplate redisTemplate; private byte[] rawKey; public RedisQueue(RedisTemplate redisTemplate, String key){ this.redisTemplate = redisTemplate; rawKey = redisTemplate.getKeySerializer().serialize(key); listOperations = redisTemplate.boundListOps(key); } /** * blocking 一直阻塞直到队列里边有数据 * remove and get last item from queue:BRPOP * @return */ public T takeFromTail(int timeout) throws InterruptedException{ lock.lockInterruptibly(); RedisConnectionFactory connectionFactory = redisTemplate.getConnectionFactory(); RedisConnection connection = connectionFactory.getConnection(); try{ List<byte[]> results = connection.bRPop(timeout, rawKey); if(CollectionUtils.isEmpty(results)){ return null; } return (T)redisTemplate.getValueSerializer().deserialize(results.get(1)); }finally{ lock.unlock(); RedisConnectionUtils.releaseConnection(connection, connectionFactory); } } public T takeFromTail() throws InterruptedException{ return takeFromTail(0); } /** * 从队列的头,插入 */ public void pushFromHead(T value){ listOperations.leftPush(value); } public void pushFromTail(T value){ listOperations.rightPush(value); } /** * noblocking * @return null if no item in queue */ public T removeFromHead(){ return listOperations.leftPop(); } public T removeFromTail(){ return listOperations.rightPop(); } /** * blocking 一直阻塞直到队列里边有数据 * remove and get first item from queue:BLPOP * @return */ public T takeFromHead(int timeout) throws InterruptedException{ lock.lockInterruptibly(); RedisConnectionFactory connectionFactory = redisTemplate.getConnectionFactory(); RedisConnection connection = connectionFactory.getConnection(); try{ List<byte[]> results = connection.bLPop(timeout, rawKey); if(CollectionUtils.isEmpty(results)){ return null; } return (T)redisTemplate.getValueSerializer().deserialize(results.get(1)); }finally{ lock.unlock(); RedisConnectionUtils.releaseConnection(connection, connectionFactory); } } public T takeFromHead() throws InterruptedException{ return takeFromHead(0); } }
4.容器类
一般容器类的设计最为重要, 作用是作为spring bean 拿到spring管理的相关bean, 同时可以管理多个队列等.
@Component public class RedisTaskContainer { private Logger logger = LoggerFactory.getLogger(this.getClass()); private static int runTaskThreadNum = 2;//Runtime.getRuntime().availableProcessors() //使用一个统一维护的线程池来管理隔离线程 private static ExecutorService es = Executors.newFixedThreadPool(runTaskThreadNum); @Resource private RedisTemplate redisTemplate; public static String ORDER_SEND_REDIS_QUEQUE = "order:send:redis:queue"; //队列里边的数据泛型可以根据实际情况调整, 可以定义多个类似的队列 private RedisQueue<Map<String, List<OrderSendBO>>> redisQueue = null; @PostConstruct private void init(){ redisQueue = new RedisQueue(redisTemplate, ORDER_SEND_REDIS_QUEQUE); Consumer<Map<String, List<OrderSendBO>>> consumer = (data) -> { // do something } //提交线程 for (int i = 0; i < runTaskThreadNum; i++) { es.execute( new OrderSendRedisConsumer(this, consumer) ); } } public RedisQueue<Map<String, List<OrderSendBO>>> getRedisQueue() { return redisQueue; } }
5.消费线程类
public class OrderSendRedisConsumer extends Thread { private Logger logger = LoggerFactory.getLogger(this.getClass()); private RedisTaskContainer container; private Consumer<Map<String, List<OrderSendBO>>> consumer; public OrderSendRedisConsumer(RedisTaskContainer container, Consumer<Map<String, List<OrderSendBO>>> consumer){ this.container = container; this.consumer = consumer; } @Override public void run() { try{ while(true){ Map<String, List<OrderSendBO>> value = container.getRedisQueue().takeFromTail();//cast exception? you should check. //逐个执行 if(value != null){ try{ consumer.accept(value); }catch(Exception e){ logger.error("调用失败", e); } } } }catch(Exception e){ logger.error("轮循线程异常退出", e); } } }
如果需要给前端页面以视觉反馈, 可以给一个任务一个标识, 存入队列之前, 也放入redis; 任务被队列消费之后, 删除这个标识; 前端页面定时轮询这个标识是否存在即可;
相关文章推荐
- spring boot-使用redis的Keyspace Notifications实现定时任务队列
- Spring-Boot中如何使用多线程处理任务
- 使用NODEJS+REDIS开发一个消息队列以及定时任务处理
- 3 Springboot中使用redis,redis自动缓存异常处理
- 使用Redis作为SpringBoot项目数据缓存
- spring-boot 结合spring-data-redis使用redis的消息队列
- REDIS学习(3.2)spring boot 使用redis作为缓存
- springboot 整合dubbo最佳实践 (使用redis作为注册中心)
- 详解Spring-Boot中如何使用多线程处理任务
- Spring-Boot中如何使用多线程处理任务
- Spring-Boot中如何使用多线程处理任务
- spring boot环境下使用quartz设置定时任务
- springboot结合redis使用CachingConfigurerSupport方法不能被继承
- Spring Boot使用redis做数据缓存
- Spring Boot使用redis做数据缓存
- (14)Spring Boot定时任务的使用【从零开始学Spring Boot】
- spring-boot 整合redis作为数据缓存
- spring boot使用redis
- Spring Boot 定时任务的使用
- redis中的基本数据类型,以及在Spring-Boot对Redis的基本使用