rabbitmq实现延时消息
2019-04-02 11:38
274 查看
rabbitmq实现延时消息 1.创建死信队列,需要延时的消息放进去。 2.时间到期后,重新路由去其他队列 3.消费这进行消息 spring 配置 <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.7.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd"> <context:property-placeholder location="rabbitmq.properties"/> <!--配置connection-factory,指定连接rabbit server参数 --> <rabbit:connection-factory id="connectionFactory" username="${mq.user}" password="${mq.password}" host="${mq.host}" port="${mq.port}" virtual-host="${mq.vhost}" /> <!--定义rabbit template用于数据的接收和发送 --> <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" /> <!--通过指定下面的admin信息,当前producer中的exchange和queue会在rabbitmq服务器上自动生成 --> <rabbit:admin connection-factory="connectionFactory" /> <!--定义queue --> <rabbit:queue name="${sms.queue}" durable="true" auto-delete="false" exclusive="false" /> <!-- 创建延迟,有消息有效期的队列 --> <rabbit:queue name="${sms.delay.queue}" durable="true" auto-delete="false"> <rabbit:queue-arguments> <entry key="x-message-ttl"> <!-- 队列默认消息过期时间 --> <value type="java.lang.Long">3600000</value> </entry> <!-- 消息过期根据重新路由 --> <entry key="x-dead-letter-exchange" value="${sms.exchange}"/> </rabbit:queue-arguments> </rabbit:queue> <!-- 定义direct exchange,sms_queue --> <rabbit:direct-exchange name="${sms.exchange}" durable="true" auto-delete="false"> <rabbit:bindings> <rabbit:binding queue="${sms.queue}" key="${sms.route.key}"/> </rabbit:bindings> </rabbit:direct-exchange> <!-- 延迟消息配置,durable=true 持久化生效 --> <rabbit:direct-exchange name="${sms.delay.exchange}" durable="true" auto-delete="false"> <rabbit:bindings> <rabbit:binding queue="${sms.delay.queue}" key="${sms.route.key}"/> </rabbit:bindings> </rabbit:direct-exchange> <!-- 消息接收者 --> <bean id="messageReceiver" class="git.yampery.consumer.MsgConsumer"/> <!-- queue litener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象--> <rabbit:listener-container connection-factory="connectionFactory" prefetch="${sms.prefetch}"> <rabbit:listener queues="${sms.queue}" ref="messageReceiver"/> </rabbit:listener-container> </beans> 一消费者 package git.yampery.producer; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import javax.annotation.Resource; /** * @decription MsgProducer * <p>生产者</p> * @author Yampery * @date 2018/2/11 11:44 */ @Component public class MsgProducer { @Resource private AmqpTemplate amqpTemplate; @Value("${sms.delay.exchange}") private String SMS_DELAY_EXCHANGE; @Value("${sms.exchange}") private String SMS_EXCHANGE; @Value("${sms.route.key}") private String SMS_ROUTE_KEY; /** * 延迟消息放入延迟队列中 * @param msg * @param expiration */ public void publish(String msg, String expiration) { amqpTemplate.convertAndSend(SMS_DELAY_EXCHANGE, SMS_ROUTE_KEY, msg, message -> { // 设置消息属性-过期时间 message.getMessageProperties().setExpiration(expiration); return message; }); } /** * 非延迟消息放入待消费队列 * @param msg */ public void publish(String msg) { amqpTemplate.convertAndSend(SMS_EXCHANGE, SMS_ROUTE_KEY, msg); } } 二消费者 package git.yampery.consumer; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageListener; /** * @decription MsgConsumer * <p>消费者</p> * @author Yampery * @date 2018/2/11 11:43 */ public class MsgConsumer implements MessageListener { @Override public void onMessage(Message message) { String msg; try { // 线程每秒消费一次 Thread.sleep(1000); msg = new String(message.getBody(), "utf-8"); System.out.println(msg); } catch (Exception e) { // 这里并没有对服务异常等失败的消息做处理,直接丢弃了 // 防止因业务异常导致消息失败造成unack阻塞再队列里 // 可以选择路由到另外一个专门处理消费失败的队列 return; } } } 三测试 public static void main() { JSONObject jObj = new JSONObject(); jObj.put("msg", "这是一条短信"); producer.publish(jObj.toJSONString(), String.valueOf(10 * 1000)); } 文章链接 https://blog.csdn.net/wizard_rp/article/details/79310206#13-%E6%B6%88%E6%81%AF%E5%BB%B6%E8%BF%9F
相关文章推荐
- spring boot 自学笔记(五) Rabbitmq集成,延时消息队列实现
- RabbitMQ与SpringMVC集成并实现发送消息和接收消息(持久化)方案二
- 用PHP尝试RabbitMQ(amqp扩展)实现消息的发送和接收
- 基于Rabbitmq实现延迟消息
- RabbitMQ 发布订阅-实现延时重试队列(参考)
- 使用Kotlin+RocketMQ实现延时消息的示例代码
- RabbitMQ 延时消息设计
- 跨数据库分布式实时事务 - 基于RabbitMQ实时消息队列服务实现
- java实现rabbitmq消息的发送接受
- RabbitMQ 实现延时队列
- 为什么Redis的消息机制不适合实现延时队列?
- Springcloud学习——结合RabbitMQ实现消息总线更新分布式配置
- SpringBoot整合RabbitMQ 实现五种消息模型 详细教程
- .NetCore Cap 结合 RabbitMQ 实现消息订阅
- 基于PHP使用rabbitmq实现消息队列
- RabbitMQ(python实现)学习之二:Producer发送消息至多个消息队列queue(广播消息)
- (转) RabbitMQ学习之spring整合发送同步消息(注解实现)
- Python RabbitMQ消息队列实现rpc
- [RabbitMQ]15_RabbitMQ学习之spring整合发送同步消息(注解实现)