您的位置:首页 > 其它

rabbitmq实现延时消息

2019-04-02 11:38 274 查看
rabbitmq实现延时消息
1.创建死信队列,需要延时的消息放进去。
2.时间到期后,重新路由去其他队列
3.消费这进行消息
spring 配置
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.7.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
<context:property-placeholder location="rabbitmq.properties"/>
<!--配置connection-factory,指定连接rabbit server参数 -->
<rabbit:connection-factory id="connectionFactory"
username="${mq.user}" password="${mq.password}"
host="${mq.host}" port="${mq.port}" virtual-host="${mq.vhost}" />

<!--定义rabbit template用于数据的接收和发送 -->
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" />

<!--通过指定下面的admin信息,当前producer中的exchange和queue会在rabbitmq服务器上自动生成 -->
<rabbit:admin connection-factory="connectionFactory" />

<!--定义queue -->
<rabbit:queue name="${sms.queue}" durable="true" auto-delete="false" exclusive="false" />
<!-- 创建延迟,有消息有效期的队列 -->
<rabbit:queue name="${sms.delay.queue}" durable="true" auto-delete="false">
<rabbit:queue-arguments>
<entry key="x-message-ttl">
<!-- 队列默认消息过期时间 -->
<value type="java.lang.Long">3600000</value>
</entry>
<!-- 消息过期根据重新路由 -->
<entry key="x-dead-letter-exchange" value="${sms.exchange}"/>
</rabbit:queue-arguments>
</rabbit:queue>

<!-- 定义direct exchange,sms_queue -->
<rabbit:direct-exchange name="${sms.exchange}" durable="true" auto-delete="false">
<rabbit:bindings>
<rabbit:binding queue="${sms.queue}" key="${sms.route.key}"/>
</rabbit:bindings>
</rabbit:direct-exchange>
<!-- 延迟消息配置,durable=true 持久化生效 -->
<rabbit:direct-exchange name="${sms.delay.exchange}" durable="true" auto-delete="false">
<rabbit:bindings>
<rabbit:binding queue="${sms.delay.queue}" key="${sms.route.key}"/>
</rabbit:bindings>
</rabbit:direct-exchange>

<!-- 消息接收者 -->
<bean id="messageReceiver" class="git.yampery.consumer.MsgConsumer"/>
<!-- queue litener  观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象-->
<rabbit:listener-container connection-factory="connectionFactory" prefetch="${sms.prefetch}">
<rabbit:listener queues="${sms.queue}" ref="messageReceiver"/>
</rabbit:listener-container>
</beans>

一消费者
package git.yampery.producer;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
* @decription MsgProducer
* <p>生产者</p>
* @author Yampery
* @date 2018/2/11 11:44
*/
@Component
public class MsgProducer {

@Resource
private AmqpTemplate amqpTemplate;
@Value("${sms.delay.exchange}") private String SMS_DELAY_EXCHANGE;
@Value("${sms.exchange}") private String SMS_EXCHANGE;
@Value("${sms.route.key}") private String SMS_ROUTE_KEY;

/**
* 延迟消息放入延迟队列中
* @param msg
* @param expiration
*/
public void publish(String msg, String expiration) {
amqpTemplate.convertAndSend(SMS_DELAY_EXCHANGE, SMS_ROUTE_KEY, msg, message -> {
// 设置消息属性-过期时间
message.getMessageProperties().setExpiration(expiration);
return message;
});
}

/**
* 非延迟消息放入待消费队列
* @param msg
*/
public void publish(String msg) {
amqpTemplate.convertAndSend(SMS_EXCHANGE, SMS_ROUTE_KEY, msg);
}
}

二消费者
package git.yampery.consumer;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

/**
* @decription MsgConsumer
* <p>消费者</p>
* @author Yampery
* @date 2018/2/11 11:43
*/
public class MsgConsumer implements MessageListener {
@Override
public void onMessage(Message message) {
String msg;
try {
// 线程每秒消费一次
Thread.sleep(1000);
msg = new String(message.getBody(), "utf-8");
System.out.println(msg);

} catch (Exception e) {
// 这里并没有对服务异常等失败的消息做处理,直接丢弃了
// 防止因业务异常导致消息失败造成unack阻塞再队列里
// 可以选择路由到另外一个专门处理消费失败的队列
return;
}
}
}
三测试
public static void main() {
JSONObject jObj = new JSONObject();
jObj.put("msg", "这是一条短信");
producer.publish(jObj.toJSONString(), String.valueOf(10 * 1000));
}

文章链接
https://blog.csdn.net/wizard_rp/article/details/79310206#13-%E6%B6%88%E6%81%AF%E5%BB%B6%E8%BF%9F
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: