您的位置:首页 > 编程语言 > Java开发

springboot 集成rabbitmq 常用三种交换机(生产者确认,消费者确认,延时队列,优先级队列,消息设置过期时间 等 )

2020-07-14 06:06 387 查看
在这里插入代码片
public class RabbitConst {
/***
* topic普通
*/

//支付交换器
public final static String PAY_EXCHANGE_TOPIC = "xa.pay";

//队列 支付成功
public final static String PAY_SUCCESS_QUEUES = "xa.pay.success";

//队列 支付成功
public final static String PAY_SUCCESS_QUEUES2 = "xa.pay.success2";

/**
* topic延时队列
*/

//Topic  delay交换器
public final static String EXCHANGE_TOPIC = "delay.topic";

//Topic  delay 队列
public final static String DELAY_QUEUES = "delay.exchange.topice";

/**
*topic 优先级队列
*/
//Topic  优先级交换器
public final static String PRIORITY_TOPIC = "priority.exchange";

//Topic优先级队列
public final static String PRIORITY_QUEUES = "priority.exchange.topice";

//Topic  优先级交换器
public final static String PRIORITY_TOPIC_1 = "priority.exchange1";

//Topic优先级队列
public final static String PRIORITY_QUEUES_1 = "priority.exchange1.topice1";

/**
* Fanout延时交换机
*/

//延时交换机
public static final String DELAY_EXCHSNGE_NAME = "delay.exchange";

//超时订单关闭队列
public static final String TIMEOUT_TRADE_QUEUE_NAME = "closeTrade";

//超时订单关闭队列
public static final String TIMEOUT_TRADE_QUEUE_NAME2 = "closeTrade2";

/**
* Direct消息
*/
//exchange name
public static final String DEFAULT_EXCHSNGE_NAME = "order.exchange";

//支付队列
public static final String ORDER_PAY_QUEUE_NAME = "orderPay";

}

import java.util.HashMap;
import java.util.Map;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {

/*********************************支付****************消费者绑定********************************************/
/**
* 交换器
* @return
*/
@Bean
TopicExchange PaySuccessExchange() {
return new TopicExchange(RabbitConst.PAY_EXCHANGE_TOPIC);
}

/**
* 将队列topic.message与exchange绑定,binding_key为topic.message,就是完全匹配
* @return
*/
/**
* 交换机与队列绑定
* @return
*/
@Bean
Binding bindingEmailExchangeMessagePay() {
return BindingBuilder
.bind(paySuccessMessage())
.to(PaySuccessExchange())
.with("xa.pay.*");
}

/**
* 队列
* @return
*/
@Bean
public Queue paySuccessMessage() {
return new Queue(RabbitConst.PAY_SUCCESS_QUEUES);
}

/**
* 将队列topic.message与exchange绑定,binding_key为topic.message,就是完全匹配
* @return
*/
/**
* 交换机与队列绑定
* @return
*/
@Bean
Binding bindingEmailExchangeMessagePay2() {
return BindingBuilder
.bind(paySuccessMessage2())
.to(PaySuccessExchange())
.with("xa.pay.*");
}

/**
* 队列
* @return
*/
@Bean
public Queue paySuccessMessage2() {
return new Queue(RabbitConst.PAY_SUCCESS_QUEUES2);
}

/**********************************Delayed**Topic******************************************************************/

/**
*
* 扇型交换机   它会把所有发送到该交换器的消息路由到所有与该交换器绑定的队列中
* 直连交换机 它会把消息路由到那些 BindingKey和 RoutingKey完全匹配的队列中
* 主题交换机         将路由和某个模式匹配,# 匹配一个或者多个,* 匹配一个与direct类似,但它可以通过通配符进行模糊匹配
* 针对消费者配置
* 1. 设置交换机类型
* 2. 将队列绑定到交换机
* FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念
* DirectExchange:按照routingkey分发到指定队列
* TopicExchange:多关键字匹配
* HeadersExchange :通过添加属性key-value匹配
*/

/**
* 交换器
* @return
*/
@Bean
TopicExchange delayExchangTopice() {
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-delayed-type", "topic");  //什么类型交换机
// durable: 交换器是否持久化 避免重启后,要再次创建。 和消息的持久化没关系。
// autoDelete : 当没有队列绑定到它时 是否自动删除
TopicExchange topicExchange = new TopicExchange(RabbitConst.EXCHANGE_TOPIC, true, false, args);
topicExchange.setDelayed(true);
return topicExchange;
}
/**
* 将队列topic.message与exchange绑定,binding_key为topic.message,就是完全匹配
* @return
*/
@Bean
Binding bindingEmailExchangeMessageTopic() {
return BindingBuilder
.bind(mssageTopic())
.to(delayExchangTopice())
.with("delay.exchange.*");
}

/**
* 队列
* @return
*/
@Bean
public Queue mssageTopic() {
return new Queue(RabbitConst.DELAY_QUEUES);
}

/****************************************优先级队列**********************************/
/**
* 交换机
* @return
*/
@Bean
TopicExchange priorityExchangTopice() {
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-max-priority", 5);  //最大是255    值越大优先级越高      建议使用1到10之间的值
// durable: 交换器是否持久化 避免重启后,要再次创建。 和消息的持久化没关系。
// autoDelete : 当没有队列绑定到它时 是否自动删除
TopicExchange topicExchange = new TopicExchange(RabbitConst.PRIORITY_TOPIC, true, false, args);
return topicExchange;
}

/**
* 将队列topic.message与exchange绑定,binding_key为topic.message,就是完全匹配
* @return
*/
@Bean
Binding bindingpriorityExchangTopice() {
return BindingBuilder
.bind(priorityExchangTopiceMessage())
.to(priorityExchangTopice())
.with("priority.exchange.*");
}

/**
* 队列
* @return
*/
@Bean
public Queue priorityExchangTopiceMessage() {
return new Queue(RabbitConst.PRIORITY_QUEUES);
}

/****************************************优先级队列**********************************/
/**
* 交换机
* @return
*/
@Bean
TopicExchange priorityExchangTopice1() {
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-max-priority", 7);  //最大是255    值越大优先级越高     建议使用1到10之间的值
// durable: 交换器是否持久化 避免重启后,要再次创建。 和消息的持久化没关系。
// autoDelete : 当没有队列绑定到它时 是否自动删除
TopicExchange topicExchange = new TopicExchange(RabbitConst.PRIORITY_TOPIC_1, true, false, args);
return topicExchange;
}

/**
* 将队列topic.message与exchange绑定,binding_key为topic.message,就是完全匹配
* @return
*/
@Bean
Binding bindingpriorityExchangTopice1() {
return BindingBuilder
.bind(priorityExchangTopiceMessage1())
.to(priorityExchangTopice1())
.with("priority.exchange1.*");
}

/**
* 队列
* @return
*/
@Bean
public Queue priorityExchangTopiceMessage1() {
return new Queue(RabbitConst.PRIORITY_QUEUES_1);
}

/*********************************Delayed************延时*fanout类型交换器**********************************************************/

/**
* 队列
* @return
*/
@Bean
public Queue delayPayQueue() {
return new Queue(RabbitConst.TIMEOUT_TRADE_QUEUE_NAME);
}
/**
* 绑定延时队列与交换机
* @return
*/
@Bean
public Binding delayPayBind() {
return BindingBuilder.bind(delayPayQueue()).to(delayExchange());
}

/**
* 队列
* @return
*/
@Bean
public Queue delayPayQueue2() {
return new Queue(RabbitConst.TIMEOUT_TRADE_QUEUE_NAME2);
}

/**
* 绑定延时队列与交换机
* @return
*/
@Bean
public Binding delayPayBind2() {
return BindingBuilder.bind(delayPayQueue2()).to(delayExchange());
}

/**
*  定义广播模式的延时交换机 无需绑定路由
* @return
*/
@Bean
FanoutExchange delayExchange(){
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-delayed-type", "fanout");   //什么类型交换机
// durable: 交换器是否持久化 避免重启后,要再次创建。 和消息的持久化没关系。
// autoDelete : 当没有队列绑定到它时 是否自动删除
FanoutExchange topicExchange = new FanoutExchange(RabbitConst.DELAY_EXCHSNGE_NAME, true, false, args);
topicExchange.setDelayed(true);
return topicExchange;
}

/*****************Direct消息***************************************/
/**
* 队列
* @return
*/
@Bean
public Queue orderPayQueue(){
return new Queue(RabbitConst.ORDER_PAY_QUEUE_NAME);
}

/**
* 交换机
* @return
*/
@Bean
public DirectExchange defaultExchange() {
return new DirectExchange(RabbitConst.DEFAULT_EXCHSNGE_NAME, true, false);
}

/**
*  绑定普通消息队列
* @return
*/
@Bean
public Binding orderPayBind(){
return BindingBuilder.bind(orderPayQueue()).to(defaultExchange()).with(RabbitConst.ORDER_PAY_QUEUE_NAME);
}

/**
*  定义消息转换器
* @return
*/
@Bean
Jackson2JsonMessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}

/**
*  定义消息模板用于发布消息,并且设置其消息转换器
* @param connectionFactory
* @return
*/
@Bean
RabbitTemplate rabbitTemplate(final ConnectionFactory connectionFactory) {
final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(jsonMessageConverter());
return rabbitTemplate;
}
@Bean
RabbitAdmin rabbitAdmin(final ConnectionFactory connectionFactory) {
return new RabbitAdmin(connectionFactory);
}
}

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import org.springframework.amqp.core.Message;
import lombok.extern.log4j.Log4j2;
/**
* 消息确认
* @author Administrator
*
*/
@Log4j2
@Component
public class RabbitMessageConfirm implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback{
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init(){
//指定 ConfirmCallback
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnCallback(this);
}

/**
*生产者生产消息mq  确认mq接收成功 true接收成功
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info(correlationData.getId()+"---RabbitCouponSend confirm---: "+ack);
String id  = correlationData.getId();
//等于true 成功接收
if(ack){
//发送消息mq成功
log.info("----------------confirm---------------------------"+id );
//	couponMicroServiceLogService.updateCouponMicroServiceLog(Integer.parseInt(id), CouponMicroServiceLogEnumStatus.MQSUCCESS.getValue());
}else{
log.info("----------------confirm---------------------------"+id);
//失败可以重新推送消息mq  做推送失败次数限制   不然死循环
//	discountCouponPayService.mqNotReceiveMessage(id);
}

}

/**
* 启动消息失败返回,比如路由找不到队列时触发回调   后期可以将消息写入死信队列
* 注意: 但是我们从打印结果中看到调用了ReturnCallback回调,该回调执行说明Exchange没有找到对应的队列,但是最终消费者还是成功消费了消息
* 大胆设想就是延迟消息原理如下:
* 就是我们发送延迟消息给Broker,此时消息停留在Exchange,并且无法得知此消息的routingKey,所以会一直回调ReturnCallback函数,3s之后就可以得知该消息的routingKey,消息就可以发送到队列,然后推送给消费者。
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.error("---消息主体 message : "+message);
log.error("---消息主体 message : "+replyCode);
log.error("---描述:"+replyText);
log.error("---消息使用的交换器 exchange : "+exchange);
log.error("---消息使用的路由键 routing : "+routingKey);

}
}

import java.io.IOException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import com.rabbitmq.client.Channel;
import lombok.extern.log4j.Log4j2;
/**
* 消费者 扇型交换机
* @author asus
*
*/
@Log4j2
@Component
public class RabbitmqConsumerDelayFanout {
/**
* 消费延时消息
* @param content
* @param message
* @param channel
* @throws IOException
*/
@RabbitListener(queues = RabbitConst.TIMEOUT_TRADE_QUEUE_NAME)
public void delayedProcess(String content, Message message, Channel channel) throws IOException {
try {
log.info("***************process************延迟队列的内容", content);
//false代表消息消费完成  告诉mq把消息删除
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
log.info("超时信息处理完毕");
} catch (Exception e) {
log.error("处理失败*********************", e.getMessage());
//  true拒绝接收消息重新入队     可以重试 几次  转入 死信队列     false丢弃该消息      注意死循环
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
}
}

/**
* 消费延时消息2
* @param content
* @param message
* @param channel
* @throws IOException
*/
@RabbitListener(queues = RabbitConst.TIMEOUT_TRADE_QUEUE_NAME2)
public void delayedProcess2(String content, Message message, Channel channel) throws IOException {
try {
log.info("!!!!!!!!!!!!!!!process2!!!!!!!!!!!!!!!!!!!!!!!!1延迟队列的内容[{}]", content);
//false代表消息消费完成  告诉mq把消息删除
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
log.info("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!超时信息处理完毕");
} catch (Exception e) {
log.error("处理失败*********************", e.getMessage());
//  true拒绝接收消息重新入队     可以重试 几次  转入 死信队列     false丢弃该消息      注意死循环
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
}
}
}

import java.io.IOException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import com.rabbitmq.client.Channel;
import lombok.extern.log4j.Log4j2;
/**
*消费者 主题交换机
* @author asus
*
*
*消息者需要去重处理
*/
@Log4j2
@Component
public class RabbitmqConsumerDelayTopic {
/**
* 消费延时消息
* @param content
* @param message
* @param channel
* @throws IOException
*/
@RabbitListener(queues = RabbitConst.DELAY_QUEUES)
public void topicProcess(String content, Message message, Channel channel) throws IOException {
try {
log.info("*******topic********process************延迟队列的内容", content);
//false代表消息消费完成  告诉mq把消息删除
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
log.info("*******topic******超时信息处理完毕");
} catch (Exception e) {
log.error("处理失败*********************", e.getMessage());
//  true拒绝接收消息重新入队     可以重试 几次  转入 死信队列     false丢弃该消息      注意死循环
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
}
}

/**
* 普通接收
* @param msg
* @param channel
* @param message
* @throws IOException
*/
@RabbitListener(queues = RabbitConst.PAY_SUCCESS_QUEUES)
public void messageProcess(String msg, Channel channel, Message message) throws IOException{
try {
log.info("---mq--%%%%%%%%%%%%-----pay--notify--result--" + msg);
// 手动确认
// 告诉服务器收到这条消息 已经被我消费了 可以在队列删掉 这样以后就不会再发了 否则消息服务器以为这条消息没处理掉 后续还会发
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (IOException e) {
// 消费失败,重新发送消息
//拒绝接收消息重新入队     可以重试 几次  转入 死信队列     false丢弃该消息
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
//未确认重新入队    可以重试 几次  转入 死信队列   这一般情况下使用在拉取消息时使用
//第一个参数deliveryTag:发布的每一条消息都会获得一个唯一的deliveryTag,deliveryTag在channel范围内是唯一的
//第二个参数multiple:批量确认标志。如果值为true,包含本条消息在内的、所有比该消息deliveryTag值小的 消息都被拒绝了(除了已经被 ack 的以外);如果值为false,只拒绝三本条消息
//第三个参数requeue:表示如何处理这条消息,如果值为true,则重新放入RabbitMQ的发送队列,如果值为false,则通知RabbitMQ销毁这条消息
// channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);

log.error("---mq--%%%%%%%%%%%%-----pay--notify--result--="+msg);
log.error("---mq--%%%%%%%%%%%%-----pay--notify--result--error--"+e);
}
}

/**
* 普通接收
* @param msg
* @param channel
* @param message
* @throws IOException
*/
@RabbitListener(queues = RabbitConst.PAY_SUCCESS_QUEUES2)
public void messageProcess2(String msg, Channel channel, Message message) throws IOException{
try {
log.info("--2-mq--%%%%%%%%%%%%-----pay--notify--result--" + msg);
// 手动确认
// 告诉服务器收到这条消息 已经被我消费了 可以在队列删掉 这样以后就不会再发了 否则消息服务器以为这条消息没处理掉 后续还会发
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (IOException e) {
// 消费失败,重新发送消息
//拒绝接收消息重新入队     可以重试 几次  转入 死信队列     false丢弃该消息
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
//未确认重新入队    可以重试 几次  转入 死信队列   这一般情况下使用在拉取消息时使用
//第一个参数deliveryTag:发布的每一条消息都会获得一个唯一的deliveryTag,deliveryTag在channel范围内是唯一的
//第二个参数multiple:批量确认标志。如果值为true,包含本条消息在内的、所有比该消息deliveryTag值小的 消息都被拒绝了(除了已经被 ack 的以外);如果值为false,只拒绝三本条消息
//第三个参数requeue:表示如何处理这条消息,如果值为true,则重新放入RabbitMQ的发送队列,如果值为false,则通知RabbitMQ销毁这条消息
// channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);

log.error("--2-mq--%%%%%%%%%%%%-----pay--notify--result--="+msg);
log.error("--2-mq--%%%%%%%%%%%%-----pay--notify--result--error--"+e);
}
}
/**
* 消息者优先级队列
* @param msg
* @param channel
* @param message
* @throws IOException
*/
@RabbitListener(queues = RabbitConst.PRIORITY_QUEUES)
public void priority1(String msg, Channel channel, Message message) throws IOException{
log.info("-----*******************************-----priority1--notify--result--" + msg);
try {
//false代表消息消费完成  告诉mq把消息删除
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
//  true拒绝接收消息重新入队     可以重试 几次  转入 死信队列     false丢弃该消息      注意死循环
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
}

}

/**
* 消息者优先级队列
* @param msg
* @param channel
* @param message
* @throws IOException
*/
@RabbitListener(queues = RabbitConst.PRIORITY_QUEUES_1)
public void priority2(String msg, Channel channel, Message message) throws IOException{
log.info("--------------------------------------------------priority2--notify--result--" + msg);
try {
//false代表消息消费完成  告诉mq把消息删除
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
//  true拒绝接收消息重新入队     可以重试 几次  转入 死信队列     false丢弃该消息      注意死循环
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
}

}

/*  消费者优先级  Map <String,Object> args = new HashMap <String,Object>();
args.put(“ x-priority”,10);
channel.basicConsume(“ my-queue”,false,args,消费者);*/
}

import java.io.IOException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import com.rabbitmq.client.Channel;
import lombok.extern.log4j.Log4j2;

/**
* 消费者 直连交换机
*
* @author asus
*
*/
@Log4j2
@Component
public class RabbitmqConsumerDirect {

/**
* 消费普通消息
*
* @param content
* @param message
* @param channel
* @throws IOException
*/

@RabbitListener(queues = RabbitConst.ORDER_PAY_QUEUE_NAME)
public void directProcess(String content, Message message, Channel channel) throws IOException {
try {
log.info("普通队列的内容*********************", content);
//false代表消息消费完成  告诉mq把消息删除
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
log.error("处理失败*********************", e.getMessage());
//  true拒绝接收消息重新入队     可以重试 几次  转入 死信队列     false丢弃该消息      注意死循环
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
}
}
}

import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* 生产者扇型交换机
* @author asus
*
*/
@Component
public class RabbitmqPublishDelayFanout {

@Autowired
private RabbitTemplate rabbitTemplate;

/**
* 发送延时信息
* @param msge 内容
* @param routingKey   routingKey  路由key
* @param delay   延时时间,秒
*/
public void sendTimeoutMsg(String msge, int delay, String id){
CorrelationData correlationData = new CorrelationData(id);
// 通过广播模式发布延时消息 延时30分钟 持久化消息 消费后销毁 这里无需指定路由,会广播至每个绑定此交换机的队列
rabbitTemplate.convertAndSend(RabbitConst.DELAY_EXCHSNGE_NAME, "", msge, message ->{
//持久
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
message.getMessageProperties().setDelay(delay * 1000);   // 毫秒为单位,指定此消息的延时时长
return message;
},correlationData);
}
}

import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* 生产者  主题交换机
* @author asus
*
*/
@Component
public class RabbitmqPublishDelayTopic {

@Autowired
RabbitTemplate rabbitTemplate;

/**新建mq log表  消息发送mq前先消息写人数据库   参数id是数据库主键
* 发送延时信息
* @param msge 内容
* @param routingKey   routingKey  路由key
* @param delay   延时时间,秒
*/
public void sendTimeoutMsg(String msge , int delay, String id){
CorrelationData correlationData = new CorrelationData(id);
// 通过广播模式发布延时消息 延时30分钟 持久化消息 消费后销毁 这里无需指定路由,会广播至每个绑定此交换机的队列
rabbitTemplate.convertAndSend(RabbitConst.EXCHANGE_TOPIC, RabbitConst.DELAY_QUEUES, msge, message ->{
//持久
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
message.getMessageProperties().setDelay(delay * 1000);   // 毫秒为单位,指定此消息的延时时长
// message.getMessageProperties().setExpiration("60000"); // 设置消息的过期时间  毫秒            当消息量大造成消息积压可以设置过期时间
return message;
},correlationData);
}

/**
* 普通发送
* @param msge
* @param id
*/
public void send(String msge, String id) {
CorrelationData correlationData = new CorrelationData(id);
this.rabbitTemplate.convertAndSend(RabbitConst.PAY_EXCHANGE_TOPIC, RabbitConst.PAY_SUCCESS_QUEUES, msge, correlationData);

/*    message ->{message.getMessageProperties().setExpiration("60000"); // 设置消息的过期时间  毫秒       当消息量大造成消息积压可以设置过期时间
return message;
},*/

}

/**
*优先级队列    消息优先推送
* @param msge
* @param id
*/
public void sendPriority1(String msge, String id) {
CorrelationData correlationData = new CorrelationData(id);
this.rabbitTemplate.convertAndSend(RabbitConst.PRIORITY_TOPIC, RabbitConst.PRIORITY_QUEUES, msge, correlationData);
}

/**
*优先级队列  消息优先推送
* @param msge
* @param id
*/
public void sendPriority2(String msge, String id) {
CorrelationData correlationData = new CorrelationData(id);
this.rabbitTemplate.convertAndSend(RabbitConst.PRIORITY_TOPIC_1, RabbitConst.PRIORITY_QUEUES_1, msge, correlationData);
}

/*消息优先级
*     message ->{
message.getMessageProperties().setPriority(priority);
return message;
},*/
}

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* 生产者  直连交换机
* @author asus
*
*/
@Component
public class RabbitmqPublishDirect {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 发送普通消息
* @param routingKey
* @param msge
*/
public void sendMsg(String id, String msge){
CorrelationData correlationData = new CorrelationData(id);
rabbitTemplate.convertAndSend(RabbitConst.DEFAULT_EXCHSNGE_NAME, RabbitConst.ORDER_PAY_QUEUE_NAME , msge, correlationData);
}
}

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import com.xiaoantimes.estimate.auth.client.annotation.IgnoreUserToken;
import com.xiaoantimes.estimate.common.msg.BaseResponse;
import com.xiaoantimes.estimate.common.util.ResBodyUtil;
import com.xiaoantimes.estimate.usercenter.mq.RabbitmqPublishDelayFanout;
import com.xiaoantimes.estimate.usercenter.mq.RabbitmqPublishDelayTopic;
import com.xiaoantimes.estimate.usercenter.mq.RabbitmqPublishDirect;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
@RestController
@RequestMapping("/user/mq")
public class RabbitMqController {
@Autowired
private RabbitmqPublishDelayFanout rabbitmqPublishDelay;

@Autowired
private RabbitmqPublishDelayTopic  rabbitmqPublishDelayTopic;

@Autowired
private RabbitmqPublishDirect rabbitmqPublishDirect;

@ApiOperation(value = "延时Topic", notes = "", tags = "RabbitMq", httpMethod = "GET")
@IgnoreUserToken
@GetMapping(value="/mqTopicDelayed")
public BaseResponse mqTopicDelayed(@ApiParam(required = true, value = "发送mq内容") @RequestParam String msge,
@ApiParam(required = true, value = "延时时间") @RequestParam Integer time,
@ApiParam(required = true, value = "消息发送mq成功或失败消息id") @RequestParam String id){
rabbitmqPublishDelayTopic.sendTimeoutMsg(msge, time, id);
return ResBodyUtil.buildSuccessResBody();
}

@ApiOperation(value = "普通发送Topic", notes = "", tags = "RabbitMq", httpMethod = "GET")
@IgnoreUserToken
@GetMapping(value="/mqTopicOrdinary")
public BaseResponse ordinary(@ApiParam(required = true, value = "发送mq内容") @RequestParam String msge,
@ApiParam(required = true, value = "消息发送mq成功或失败消息id") @RequestParam String id){
rabbitmqPublishDelayTopic.send( msge,id);
return ResBodyUtil.buildSuccessResBody();
}

@ApiOperation(value = "优先级队列Topic", notes = "", tags = "RabbitMq", httpMethod = "GET")
@IgnoreUserToken
@GetMapping(value="/mqTopicPriority")
public BaseResponse mqTopicPriority(@ApiParam(required = true, value = "发送mq内容") @RequestParam String msge,
@ApiParam(required = true, value = "消息发送mq成功或失败消息id") @RequestParam String id){
rabbitmqPublishDelayTopic.sendPriority1( msge, id);
return ResBodyUtil.buildSuccessResBody();
}

@ApiOperation(value = "优先级队列Topic", notes = "", tags = "RabbitMq", httpMethod = "GET")
@IgnoreUserToken
@GetMapping(value="/mqTopicPriority2")
public BaseResponse mqTopicPriority2(@ApiParam(required = true, value = "发送mq内容")@RequestParam String msge,
@ApiParam(required = true, value = "消息发送mq成功或失败消息id") @RequestParam String id){
rabbitmqPublishDelayTopic.sendPriority2(msge, id);
return ResBodyUtil.buildSuccessResBody();
}

/*********************************Direct*****************************************************************/

@ApiOperation(value = "普通Direct", notes = "", tags = "RabbitMq", httpMethod = "GET")
@IgnoreUserToken
@GetMapping(value="/mqDirectOrdinary")
public BaseResponse mqDirectOrdinary(
@ApiParam(required = true, value = "消息发送mq成功或失败消息id") @RequestParam String id,
@ApiParam(required = true, value = "发送mq内容") @RequestParam String msge){
rabbitmqPublishDirect.sendMsg(id, msge);
return ResBodyUtil.buildSuccessResBody();
}

/*******************************************Fanout***************************************************************************/

@ApiOperation(value = "延时Fanout", notes = "", tags = "RabbitMq", httpMethod = "GET")
@IgnoreUserToken
@GetMapping(value="/fanoutDelayed")
public BaseResponse fanoutDelayed(@ApiParam(required = true, value = "发送mq内容") @RequestParam String msge,
@ApiParam(required = true, value = "延时时间") @RequestParam Integer time,
@ApiParam(required = true, value = "消息发送mq成功或失败消息id") @RequestParam String id){
rabbitmqPublishDelay.sendTimeoutMsg(msge, time, id);
return ResBodyUtil.buildSuccessResBody();
}

}

配置文件
spring:
rabbitmq:
password: guest
username: guest
port: 5672
addresses: 10.10.0.198
#开启发送失败返回
publisher-returns: true
#开启发送确认
publisher-confirms: true
listener:
simple:
#指定最小的消费者数量.
concurrency: 2
#指定最大的消费者数量.
max-concurrency: 2
#开启ack
acknowledge-mode: manual
#开启ack
direct:
acknowledge-mode: auto
#支持消息的确认与返回
template:
mandatory: true

pom文件添加
<!-- RabbitMQ -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot
1ff72
-starter-amqp</artifactId>
</dependency>
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: