spring-boot使用rabbitmq示例
2018-01-17 18:06
639 查看
POM文件声明:
Direct队列消费
Fanout队列消费
RPC队列消费
发送代码示例
application.properties 配置属性声明
<!-- spring boot --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency><groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
Direct队列消费
package com.yryz.quanhu.demo.mq; import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; /** * @author yehao * @version 2.0 * @date 2018年1月17日 下午2:57:23 * @Description direct的队列示例 * POM文件声明配置: * <dependency> * <groupId>org.springframework.boot</groupId> * <artifactId>spring-boot-starter-amqp</artifactId> * </dependency> */ @Service public class DirectExchangeConsumer { /** * QueueBinding: exchange和queue的绑定 * Queue:队列声明 * Exchange:声明exchange * key:routing-key * @param data */ @RabbitListener(bindings = @QueueBinding( value= @Queue(value=AmqpConstant.DEMO_DIRECT_EXCHANGE,durable="true"), exchange=@Exchange(value=AmqpConstant.DEMO_DIRECT_EXCHANGE,ignoreDeclarationExceptions="true",type=ExchangeTypes.DIRECT), key=AmqpConstant.DEMO_QUEUE) ) public void handleMessage(String data){ System.out.println("hello exchange mq:" + data); } }
Fanout队列消费
package com.yryz.quanhu.demo.mq; import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; /** * @author yehao * @version 2.0 * @date 2018年1月17日 下午3:03:07 * @Description 广播队列示例 * POM文件声明配置: * <dependency> * <groupId>org.springframework.boot</groupId> * <artifactId>spring-boot-starter-amqp</artifactId> * </dependency> */ @Service public class FanoutExchangeConsumer { /** * QueueBinding: exchange和queue的绑定 * Queue:队列声明 * Exchange:声明exchange * @param data */ @RabbitListener(bindings = @QueueBinding( value= @Queue(value=AmqpConstant.DEMO_FANOUT_QUEUE,durable="true"), exchange=@Exchange(value=AmqpConstant.DEMO_FANOUT_EXCHANGE,ignoreDeclarationExceptions="true",type=ExchangeTypes.FANOUT)) ) public void handleMessage(String data){ System.out.println("hello Fanout Message:" + data); } }
RPC队列消费
package com.yryz.quanhu.demo.mq; import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback; import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.stereotype.Service; /** * @author yehao * @version 2.0 * @date 2018年1月17日 下午4:56:29 * @Description 回复消息示例,发送并回复,该方法调用时会锁定当前线程,并且有可能会造成MQ的性能下降或者服务端/客户端出现死循环现象,请谨慎使用。 * POM文件声明配置: * <dependency> * <groupId>org.springframework.boot</groupId> * <artifactId>spring-boot-starter-amqp</artifactId> * </dependency> */ @Service public class SendAndReceiveConsummer implements ReturnCallback , ConfirmCallback { /** * QueueBinding: exchange和queue的绑定 * Queue:队列声明 * Exchange:声明exchange * key:routing-key * @param data * @return */ @RabbitListener(bindings = @QueueBinding( value= @Queue(value=AmqpConstant.DEMO_RECEIVE_QUEUE,durable="true"), exchange=@Exchange(value=AmqpConstant.DEMO_DIRECT_EXCHANGE,ignoreDeclarationExceptions="true",type=ExchangeTypes.DIRECT), key=AmqpConstant.DEMO_RECEIVE_QUEUE) ) public String getMsg(String data){ System.out.println("hello SendAndReceive mq:" + data); return "yehao SendAndReceive back"; } /** * ReturnCallback接口用于实现消息发送到RabbitMQ交换器,但无相应队列与交换器绑定时的回调。 * @param message * @param replyCode * @param replyText * @param exchange * @param routingKey * @see org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback#returnedMessage(org.springframework.amqp.core.Message, int, java.lang.String, java.lang.String, java.lang.String) */ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String 
routingKey) { System.out.println(message.getMessageProperties().getCorrelationId() + " 发送失败"); } /** * ConfirmCallback接口用于实现消息发送到RabbitMQ交换器后接收ack回调。 * @param correlationData * @param ack * @param cause * @see org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback#confirm(org.springframework.amqp.rabbit.support.CorrelationData, boolean, java.lang.String) */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (ack) { System.out.println("消息发送成功:" + correlationData); } else { System.out.println("消息发送失败:" + cause); } } }
发送代码示例
package com.yryz.quanhu.demo.mq;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;

/**
 * Example methods for sending messages to RabbitMQ.
 *
 * <p>Every send passes the exchange and routing key explicitly instead of calling
 * {@code setExchange}/{@code setRoutingKey} on the injected template. The template is a
 * shared object, so mutating its defaults would let concurrent callers overwrite each
 * other's target and would leak one method's routing into the next send.
 *
 * @author yehao
 * @version 2.0
 * @date 2018-01-17 14:57:35
 */
public class DemoSender {

    /**
     * Injected directly; per the original author, Spring Boot maintains the underlying
     * connection pool.
     */
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Direct exchange: sends a single message to one destination. Both the exchange
     * and the queue's routing key must be specified.
     */
    public void directSend() {
        String msg = "hello dirct demo mq";
        rabbitTemplate.convertAndSend(
                AmqpConstant.DEMO_DIRECT_EXCHANGE, AmqpConstant.DEMO_QUEUE, msg);
    }

    /**
     * Sends a message and waits for the reply, then prints it.
     *
     * <p>Author's caution: this call blocks the current thread and may degrade MQ
     * performance or lead to an endless loop between server and client, so use it
     * with care.
     *
     * <p>The reply may be {@code null} (for example when no reply arrives in time), so
     * it is concatenated rather than dereferenced; a missing reply prints as
     * {@code null} instead of throwing a {@link NullPointerException}.
     */
    public void sendAndReceive() {
        String msg = "hello dirct demo mq";
        Object back = rabbitTemplate.convertSendAndReceive(
                AmqpConstant.DEMO_DIRECT_EXCHANGE, AmqpConstant.DEMO_RECEIVE_QUEUE, msg);
        System.out.println("back msg : " + back);
    }

    /**
     * Fanout exchange: broadcasts a message. Only the exchange needs to be specified,
     * so an empty routing key is passed.
     */
    public void fanoutSend() {
        String msg = "hello fanout demo mq";
        rabbitTemplate.convertAndSend(AmqpConstant.DEMO_FANOUT_EXCHANGE, "", msg);
    }
}
application.properties 配置属性声明
# Rabbitmq配置
spring.rabbitmq.host=192.168.30.32
spring.rabbitmq.port=5672
spring.rabbitmq.username=yryz
spring.rabbitmq.password=123456
spring.rabbitmq.virtualHost=yryz
相关文章推荐
- Spring Boot与RabbitMQ延迟队列使用示例
- Spring Boot中使用RabbitMQ
- RabbitMQ在SpringBoot中使用的一些注意点
- Spring Boot 中使用 RabbitMQ
- Spring Boot中使用 Spring Security 构建权限系统的示例代码
- Spring Boot使用模板freemarker的示例代码
- SpringBoot | 第十二章:RabbitMQ的集成和使用
- Spring boot集成RabbitMQ的示例代码
- 使用Spring Boot创建Web应用程序的示例代码
- Springboot中使用缓存的示例代码
- springboot使用RabbitMQ教程
- Spring boot 下使用RabbitMQ报错:406
- Spring Boot中使用LDAP来统一管理用户信息的示例
- spring boot中内嵌redis的使用方法示例
- 使用Spring Boot集成FastDFS的示例代码
- springboot使用rabbitMQ的坑
- Spring Boot中使用RabbitMQ
- 使用Springboot搭建OAuth2.0 Server的方法示例
- springboot整合rabbitmq的示例代码
- Spring boot下使用RabbitMQ实例