您的位置:首页 > 编程语言 > Java开发

spring-boot使用rabbitmq示例

2018-01-17 18:06 639 查看
POM文件声明:

<!-- spring boot -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency><groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>


Directer队列消费

package com.yryz.quanhu.demo.mq;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

/**
* @author yehao
* @version 2.0
* @date 2018年1月17日 下午2:57:23
* @Description direct的队列示例
* POM文件声明配置:
* <dependency>
*	<groupId>org.springframework.boot</groupId>
*	<artifactId>spring-boot-starter-amqp</artifactId>
* </dependency>
*/
@Service
public class DirectExchangeConsumer {

/**
* QueueBinding: exchange和queue的绑定
* Queue:队列声明
* Exchange:声明exchange
* key:routing-key
* @param data
*/
@RabbitListener(bindings = @QueueBinding(
value= @Queue(value=AmqpConstant.DEMO_DIRECT_EXCHANGE,durable="true"),
exchange=@Exchange(value=AmqpConstant.DEMO_DIRECT_EXCHANGE,ignoreDeclarationExceptions="true",type=ExchangeTypes.DIRECT),
key=AmqpConstant.DEMO_QUEUE)
)
public void handleMessage(String data){
System.out.println("hello exchange mq:" + data);
}

}

Fanout队列消费

package com.yryz.quanhu.demo.mq;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

/**
* @author yehao
* @version 2.0
* @date 2018年1月17日 下午3:03:07
* @Description 广播队列示例
* POM文件声明配置:
* <dependency>
*	<groupId>org.springframework.boot</groupId>
*	<artifactId>spring-boot-starter-amqp</artifactId>
* </dependency>
*/
@Service
public class FanoutExchangeConsumer {

/**
* QueueBinding: exchange和queue的绑定
* Queue:队列声明
* Exchange:声明exchange
* @param data
*/
@RabbitListener(bindings = @QueueBinding(
value= @Queue(value=AmqpConstant.DEMO_FANOUT_QUEUE,durable="true"),
exchange=@Exchange(value=AmqpConstant.DEMO_FANOUT_EXCHANGE,ignoreDeclarationExceptions="true",type=ExchangeTypes.FANOUT))
)
public void handleMessage(String data){
System.out.println("hello Fanout Message:" + data);
}

}

RPC队列消费

package com.yryz.quanhu.demo.mq;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.stereotype.Service;

/**
* @author yehao
* @version 2.0
* @date 2018年1月17日 下午4:56:29
* @Description 回复消息示例,发送并回复,该方法调用时会锁定当前线程,并且有可能会造成MQ的性能下降或者服务端/客户端出现死循环现象,请谨慎使用。
* POM文件声明配置:
* <dependency>
*	<groupId>org.springframework.boot</groupId>
*	<artifactId>spring-boot-starter-amqp</artifactId>
* </dependency>
*/
@Service
public class SendAndReceiveConsummer implements ReturnCallback , ConfirmCallback {

/**
* QueueBinding: exchange和queue的绑定
* Queue:队列声明
* Exchange:声明exchange
* key:routing-key
* @param data
* @return
*/
@RabbitListener(bindings = @QueueBinding(
value= @Queue(value=AmqpConstant.DEMO_RECEIVE_QUEUE,durable="true"),
exchange=@Exchange(value=AmqpConstant.DEMO_DIRECT_EXCHANGE,ignoreDeclarationExceptions="true",type=ExchangeTypes.DIRECT),
key=AmqpConstant.DEMO_RECEIVE_QUEUE)
)
public String getMsg(String data){
System.out.println("hello SendAndReceive mq:" + data);
return "yehao SendAndReceive back";
}

/**
* ReturnCallback接口用于实现消息发送到RabbitMQ交换器,但无相应队列与交换器绑定时的回调。
* @param message
* @param replyCode
* @param replyText
* @param exchange
* @param routingKey
* @see org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback#returnedMessage(org.springframework.amqp.core.Message, int, java.lang.String, java.lang.String, java.lang.String)
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println(message.getMessageProperties().getCorrelationId() + " 发送失败");

}

/**
* ConfirmCallback接口用于实现消息发送到RabbitMQ交换器后接收ack回调。
* @param correlationData
* @param ack
* @param cause
* @see org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback#confirm(org.springframework.amqp.rabbit.support.CorrelationData, boolean, java.lang.String)
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
System.out.println("消息发送成功:" + correlationData);
} else {
System.out.println("消息发送失败:" + cause);
}
}

}

发送代码示例

package com.yryz.quanhu.demo.mq;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;

/**
* @author yehao
* @version 2.0
* @date 2018年1月17日 下午2:57:35
* @Description MQ的发送示例方法
*/
public class DemoSender {

/**
* rabbitTemplate 可以直接注入,由spring-boot负责维护连接池对象
*/
@Autowired
private RabbitTemplate rabbitTemplate;

/**
* direct exchange 单一消息指定发送,需同时指定exchange-key和queue的routing-key
*/
public void directSend(){
String msg = "hello dirct demo mq";
rabbitTemplate.setExchange(AmqpConstant.DEMO_DIRECT_EXCHANGE);
rabbitTemplate.setRoutingKey(AmqpConstant.DEMO_QUEUE);
rabbitTemplate.convertAndSend(msg);
}

/**
* 发送并回复,该方法调用时会锁定当前线程,并且有可能会造成MQ的性能下降或者服务端/客户端出现死循环现象,请谨慎使用。
*/
public void sendAndReceive(){
String msg = "hello dirct demo mq";
rabbitTemplate.setExchange(AmqpConstant.DEMO_DIRECT_EXCHANGE);
rabbitTemplate.setRoutingKey(AmqpConstant.DEMO_RECEIVE_QUEUE);
Object back = rabbitTemplate.convertSendAndReceive(msg);
System.out.println("back msg : " + back.toString());
}

/**
* fanout exchange 广播,指定exchange-key即可
*/
public void fanoutSend(){
String msg = "hello fanout demo mq";
rabbitTemplate.setExchange(AmqpConstant.DEMO_FANOUT_EXCHANGE);
rabbitTemplate.convertAndSend(msg);
}

}

application.properties 配置属性声明

# Rabbitmq配置
spring.rabbitmq.host=192.168.30.32
spring.rabbitmq.port=5672
spring.rabbitmq.username=yryz
spring.rabbitmq.password=123456
spring.rabbitmq.virtualHost=yryz
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: