使用RabbitMQ和阻塞队列异步处理需要长时间操作的请求(Springboot)
2019-05-24 13:44
656 查看
使用RabbitMQ和阻塞队列异步处理需要长时间操作的请求
第五更,在项目开发中有可能遇到一个时间比较长的操作,比如说批量转换Office,前端可能需要等待较长时间才能得到相应,而且并发情况下会出问题,这里我使用RabbitMQ来处理这种情况,注意,集群情况不适用。
Maven依赖
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies>
application.properties简单配置
spring.rabbitmq.host=192.168.xxx.xxx spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.publisher-confirms=true spring.rabbitmq.virtual-host=/
Java 代码
controller层,来接收请求
import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.io.Serializable; @RestController public class AsynController { private final RabbitTemplate rabbitTemplate; //构造方法注入rabbitTemplate @Autowired public AsynController(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; } /** * * @param arg 参数实现了序列化接口就能被RabbitMQ解析 * @return */ @RequestMapping("asynOperate") public String asynOperate(Serializable arg){ //交给RabbitMQ异步处理,发送给“xxxQueue”队列 rabbitTemplate.convertAndSend("xxxQueue",arg); //直接返回成功给前端 return "success"; } }
这里监听队列
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.Serializable; import java.util.concurrent.ArrayBlockingQueue; @Component //监听“xxxQueue”队列 @RabbitListener(queues = "xxxQueue") public class XxxQueue { private static final Logger logger = LoggerFactory.getLogger(XxxQueue.class); //这里使用一个有界的阻塞队列,保证在并发情况下也可处理请求 public static ArrayBlockingQueue<Serializable> queue = new ArrayBlockingQueue<>(200); //在这个方法里面处理异步请求,传入的对象也可以在这里直接拿到 @RabbitHandler public void onMessage(Serializable arg){ try { //将该对象存入阻塞队列中 queue.put(arg); } catch (InterruptedException e) { logger.error("插入队列异常",e); } } }
这里执行业务逻辑
import com.example.queue.XxxQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.stereotype.Component; import java.io.Serializable; //实现ApplicationRunner接口,在spring启动时会执行run()方法 @Component public class MyApplicationRunner implements ApplicationRunner { private static final Logger logger = LoggerFactory.getLogger(MyApplicationRunner.class); @Override public void run(ApplicationArguments args) { //这里循环去队列里面取 while (true){ try { //take()方法是阻塞的,当队列中没有元素时会阻塞在这里 Serializable arg = XxxQueue.queue.take(); doSomething(arg); } catch (InterruptedException e) { logger.error("队列异常"); } } } private void doSomething(Serializable arg){ // 执行业务逻辑 } }
方案二,解决ApplicationRunner不执行的问题
我发现将项目打成war包丢到Tomcat中,MyApplicationRunner中的run()方法竟然不执行,jar包直接启动没有问题,没办法,换种思路来做。
controller层与上述一样不需修改
修改XxxQueue
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.io.Serializable; import java.util.concurrent.ArrayBlockingQueue; @Component //监听“xxxQueue”队列 @RabbitListener(queues = "xxxQueue") public class XxxQueue { private static final Logger logger = LoggerFactory.getLogger(XxxQueue.class); //这里使用一个有界的阻塞队列,保证在并发情况下也可处理请求 public static ArrayBlockingQueue<Serializable> queue = new ArrayBlockingQueue<>(200); private volatile boolean flag = false; private final RabbitTemplate rabbitTemplate; @Autowired public XxxQueue(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; } //在这个方法里面处理异步请求,传入的对象也可以在这里直接拿到 @RabbitHandler public void onMessage(Serializable arg){ //懒加载,这里用双重检查,确保并发的情况写也只会执行一次 if (!flag){ synchronized (XxxQueue.class){ if (!flag){ flag=true; //发送消息给xxxQueueHandle队列,该消息只会发一次,这里发送什么内容无所谓 rabbitTemplate.convertAndSend("xxxQueueHandle","xxx"); } } } try { //将该对象存入阻塞队列中 queue.put(arg); } catch (InterruptedException e) { logger.error("插入队列异常",e); } } }
增加一个队列,在这里处理逻辑
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.Serializable; @Component //监听“xxxQueueHandle”队列 @RabbitListener(queues = "xxxQueueHandle") public class XxxQueueHandle { private static final Logger logger = LoggerFactory.getLogger(XxxQueueHandle.class); private volatile boolean flag = false; @RabbitHandler public void onMessage(String message) { //这里防止会收到多次消息,其实不加应该也可以 if (!flag){ synchronized (XxxQueue.class){ if (!flag){ flag=true; //这里循环去队列里面取 while (true){ try { //take()方法是阻塞的,当队列中没有元素时会阻塞在这里 Serializable arg = XxxQueue.queue.take(); doSomething(arg); } catch (InterruptedException e) { logger.error("队列异常"); } } } } } } private void doSomething(Serializable arg){ // 执行业务逻辑 } }
相关文章推荐
- RabbitMQ进阶使用-延时队列的配置(Spring Boot)
- Spring Boot与RabbitMQ延迟队列使用示例
- RabbitMQ进阶使用-延时队列的配置(Spring Boot)
- Spring boot 下使用RabbitMQ报错:链接拒绝和不能创建队列
- SpringBoot中连接MYSQL数据库,并使用JPA进行数据库的相关操作
- 消息队列 RabbitMQ 与 Spring 整合使用
- 使用Spring Boot操作Hive JDBC时,启动时报出错误:NoSuchMethodError: org.eclipse.jetty.servlet.ServletMapping.setDef
- Spring Boot项目中使用jdbctemplate 操作MYSQL数据库
- [置顶] 在Springboot上使用jedis来操作缓存redis --jedis的配置
- SpringBoot中连接MYSQL数据库,并使用JPA进行数据库的相关操作
- Spring中c3p0连接池的配置 及JdbcTemplate的使用 通过XML配置文件注入各种需要对象的操作 来完成数据库添加Add()方法
- 使用Jersey客户端请求Spring Boot(RESTFul)服务
- SpringBoot的RabbitMQ消息队列: 三、第二模式"Work queues"
- spring boot-同时使用jms的Queue(队列)和Topic(发布订阅)
- SpringBoot的RabbitMQ消息队列: 三、第二模式"Work queues"
- spring boot 整合 redis,使用@Cacheable,@CacheEvict,@CachePut,jedisPool操作redis数据库
- 使用springdata操作需要授权的mongodb
- SpringBoot应用之消息队列rabbitmq
- springboot【19】日志管理之使用AOP统一处理Web请求日志
- Spring Boot中使用AOP统一处理Web请求日志