springboot 集成rabbitmq 常用三种交换机(生产者确认,消费者确认,延时队列,优先级队列,消息设置过期时间 等 )
2020-07-14 06:06
387 查看
在这里插入代码片 public class RabbitConst { /*** * topic普通 */ //支付交换器 public final static String PAY_EXCHANGE_TOPIC = "xa.pay"; //队列 支付成功 public final static String PAY_SUCCESS_QUEUES = "xa.pay.success"; //队列 支付成功 public final static String PAY_SUCCESS_QUEUES2 = "xa.pay.success2"; /** * topic延时队列 */ //Topic delay交换器 public final static String EXCHANGE_TOPIC = "delay.topic"; //Topic delay 队列 public final static String DELAY_QUEUES = "delay.exchange.topice"; /** *topic 优先级队列 */ //Topic 优先级交换器 public final static String PRIORITY_TOPIC = "priority.exchange"; //Topic优先级队列 public final static String PRIORITY_QUEUES = "priority.exchange.topice"; //Topic 优先级交换器 public final static String PRIORITY_TOPIC_1 = "priority.exchange1"; //Topic优先级队列 public final static String PRIORITY_QUEUES_1 = "priority.exchange1.topice1"; /** * Fanout延时交换机 */ //延时交换机 public static final String DELAY_EXCHSNGE_NAME = "delay.exchange"; //超时订单关闭队列 public static final String TIMEOUT_TRADE_QUEUE_NAME = "closeTrade"; //超时订单关闭队列 public static final String TIMEOUT_TRADE_QUEUE_NAME2 = "closeTrade2"; /** * Direct消息 */ //exchange name public static final String DEFAULT_EXCHSNGE_NAME = "order.exchange"; //支付队列 public static final String ORDER_PAY_QUEUE_NAME = "orderPay"; } import java.util.HashMap; import java.util.Map; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitConfig { /*********************************支付****************消费者绑定********************************************/ /** * 交换器 * @return */ @Bean TopicExchange PaySuccessExchange() { return new TopicExchange(RabbitConst.PAY_EXCHANGE_TOPIC); } /** * 将队列topic.message与exchange绑定,binding_key为topic.message,就是完全匹配 * @return */ /** * 交换机与队列绑定 * @return */ @Bean Binding bindingEmailExchangeMessagePay() { return BindingBuilder .bind(paySuccessMessage()) .to(PaySuccessExchange()) .with("xa.pay.*"); } /** * 队列 * @return */ @Bean public Queue paySuccessMessage() { return new Queue(RabbitConst.PAY_SUCCESS_QUEUES); } /** * 将队列topic.message与exchange绑定,binding_key为topic.message,就是完全匹配 * @return */ /** * 交换机与队列绑定 * @return */ @Bean Binding bindingEmailExchangeMessagePay2() { return BindingBuilder .bind(paySuccessMessage2()) .to(PaySuccessExchange()) .with("xa.pay.*"); } /** * 队列 * @return */ @Bean public Queue paySuccessMessage2() { return new Queue(RabbitConst.PAY_SUCCESS_QUEUES2); } /**********************************Delayed**Topic******************************************************************/ /** * * 扇型交换机 它会把所有发送到该交换器的消息路由到所有与该交换器绑定的队列中 * 直连交换机 它会把消息路由到那些 BindingKey和 RoutingKey完全匹配的队列中 * 主题交换机 将路由和某个模式匹配,# 匹配一个或者多个,* 匹配一个与direct类似,但它可以通过通配符进行模糊匹配 * 针对消费者配置 * 1. 设置交换机类型 * 2. 将队列绑定到交换机 * FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念 * DirectExchange:按照routingkey分发到指定队列 * TopicExchange:多关键字匹配 * HeadersExchange :通过添加属性key-value匹配 */ /** * 交换器 * @return */ @Bean TopicExchange delayExchangTopice() { Map<String, Object> args = new HashMap<String, Object>(); args.put("x-delayed-type", "topic"); //什么类型交换机 // durable: 交换器是否持久化 避免重启后,要再次创建。 和消息的持久化没关系。 // autoDelete : 当没有队列绑定到它时 是否自动删除 TopicExchange topicExchange = new TopicExchange(RabbitConst.EXCHANGE_TOPIC, true, false, args); topicExchange.setDelayed(true); return topicExchange; } /** * 将队列topic.message与exchange绑定,binding_key为topic.message,就是完全匹配 * @return */ @Bean Binding bindingEmailExchangeMessageTopic() { return BindingBuilder .bind(mssageTopic()) .to(delayExchangTopice()) .with("delay.exchange.*"); } /** * 队列 * @return */ @Bean public Queue mssageTopic() { return new Queue(RabbitConst.DELAY_QUEUES); } /****************************************优先级队列**********************************/ /** * 交换机 * @return */ @Bean TopicExchange priorityExchangTopice() { Map<String, Object> args = new HashMap<String, Object>(); args.put("x-max-priority", 5); //最大是255 值越大优先级越高 建议使用1到10之间的值 // durable: 交换器是否持久化 避免重启后,要再次创建。 和消息的持久化没关系。 // autoDelete : 当没有队列绑定到它时 是否自动删除 TopicExchange topicExchange = new TopicExchange(RabbitConst.PRIORITY_TOPIC, true, false, args); return topicExchange; } /** * 将队列topic.message与exchange绑定,binding_key为topic.message,就是完全匹配 * @return */ @Bean Binding bindingpriorityExchangTopice() { return BindingBuilder .bind(priorityExchangTopiceMessage()) .to(priorityExchangTopice()) .with("priority.exchange.*"); } /** * 队列 * @return */ @Bean public Queue priorityExchangTopiceMessage() { return new Queue(RabbitConst.PRIORITY_QUEUES); } /****************************************优先级队列**********************************/ /** * 交换机 * @return */ @Bean TopicExchange priorityExchangTopice1() { Map<String, Object> args = new HashMap<String, Object>(); args.put("x-max-priority", 7); //最大是255 值越大优先级越高 建议使用1到10之间的值 // durable: 交换器是否持久化 避免重启后,要再次创建。 和消息的持久化没关系。 // autoDelete : 当没有队列绑定到它时 是否自动删除 TopicExchange topicExchange = new TopicExchange(RabbitConst.PRIORITY_TOPIC_1, true, false, args); return topicExchange; } /** * 将队列topic.message与exchange绑定,binding_key为topic.message,就是完全匹配 * @return */ @Bean Binding bindingpriorityExchangTopice1() { return BindingBuilder .bind(priorityExchangTopiceMessage1()) .to(priorityExchangTopice1()) .with("priority.exchange1.*"); } /** * 队列 * @return */ @Bean public Queue priorityExchangTopiceMessage1() { return new Queue(RabbitConst.PRIORITY_QUEUES_1); } /*********************************Delayed************延时*fanout类型交换器**********************************************************/ /** * 队列 * @return */ @Bean public Queue delayPayQueue() { return new Queue(RabbitConst.TIMEOUT_TRADE_QUEUE_NAME); } /** * 绑定延时队列与交换机 * @return */ @Bean public Binding delayPayBind() { return BindingBuilder.bind(delayPayQueue()).to(delayExchange()); } /** * 队列 * @return */ @Bean public Queue delayPayQueue2() { return new Queue(RabbitConst.TIMEOUT_TRADE_QUEUE_NAME2); } /** * 绑定延时队列与交换机 * @return */ @Bean public Binding delayPayBind2() { return BindingBuilder.bind(delayPayQueue2()).to(delayExchange()); } /** * 定义广播模式的延时交换机 无需绑定路由 * @return */ @Bean FanoutExchange delayExchange(){ Map<String, Object> args = new HashMap<String, Object>(); args.put("x-delayed-type", "fanout"); //什么类型交换机 // durable: 交换器是否持久化 避免重启后,要再次创建。 和消息的持久化没关系。 // autoDelete : 当没有队列绑定到它时 是否自动删除 FanoutExchange topicExchange = new FanoutExchange(RabbitConst.DELAY_EXCHSNGE_NAME, true, false, args); topicExchange.setDelayed(true); return topicExchange; } /*****************Direct消息***************************************/ /** * 队列 * @return */ @Bean public Queue orderPayQueue(){ return new Queue(RabbitConst.ORDER_PAY_QUEUE_NAME); } /** * 交换机 * @return */ @Bean public DirectExchange defaultExchange() { return new DirectExchange(RabbitConst.DEFAULT_EXCHSNGE_NAME, true, false); } /** * 绑定普通消息队列 * @return */ @Bean public Binding orderPayBind(){ return BindingBuilder.bind(orderPayQueue()).to(defaultExchange()).with(RabbitConst.ORDER_PAY_QUEUE_NAME); } /** * 定义消息转换器 * @return */ @Bean Jackson2JsonMessageConverter jsonMessageConverter() { return new Jackson2JsonMessageConverter(); } /** * 定义消息模板用于发布消息,并且设置其消息转换器 * @param connectionFactory * @return */ @Bean RabbitTemplate rabbitTemplate(final ConnectionFactory connectionFactory) { final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setMessageConverter(jsonMessageConverter()); return rabbitTemplate; } @Bean RabbitAdmin rabbitAdmin(final ConnectionFactory connectionFactory) { return new RabbitAdmin(connectionFactory); } } import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import org.springframework.amqp.core.Message; import lombok.extern.log4j.Log4j2; /** * 消息确认 * @author Administrator * */ @Log4j2 @Component public class RabbitMessageConfirm implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback{ @Autowired private RabbitTemplate rabbitTemplate; @PostConstruct public void init(){ //指定 ConfirmCallback rabbitTemplate.setConfirmCallback(this); rabbitTemplate.setReturnCallback(this); } /** *生产者生产消息mq 确认mq接收成功 true接收成功 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { log.info(correlationData.getId()+"---RabbitCouponSend confirm---: "+ack); String id = correlationData.getId(); //等于true 成功接收 if(ack){ //发送消息mq成功 log.info("----------------confirm---------------------------"+id ); // couponMicroServiceLogService.updateCouponMicroServiceLog(Integer.parseInt(id), CouponMicroServiceLogEnumStatus.MQSUCCESS.getValue()); }else{ log.info("----------------confirm---------------------------"+id); //失败可以重新推送消息mq 做推送失败次数限制 不然死循环 // discountCouponPayService.mqNotReceiveMessage(id); } } /** * 启动消息失败返回,比如路由找不到队列时触发回调 后期可以将消息写入死信队列 * 注意: 但是我们从打印结果中看到调用了ReturnCallback回调,该回调执行说明Exchange没有找到对应的队列,但是最终消费者还是成功消费了消息 * 大胆设想就是延迟消息原理如下: * 就是我们发送延迟消息给Broker,此时消息停留在Exchange,并且无法得知此消息的routingKey,所以会一直回调ReturnCallback函数,3s之后就可以得知该消息的routingKey,消息就可以发送到队列,然后推送给消费者。 */ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.error("---消息主体 message : "+message); log.error("---消息主体 message : "+replyCode); log.error("---描述:"+replyText); log.error("---消息使用的交换器 exchange : "+exchange); log.error("---消息使用的路由键 routing : "+routingKey); } } import java.io.IOException; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import com.rabbitmq.client.Channel; import lombok.extern.log4j.Log4j2; /** * 消费者 扇型交换机 * @author asus * */ @Log4j2 @Component public class RabbitmqConsumerDelayFanout { /** * 消费延时消息 * @param content * @param message * @param channel * @throws IOException */ @RabbitListener(queues = RabbitConst.TIMEOUT_TRADE_QUEUE_NAME) public void delayedProcess(String content, Message message, Channel channel) throws IOException { try { log.info("***************process************延迟队列的内容", content); //false代表消息消费完成 告诉mq把消息删除 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); log.info("超时信息处理完毕"); } catch (Exception e) { log.error("处理失败*********************", e.getMessage()); // true拒绝接收消息重新入队 可以重试 几次 转入 死信队列 false丢弃该消息 注意死循环 channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); } } /** * 消费延时消息2 * @param content * @param message * @param channel * @throws IOException */ @RabbitListener(queues = RabbitConst.TIMEOUT_TRADE_QUEUE_NAME2) public void delayedProcess2(String content, Message message, Channel channel) throws IOException { try { log.info("!!!!!!!!!!!!!!!process2!!!!!!!!!!!!!!!!!!!!!!!!1延迟队列的内容[{}]", content); //false代表消息消费完成 告诉mq把消息删除 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); log.info("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!超时信息处理完毕"); } catch (Exception e) { log.error("处理失败*********************", e.getMessage()); // true拒绝接收消息重新入队 可以重试 几次 转入 死信队列 false丢弃该消息 注意死循环 channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); } } } import java.io.IOException; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import com.rabbitmq.client.Channel; import lombok.extern.log4j.Log4j2; /** *消费者 主题交换机 * @author asus * * *消息者需要去重处理 */ @Log4j2 @Component public class RabbitmqConsumerDelayTopic { /** * 消费延时消息 * @param content * @param message * @param channel * @throws IOException */ @RabbitListener(queues = RabbitConst.DELAY_QUEUES) public void topicProcess(String content, Message message, Channel channel) throws IOException { try { log.info("*******topic********process************延迟队列的内容", content); //false代表消息消费完成 告诉mq把消息删除 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); log.info("*******topic******超时信息处理完毕"); } catch (Exception e) { log.error("处理失败*********************", e.getMessage()); // true拒绝接收消息重新入队 可以重试 几次 转入 死信队列 false丢弃该消息 注意死循环 channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); } } /** * 普通接收 * @param msg * @param channel * @param message * @throws IOException */ @RabbitListener(queues = RabbitConst.PAY_SUCCESS_QUEUES) public void messageProcess(String msg, Channel channel, Message message) throws IOException{ try { log.info("---mq--%%%%%%%%%%%%-----pay--notify--result--" + msg); // 手动确认 // 告诉服务器收到这条消息 已经被我消费了 可以在队列删掉 这样以后就不会再发了 否则消息服务器以为这条消息没处理掉 后续还会发 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (IOException e) { // 消费失败,重新发送消息 //拒绝接收消息重新入队 可以重试 几次 转入 死信队列 false丢弃该消息 channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); //未确认重新入队 可以重试 几次 转入 死信队列 这一般情况下使用在拉取消息时使用 //第一个参数deliveryTag:发布的每一条消息都会获得一个唯一的deliveryTag,deliveryTag在channel范围内是唯一的 //第二个参数multiple:批量确认标志。如果值为true,包含本条消息在内的、所有比该消息deliveryTag值小的 消息都被拒绝了(除了已经被 ack 的以外);如果值为false,只拒绝三本条消息 //第三个参数requeue:表示如何处理这条消息,如果值为true,则重新放入RabbitMQ的发送队列,如果值为false,则通知RabbitMQ销毁这条消息 // channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); log.error("---mq--%%%%%%%%%%%%-----pay--notify--result--="+msg); log.error("---mq--%%%%%%%%%%%%-----pay--notify--result--error--"+e); } } /** * 普通接收 * @param msg * @param channel * @param message * @throws IOException */ @RabbitListener(queues = RabbitConst.PAY_SUCCESS_QUEUES2) public void messageProcess2(String msg, Channel channel, Message message) throws IOException{ try { log.info("--2-mq--%%%%%%%%%%%%-----pay--notify--result--" + msg); // 手动确认 // 告诉服务器收到这条消息 已经被我消费了 可以在队列删掉 这样以后就不会再发了 否则消息服务器以为这条消息没处理掉 后续还会发 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (IOException e) { // 消费失败,重新发送消息 //拒绝接收消息重新入队 可以重试 几次 转入 死信队列 false丢弃该消息 channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); //未确认重新入队 可以重试 几次 转入 死信队列 这一般情况下使用在拉取消息时使用 //第一个参数deliveryTag:发布的每一条消息都会获得一个唯一的deliveryTag,deliveryTag在channel范围内是唯一的 //第二个参数multiple:批量确认标志。如果值为true,包含本条消息在内的、所有比该消息deliveryTag值小的 消息都被拒绝了(除了已经被 ack 的以外);如果值为false,只拒绝三本条消息 //第三个参数requeue:表示如何处理这条消息,如果值为true,则重新放入RabbitMQ的发送队列,如果值为false,则通知RabbitMQ销毁这条消息 // channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); log.error("--2-mq--%%%%%%%%%%%%-----pay--notify--result--="+msg); log.error("--2-mq--%%%%%%%%%%%%-----pay--notify--result--error--"+e); } } /** * 消息者优先级队列 * @param msg * @param channel * @param message * @throws IOException */ @RabbitListener(queues = RabbitConst.PRIORITY_QUEUES) public void priority1(String msg, Channel channel, Message message) throws IOException{ log.info("-----*******************************-----priority1--notify--result--" + msg); try { //false代表消息消费完成 告诉mq把消息删除 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { // true拒绝接收消息重新入队 可以重试 几次 转入 死信队列 false丢弃该消息 注意死循环 channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); } } /** * 消息者优先级队列 * @param msg * @param channel * @param message * @throws IOException */ @RabbitListener(queues = RabbitConst.PRIORITY_QUEUES_1) public void priority2(String msg, Channel channel, Message message) throws IOException{ log.info("--------------------------------------------------priority2--notify--result--" + msg); try { //false代表消息消费完成 告诉mq把消息删除 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { // true拒绝接收消息重新入队 可以重试 几次 转入 死信队列 false丢弃该消息 注意死循环 channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); } } /* 消费者优先级 Map <String,Object> args = new HashMap <String,Object>(); args.put(“ x-priority”,10); channel.basicConsume(“ my-queue”,false,args,消费者);*/ } import java.io.IOException; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import com.rabbitmq.client.Channel; import lombok.extern.log4j.Log4j2; /** * 消费者 直连交换机 * * @author asus * */ @Log4j2 @Component public class RabbitmqConsumerDirect { /** * 消费普通消息 * * @param content * @param message * @param channel * @throws IOException */ @RabbitListener(queues = RabbitConst.ORDER_PAY_QUEUE_NAME) public void directProcess(String content, Message message, Channel channel) throws IOException { try { log.info("普通队列的内容*********************", content); //false代表消息消费完成 告诉mq把消息删除 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { log.error("处理失败*********************", e.getMessage()); // true拒绝接收消息重新入队 可以重试 几次 转入 死信队列 false丢弃该消息 注意死循环 channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); } } } import org.springframework.amqp.core.MessageDeliveryMode; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * 生产者扇型交换机 * @author asus * */ @Component public class RabbitmqPublishDelayFanout { @Autowired private RabbitTemplate rabbitTemplate; /** * 发送延时信息 * @param msge 内容 * @param routingKey routingKey 路由key * @param delay 延时时间,秒 */ public void sendTimeoutMsg(String msge, int delay, String id){ CorrelationData correlationData = new CorrelationData(id); // 通过广播模式发布延时消息 延时30分钟 持久化消息 消费后销毁 这里无需指定路由,会广播至每个绑定此交换机的队列 rabbitTemplate.convertAndSend(RabbitConst.DELAY_EXCHSNGE_NAME, "", msge, message ->{ //持久 message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); message.getMessageProperties().setDelay(delay * 1000); // 毫秒为单位,指定此消息的延时时长 return message; },correlationData); } } import org.springframework.amqp.core.MessageDeliveryMode; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * 生产者 主题交换机 * @author asus * */ @Component public class RabbitmqPublishDelayTopic { @Autowired RabbitTemplate rabbitTemplate; /**新建mq log表 消息发送mq前先消息写人数据库 参数id是数据库主键 * 发送延时信息 * @param msge 内容 * @param routingKey routingKey 路由key * @param delay 延时时间,秒 */ public void sendTimeoutMsg(String msge , int delay, String id){ CorrelationData correlationData = new CorrelationData(id); // 通过广播模式发布延时消息 延时30分钟 持久化消息 消费后销毁 这里无需指定路由,会广播至每个绑定此交换机的队列 rabbitTemplate.convertAndSend(RabbitConst.EXCHANGE_TOPIC, RabbitConst.DELAY_QUEUES, msge, message ->{ //持久 message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); message.getMessageProperties().setDelay(delay * 1000); // 毫秒为单位,指定此消息的延时时长 // message.getMessageProperties().setExpiration("60000"); // 设置消息的过期时间 毫秒 当消息量大造成消息积压可以设置过期时间 return message; },correlationData); } /** * 普通发送 * @param msge * @param id */ public void send(String msge, String id) { CorrelationData correlationData = new CorrelationData(id); this.rabbitTemplate.convertAndSend(RabbitConst.PAY_EXCHANGE_TOPIC, RabbitConst.PAY_SUCCESS_QUEUES, msge, correlationData); /* message ->{message.getMessageProperties().setExpiration("60000"); // 设置消息的过期时间 毫秒 当消息量大造成消息积压可以设置过期时间 return message; },*/ } /** *优先级队列 消息优先推送 * @param msge * @param id */ public void sendPriority1(String msge, String id) { CorrelationData correlationData = new CorrelationData(id); this.rabbitTemplate.convertAndSend(RabbitConst.PRIORITY_TOPIC, RabbitConst.PRIORITY_QUEUES, msge, correlationData); } /** *优先级队列 消息优先推送 * @param msge * @param id */ public void sendPriority2(String msge, String id) { CorrelationData correlationData = new CorrelationData(id); this.rabbitTemplate.convertAndSend(RabbitConst.PRIORITY_TOPIC_1, RabbitConst.PRIORITY_QUEUES_1, msge, correlationData); } /*消息优先级 * message ->{ message.getMessageProperties().setPriority(priority); return message; },*/ } import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * 生产者 直连交换机 * @author asus * */ @Component public class RabbitmqPublishDirect { @Autowired private RabbitTemplate rabbitTemplate; /** * 发送普通消息 * @param routingKey * @param msge */ public void sendMsg(String id, String msge){ CorrelationData correlationData = new CorrelationData(id); rabbitTemplate.convertAndSend(RabbitConst.DEFAULT_EXCHSNGE_NAME, RabbitConst.ORDER_PAY_QUEUE_NAME , msge, correlationData); } } import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; import com.xiaoantimes.estimate.auth.client.annotation.IgnoreUserToken; import com.xiaoantimes.estimate.common.msg.BaseResponse; import com.xiaoantimes.estimate.common.util.ResBodyUtil; import com.xiaoantimes.estimate.usercenter.mq.RabbitmqPublishDelayFanout; import com.xiaoantimes.estimate.usercenter.mq.RabbitmqPublishDelayTopic; import com.xiaoantimes.estimate.usercenter.mq.RabbitmqPublishDirect; import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiParam; @RestController @RequestMapping("/user/mq") public class RabbitMqController { @Autowired private RabbitmqPublishDelayFanout rabbitmqPublishDelay; @Autowired private RabbitmqPublishDelayTopic rabbitmqPublishDelayTopic; @Autowired private RabbitmqPublishDirect rabbitmqPublishDirect; @ApiOperation(value = "延时Topic", notes = "", tags = "RabbitMq", httpMethod = "GET") @IgnoreUserToken @GetMapping(value="/mqTopicDelayed") public BaseResponse mqTopicDelayed(@ApiParam(required = true, value = "发送mq内容") @RequestParam String msge, @ApiParam(required = true, value = "延时时间") @RequestParam Integer time, @ApiParam(required = true, value = "消息发送mq成功或失败消息id") @RequestParam String id){ rabbitmqPublishDelayTopic.sendTimeoutMsg(msge, time, id); return ResBodyUtil.buildSuccessResBody(); } @ApiOperation(value = "普通发送Topic", notes = "", tags = "RabbitMq", httpMethod = "GET") @IgnoreUserToken @GetMapping(value="/mqTopicOrdinary") public BaseResponse ordinary(@ApiParam(required = true, value = "发送mq内容") @RequestParam String msge, @ApiParam(required = true, value = "消息发送mq成功或失败消息id") @RequestParam String id){ rabbitmqPublishDelayTopic.send( msge,id); return ResBodyUtil.buildSuccessResBody(); } @ApiOperation(value = "优先级队列Topic", notes = "", tags = "RabbitMq", httpMethod = "GET") @IgnoreUserToken @GetMapping(value="/mqTopicPriority") public BaseResponse mqTopicPriority(@ApiParam(required = true, value = "发送mq内容") @RequestParam String msge, @ApiParam(required = true, value = "消息发送mq成功或失败消息id") @RequestParam String id){ rabbitmqPublishDelayTopic.sendPriority1( msge, id); return ResBodyUtil.buildSuccessResBody(); } @ApiOperation(value = "优先级队列Topic", notes = "", tags = "RabbitMq", httpMethod = "GET") @IgnoreUserToken @GetMapping(value="/mqTopicPriority2") public BaseResponse mqTopicPriority2(@ApiParam(required = true, value = "发送mq内容")@RequestParam String msge, @ApiParam(required = true, value = "消息发送mq成功或失败消息id") @RequestParam String id){ rabbitmqPublishDelayTopic.sendPriority2(msge, id); return ResBodyUtil.buildSuccessResBody(); } /*********************************Direct*****************************************************************/ @ApiOperation(value = "普通Direct", notes = "", tags = "RabbitMq", httpMethod = "GET") @IgnoreUserToken @GetMapping(value="/mqDirectOrdinary") public BaseResponse mqDirectOrdinary( @ApiParam(required = true, value = "消息发送mq成功或失败消息id") @RequestParam String id, @ApiParam(required = true, value = "发送mq内容") @RequestParam String msge){ rabbitmqPublishDirect.sendMsg(id, msge); return ResBodyUtil.buildSuccessResBody(); } /*******************************************Fanout***************************************************************************/ @ApiOperation(value = "延时Fanout", notes = "", tags = "RabbitMq", httpMethod = "GET") @IgnoreUserToken @GetMapping(value="/fanoutDelayed") public BaseResponse fanoutDelayed(@ApiParam(required = true, value = "发送mq内容") @RequestParam String msge, @ApiParam(required = true, value = "延时时间") @RequestParam Integer time, @ApiParam(required = true, value = "消息发送mq成功或失败消息id") @RequestParam String id){ rabbitmqPublishDelay.sendTimeoutMsg(msge, time, id); return ResBodyUtil.buildSuccessResBody(); } } 配置文件 spring: rabbitmq: password: guest username: guest port: 5672 addresses: 10.10.0.198 #开启发送失败返回 publisher-returns: true #开启发送确认 publisher-confirms: true listener: simple: #指定最小的消费者数量. concurrency: 2 #指定最大的消费者数量. max-concurrency: 2 #开启ack acknowledge-mode: manual #开启ack direct: acknowledge-mode: auto #支持消息的确认与返回 template: mandatory: true pom文件添加 <!-- RabbitMQ --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot 1ff72 -starter-amqp</artifactId> </dependency>
相关文章推荐
- SpringBoot 整合 RabbitMQ(包含三种消息确认机制以及消费端限流)
- rabbitmq消息队列设置过期时间和过期消息处理
- SpringBoot 集成 rabbitmq 消息队列
- 消息队列_RabbitMQ-0002.深入MQ生产者/信道/交换机/队列/消费者?
- spring boot Rabbitmq集成,延时消息队列实现
- SpringBoot 使用 redis实现 生产者/消费者模式 消息队列
- springboot - rabbitmq 生产者消息确认
- SpringBoot 集成 RabbitMQ 消息队列
- spring boot Rabbitmq集成,延时消息队列实现
- SpringBoot中如何监听两个不同源的RabbitMQ消息队列
- 消息队列RabbitMQ与Spring集成
- RabbitMQ与.net core(三) fanout类型Exchange 与 消息的过期时间 与 队列的存活时间
- rabbitMq集成Spring后,消费者设置手动ack,并且在业务上控制是否ack
- spring boot 自学笔记(五) Rabbitmq集成,延时消息队列实现
- 最新SpringBoot使用RabbitMQ延时队列小白必备
- Java中设置Session过期时间(Spring Boot)
- SpringBoot的RabbitMQ消息队列: 三、第二模式"Work queues"
- SpringBoot的RabbitMQ消息队列: 四、第三模式"Publish/Subscribe"
- rabbitmq结合spring实现消息队列优先级的方法
- RabbitMQ学习(八)——做WebSocket消息代理,集成Spring Boot实现消息实时推送