SpringBoot整合RabbitMq后实现消息传输
2020-03-06 20:07
447 查看
今天通过学习mall商城的订单下单之后如果未进行付款之后,到达一定时间后将产生的进行取消。跟着学习,也跟着写个demo。记录一下。
编写之前要保证自己的电脑已经安装了erlang和rabbitMq。并且能正常启动。如果不了解前面过程,可以先点击mall商城后观看一遍,进行编写。
添加maven依赖
<dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> </dependency>
添加yml文件
spring: rabbitmq: host: localhost # rabbitmq的连接地址 port: 5672 # rabbitmq的连接端口号 virtual-host: /mall # rabbitmq的虚拟host username: mall # rabbitmq的用户名 password: mall # rabbitmq的密码 publisher-confirms: true #如果对异步消息需要回调必须设置为true
编写枚举类,规范代码
package cn.zxw.bean.result; import lombok.Data; import lombok.Getter; /** * @author zxw * @version 1.0 * @description 消息队列 * @data: 2020/2/25 16:34 */ @Getter public enum QueueEnum { /** * 消息通知队列 */ QUEUE_ORDER_CANCEL("mall.order.direct", "mall.order.cancel", "mall.order.cancel"), /** * 消息通知ttl队列 */ QUEUE_TTL_ORDER_CANCEL("mall.order.direct.ttl", "mall.order.cancel.ttl", "mall.order.cancel.ttl"); /** * 交换名称 */ private String exchange; /** * 队列名称 */ private String name; /** * 路由键 */ private String routeKey; QueueEnum(String exchange, String name, String routeKey) { this.exchange = exchange; this.name = name; this.routeKey = routeKey; } }
编写配置类,绑定队列和交换机
package cn.zxw.config; import cn.zxw.bean.result.QueueEnum; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @author zxw * @version 1.0 * @description mq配置 * @data: 2020/2/25 17:09 */ @Configuration public class RabbitMqConfig { /** * 订单消息实际消费队列所绑定的交换机 */ @Bean DirectExchange orderDirect() { return (DirectExchange) ExchangeBuilder .directExchange(QueueEnum.QUEUE_ORDER_CANCEL.getExchange()) .durable(true) .build(); } /** * 订单延迟队列绑定的交换机 * @return */ @Bean DirectExchange orderTtlDirect(){ return (DirectExchange) ExchangeBuilder .directExchange(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getExchange()) .durable(true) .build(); } /** * 订单实际消费队列 * @return */ @Bean public Queue orderQueue(){ return new Queue(QueueEnum.QUEUE_ORDER_CANCEL.getName()); } /** * 订单延迟队列(死信队列) */ @Bean public Queue orderTtlQueue() { return QueueBuilder .durable(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getName()) //到期后转发的交换机 .withArgument("x-dead-letter-exchange", QueueEnum.QUEUE_ORDER_CANCEL.getExchange()) //到期后转发的路由键 .withArgument("x-dead-letter-routing-key", QueueEnum.QUEUE_ORDER_CANCEL.getRouteKey()) .build(); } /** * 将订单队列绑定到交换机 */ @Bean public Binding orderBinding(DirectExchange orderDirect, Queue orderQueue){ return BindingBuilder .bind(orderQueue) .to(orderDirect) .with(QueueEnum.QUEUE_ORDER_CANCEL.getRouteKey()); } /** * 将订单延迟队列绑定到交换机 */ @Bean public Binding orderTtlBinding(DirectExchange orderTtlDirect,Queue orderTtlQueue){ return BindingBuilder .bind(orderTtlQueue) .to(orderTtlDirect) .with(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getRouteKey()); } }
编写消息的生产者
package cn.zxw.util; import cn.zxw.bean.result.QueueEnum; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * @author zxw * @version 1.0 * @description 取消订单消息的发出者(生产者) * @data: 2020/2/25 17:41 */ @Component public class CancelOrderSender { private static Logger LOGGER = LoggerFactory.getLogger(CancelOrderSender.class); @Autowired private AmqpTemplate amqpTemplate; /** * 发送消息给ma * @param orderId 订单号 * @param delayTimes 超时时间 */ public void sendMessage(Long orderId, final long delayTimes) { //给延迟队列发送消息 amqpTemplate.convertAndSend(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getExchange(), QueueEnum.QUEUE_TTL_ORDER_CANCEL.getRouteKey(), orderId, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { //给消息设置延迟毫秒值 message.getMessageProperties().setExpiration(String.valueOf(delayTimes)); return message; } }); LOGGER.info("send delay message orderId:{}", orderId); } }
编写消息的消费者
package cn.zxw.util; import cn.zxw.service.OmsPortalOrderService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * @author zxw * @version 1.0 * @description 取消订单的接收者(消费者) * @data: 2020/2/25 17:47 */ @Component @RabbitListener(queues = "mall.order.cancel") public class CancelOrderReceiver { private static Logger LOGGER = LoggerFactory.getLogger(CancelOrderReceiver.class); @Autowired private OmsPortalOrderService portalOrderService; @RabbitHandler public void handle(Long orderId){ LOGGER.info("receive delay message orderId:{}",orderId); portalOrderService.cancelOrder(orderId); } }
编写订单service
package cn.zxw.service; import cn.zxw.bean.OrderParam; import cn.zxw.bean.result.CommonResult; import org.springframework.transaction.annotation.Transactional; /** * @author zxw * @version 1.0 * @description 前台订单处理的service * @data: 2020/2/25 17:52 */ public interface OmsPortalOrderService { /** * 根据提交信息生成订单 * @param orderParam 订单参数 */ @Transactional(rollbackFor = Exception.class) CommonResult generateOrder(OrderParam orderParam); /** * 取消单个超时订单 * @param orderId 订单号 */ @Transactional(rollbackFor = Exception.class) void cancelOrder(Long orderId); }
编写订单的测试类
package cn.zxw.service.impl; import cn.zxw.bean.OrderParam; import cn.zxw.bean.result.CommonResult; import cn.zxw.service.OmsPortalOrderService; import cn.zxw.util.CancelOrderSender; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; /** * @author zxw * @version 1.0 * @description 前台订单服务的实现类 * @data: 2020/2/25 17:55 */ @Service public class OmsPortalOrderServiceImpl implements OmsPortalOrderService { private static Logger LOGGER = LoggerFactory.getLogger(OmsPortalOrderServiceImpl.class); @Autowired private CancelOrderSender cancelOrderSender; @Override public CommonResult generateOrder(OrderParam orderParam) { //todo 执行一系类下单操作 LOGGER.info("process generateOrder"); //下单完成后开启一个延迟消息,用于当用户没有付款时取消订单(orderId应该在下单后生成) sendDelayMessageCancelOrder(11L); return CommonResult.success(null, "下单成功"); } @Override public void cancelOrder(Long orderId) { //todo 执行一系类取消订单操作 LOGGER.info("process cancelOrder orderId:{}",orderId); } private void sendDelayMessageCancelOrder(Long orderId) { //获取订单超时时间,假设为60分钟 long delayTimes = 30 * 1000; //发送延迟消息 cancelOrderSender.sendMessage(orderId, delayTimes); } }
编写订单controller
package cn.zxw.controller; import cn.zxw.bean.OrderParam; import cn.zxw.bean.result.CommonResult; import cn.zxw.service.OmsPortalOrderService; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.*; /** * @author zxw * @version 1.0 * @description 订单controller * @data: 2020/2/25 18:00 */ @RestController @Api(tags = "OmsPortalOrderController", description = "订单管理") @RequestMapping("/order") public class OmsPortalOrderController { @Autowired private OmsPortalOrderService portalOrderService; @ApiOperation("根据购物车信息生成订单") @RequestMapping(value = "/generateOrder", method = RequestMethod.POST) public CommonResult generateOrder(@RequestBody OrderParam orderParam) { return portalOrderService.generateOrder(orderParam); } }
大致就是这样,可以起项目进行测试啦!
- 点赞
- 收藏
- 分享
- 文章举报
相关文章推荐
- 实现简单秒杀抢购,使用SpringBoot整合Spring-data-redis 、 rabbitMQ消息队列、redis缓存
- SpringBoot整合RabbitMQ实现微服务间的异步消息沟通
- SpringBoot整合RabbitMQ 实现五种消息模型 详细教程
- springboot整合websocket实现一对一消息推送和广播消息推送
- springboot干货——(十三【二】)整合redis实现消息队列
- springboot整合rabbitmq,支持消息确认机制
- SpringBoot整合WebSocket实现实时消息群发功能
- springboot整合websocket实现一对一消息推送和广播消息推送
- (转)RabbitMQ学习之spring整合发送异步消息(注解实现)
- SpringBoot 整合 RabbitMQ(包含三种消息确认机制以及消费端限流)
- spring boot整合activeMQ,实现ptp和topic两者消息模式
- 集群与负载均衡系列(5)——消息队列之spring-boot整合Rabbitmq
- SpringBoot | 第三十八章:基于RabbitMQ实现消息延迟队列方案
- 7.SpringBoot整合RabbitMQ实现微服务间的异步消息沟通
- SpringBoot整合RabbitMQ消息中间件(菜鸟学习)
- RabbitMQ模式,RabbitMQ和springboot整合,RabbitMQ全链路消息不丢失解决
- SpringBoot2.X 整合RedisTemplate 简单实现消息队列
- spring boot整合spring-kafka实现发送接收消息实例代码
- RabbitMQ入门-10-2(整合spring-发送同步消息注解实现)
- Spring Boot系列(十二)Spring Boot整合ActiveQ实现消息收发和订阅