您的位置:首页 > 编程语言 > Java开发

Springboot + rabbitMQ实现延迟消费以及spring与策略模式联合处理不同的业务(一)

2018-08-29 09:36 826 查看

前言:在电商系统中,可能有这样一个需求,订单下单之后30分钟后,如果用户没有付钱,则系统自动取消订单。如果用常规的定时器定时去查询,这会造成很大的消耗(频繁访问数据库)。

这里选择RabbitMQ来实现类似的功能(使用队列的TTL特性)

1.这种模式大概流程,我们需要将消息先发送到
ttl
延迟队列内,当消息到达过期时间后会自动转发到
ttl
队列内配置的转发
Exchange
以及
RouteKey
绑定的队列内完成消息消费。

2.添加maven依赖

[code]<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>ncy>

 

3.yml配置

[code]spring:
#rabbitmq消息队列配置信息
rabbitmq:
host: localhost
port: 5672
username: root
password: root
#消息发送和接收确认
publisher-confirms: true
publisher-returns: true
listener:
direct:
acknowledge-mode: manual
simple:
acknowledge-mode: manual
server:
port: 9999

4.创建一个常量类,存放队列和交换机名字--QueueContent

[code]/**
* @Auther: cjw
* @Date: 2018/6/28 17:17
* @Description:
*/
public class QueueContent {

/**
* 普通消息通知队列名称
*/
public static final String MESSAGE_QUEUE_NAME="message.ordinary.queue";

/**
* ttl(延时)消息通知队列名称
*/
public static final String MESSAGE_TTL_QUEUE_NAME="message.ttl.queue";

/**
* 普通交换机名称
*/

public static final String DIRECT_EXCHANGE_NAME="message.ordinary.exchange";

/**
* ttl(延时)交换机名称
*/
public static final String TOPIC_EXCHANGE_NAME="message.ttl.exchange";
}

5.建立一个队列枚举类-QueueEnum

[code]@Getter
public enum QueueEnum {

/**
* 消息通知队列
*/
MESSAGE_QUEUE(QueueContent.DIRECT_EXCHANGE_NAME, QueueContent.MESSAGE_QUEUE_NAME, QueueContent.MESSAGE_QUEUE_NAME),
/**
* 消息通知ttl队列
*/
MESSAGE_TTL_QUEUE(QueueContent.TOPIC_EXCHANGE_NAME, QueueContent.MESSAGE_TTL_QUEUE_NAME, QueueContent.MESSAGE_TTL_QUEUE_NAME);
/**
* 交换名称
*/
private String exchange;
/**
* 队列名称
*/
private String name;
/**
* 路由键
*/
private String routeKey;

QueueEnum(String exchange, String name, String routeKey) {
this.exchange = exchange;
this.name = name;
this.routeKey = routeKey;
}

public String getExchange() {
return exchange;
}

public void setExchange(String exchange) {
this.exchange = exchange;
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public String getRouteKey() {
return routeKey;
}

public void setRouteKey(String routeKey) {
this.routeKey = routeKey;
}

}

6.建立队列配置类-RabbitMqConfiguration

[code]@Configuration
public class RabbitMqConfiguration {

/**
* 普通消息交换机配置
*
* @return
*/
@Bean
DirectExchange messageDirect() {
return (DirectExchange) ExchangeBuilder
.directExchange(QueueEnum.MESSAGE_QUEUE.getExchange())
.durable(true)
.build();
}

/**
* 延时消息交换机配置
*
* @return
*/
@Bean
DirectExchange messageTtlDirect() {
return (DirectExchange) ExchangeBuilder
.directExchange(QueueEnum.MESSAGE_TTL_QUEUE.getExchange())
.durable(true)
.build();
}

/**
* 普通消息队列配置
*
* @return
*/
@Bean
public Queue messageQueue() {
return new Queue(QueueEnum.MESSAGE_QUEUE.getName());
}

/**
* TTL消息队列配置
*
* @return
*/
@Bean
Queue messageTtlQueue() {
return QueueBuilder
.durable(QueueEnum.MESSAGE_TTL_QUEUE.getName())
// 配置到期后转发的交换
.withArgument("x-dead-letter-exchange", QueueEnum.MESSAGE_QUEUE.getExchange())
// 配置到期后转发的路由键
.withArgument("x-dead-letter-routing-key", QueueEnum.MESSAGE_QUEUE.getRouteKey())
.build();
}

/**
* 普通队列和普通交换机的绑定-routekey
*
* @param messageDirect 消息中心交换配置
* @param messageQueue  消息中心队列
* @return
*/
@Bean
Binding messageBinding(DirectExchange messageDirect, Queue messageQueue) {
return BindingBuilder
.bind(messageQueue)
.to(messageDirect)
.with(QueueEnum.MESSAGE_QUEUE.getRouteKey());
}

/**
* ttl队列和ttl交换机的绑定-routekey
*
* @param messageTtlQueue
* @param messageTtlDirect
* @return
*/
@Bean
public Binding messageTtlBinding(Queue messageTtlQueue, DirectExchange messageTtlDirect) {
return BindingBuilder
.bind(messageTtlQueue)
.to(messageTtlDirect)
.with(QueueEnum.MESSAGE_TTL_QUEUE.getRouteKey());
}

}

上面就是队列和交换机的绑定

1.声明了普通

消息通知队列
的相关
Exchange
Queue
Binding
等配置,将message.ordinary.queue队列通过路由键message.ordinary.queue绑定到了message.ordinary.exchange交换上。

2.声明了延时

消息通知队列
的相关
Exchange
Queue
Binding
等配置,将message.ttl.queue队列通过message.ttl.queue路由键绑定到了message.ttl.exchange交换上。

7.编写消息发送---MessageProvider

[code]@Component
public class MessageProvider implements RabbitTemplate.ConfirmCallback {

static Logger logger = LoggerFactory.getLogger(MessageProvider.class);

/**
* RabbitMQ 模版消息实现类
*/
protected RabbitTemplate rabbitTemplate;

public MessageProvider(RabbitTemplate rabbitTemplate){
this.rabbitTemplate = rabbitTemplate;
this.rabbitTemplate.setMandatory(true);
this.rabbitTemplate.setConfirmCallback(this);
}

private String msgPojoStr;

/**
* 发送延迟消息
* @param messageContent
*/
public void sendMessage(MessagePojo messageContent) {
if (messageContent != null){
//这里用于消费者消费消息的时候处理具体业务
if (StringUtils.isEmpty(messageContent.getClassName())){
logger.error("处理业务的类名不能为空");
return;
}

messageContent.setMessageId(UUID.randomUUID().toString());
messageContent.setCreateTime(DateUtil.datetoString(new Date()));
String msg = JSON.toJSONString(messageContent);
msgPojoStr = msg;
logger.info("延迟:{}秒写入消息队列:{},消息内容:{}", messageContent.getDelay(), QueueEnum.MESSAGE_TTL_QUEUE.getRouteKey(), msg);
// 执行发送消息到指定队列
CorrelationData correlationData = new CorrelationData(messageContent.getMessageId());
rabbitTemplate.convertAndSend(QueueEnum.MESSAGE_TTL_QUEUE.getExchange(), QueueEnum.MESSAGE_TTL_QUEUE.getRouteKey(), msg, message -> {
// 设置延迟毫秒值
message.getMessageProperties().setExpiration(String.valueOf(messageContent.getDelay()*1000));
return message;
},correlationData);
}else {
logger.warn("消息内容为空!!!!!");
}

}

/**
* 发送确认
*/
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
System.out.println(" 回调id:" + correlationData);
if (b) {
System.out.println(msgPojoStr+":消息发送成功");
} else {
logger.warn(msgPojoStr+":消息发送失败:" + s);
}
}
}

 

8.消息实体--MessagePojo

[code]@Data
public class MessagePojo implements Serializable {

//定时过期时间(单位:秒)马上消费,设置为0
private int delay;

//处理类名(必填项)
private String className;

//消息参数
private Map<String, Object> params;

private String createTime;

private String messageId;

public MessagePojo() {

}

public MessagePojo(int delay, String className,Map<String, Object> params) {
this.delay = delay;
this.className = className;
this.params = params;
}

}

以上是消息的定义和发送,下一篇讲消费者的处理 https://www.geek-share.com/detail/2746506572.html

源码地址:https://gitee.com/JanXs/springboot-rabbitMQ

阅读更多
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: