您的位置:首页 > 编程语言 > Java开发

使用RabbitMQ和阻塞队列异步处理需要长时间操作的请求(Springboot)

2019-05-24 13:44 656 查看

使用RabbitMQ和阻塞队列异步处理需要长时间操作的请求


第五更,在项目开发中有可能遇到一个时间比较长的操作,比如说批量转换Office,前端可能需要等待较长时间才能得到相应,而且并发情况下会出问题,这里我使用RabbitMQ来处理这种情况,注意,集群情况不适用。

Maven依赖

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

application.properties简单配置

spring.rabbitmq.host=192.168.xxx.xxx
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.virtual-host=/

Java 代码

controller层,来接收请求

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.io.Serializable;

@RestController
public class AsynController {

private final RabbitTemplate rabbitTemplate;

//构造方法注入rabbitTemplate
@Autowired
public AsynController(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}

/**
*
* @param arg 参数实现了序列化接口就能被RabbitMQ解析
* @return
*/
@RequestMapping("asynOperate")
public String asynOperate(Serializable arg){
//交给RabbitMQ异步处理,发送给“xxxQueue”队列
rabbitTemplate.convertAndSend("xxxQueue",arg);
//直接返回成功给前端
return "success";
}
}

这里监听队列

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.Serializable;
import java.util.concurrent.ArrayBlockingQueue;

@Component
//监听“xxxQueue”队列
@RabbitListener(queues = "xxxQueue")
public class XxxQueue {
private static final Logger logger = LoggerFactory.getLogger(XxxQueue.class);

//这里使用一个有界的阻塞队列,保证在并发情况下也可处理请求
public static ArrayBlockingQueue<Serializable> queue = new ArrayBlockingQueue<>(200);

//在这个方法里面处理异步请求,传入的对象也可以在这里直接拿到
@RabbitHandler
public void onMessage(Serializable arg){
try {
//将该对象存入阻塞队列中
queue.put(arg);
} catch (InterruptedException e) {
logger.error("插入队列异常",e);
}
}
}

这里执行业务逻辑

import com.example.queue.XxxQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

import java.io.Serializable;

//实现ApplicationRunner接口,在spring启动时会执行run()方法
@Component
public class MyApplicationRunner implements ApplicationRunner {
private static final Logger logger = LoggerFactory.getLogger(MyApplicationRunner.class);

@Override
public void run(ApplicationArguments args) {
//这里循环去队列里面取
while (true){
try {
//take()方法是阻塞的,当队列中没有元素时会阻塞在这里
Serializable arg = XxxQueue.queue.take();
doSomething(arg);
} catch (InterruptedException e) {
logger.error("队列异常");
}

}
}

private void doSomething(Serializable arg){
// 执行业务逻辑
}
}

方案二,解决ApplicationRunner不执行的问题

我发现将项目打成war包丢到Tomcat中,MyApplicationRunner中的run()方法竟然不执行,jar包直接启动没有问题,没办法,换种思路来做。

controller层与上述一样不需修改

修改XxxQueue

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.Serializable;
import java.util.concurrent.ArrayBlockingQueue;

@Component
//监听“xxxQueue”队列
@RabbitListener(queues = "xxxQueue")
public class XxxQueue {
private static final Logger logger = LoggerFactory.getLogger(XxxQueue.class);

//这里使用一个有界的阻塞队列,保证在并发情况下也可处理请求
public static ArrayBlockingQueue<Serializable> queue = new ArrayBlockingQueue<>(200);

private volatile boolean flag = false;

private final RabbitTemplate rabbitTemplate;

@Autowired
public XxxQueue(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}

//在这个方法里面处理异步请求,传入的对象也可以在这里直接拿到
@RabbitHandler
public void onMessage(Serializable arg){

//懒加载,这里用双重检查,确保并发的情况写也只会执行一次
if (!flag){
synchronized (XxxQueue.class){
if (!flag){
flag=true;
//发送消息给xxxQueueHandle队列,该消息只会发一次,这里发送什么内容无所谓
rabbitTemplate.convertAndSend("xxxQueueHandle","xxx");
}
}
}

try {
//将该对象存入阻塞队列中
queue.put(arg);
} catch (InterruptedException e) {
logger.error("插入队列异常",e);
}
}
}

增加一个队列,在这里处理逻辑

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.Serializable;

@Component
//监听“xxxQueueHandle”队列
@RabbitListener(queues = "xxxQueueHandle")
public class XxxQueueHandle {
private static final Logger logger = LoggerFactory.getLogger(XxxQueueHandle.class);

private volatile boolean flag = false;

@RabbitHandler
public void onMessage(String message) {

//这里防止会收到多次消息,其实不加应该也可以
if (!flag){
synchronized (XxxQueue.class){
if (!flag){
flag=true;
//这里循环去队列里面取
while (true){
try {
//take()方法是阻塞的,当队列中没有元素时会阻塞在这里
Serializable arg = XxxQueue.queue.take();
doSomething(arg);
} catch (InterruptedException e) {
logger.error("队列异常");
}
}
}
}
}
}

private void doSomething(Serializable arg){
// 执行业务逻辑
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: