spring boot + rabbitmq实例
2016-09-17 22:33
543 查看
准备工作:
rabbitmq按官方安装方式安装即可。yml配置如下(发送方与接收方都要配置,但是发送方要有vhost的写权限,接收方要有vhost的读权限):
spring:
  rabbitmq:
    host: rabbitmqIp
    port: 5672
    username: username
    password: password
    virtual-host: vhost   # 如果没有配置可用 "/"
一:服务端与客户端都引用spring boot对rabbitmq的依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
二:服务端的关于rabbitmq的配置
1:通用性配置,AmqpConfig.java
/**
 * Shared AMQP beans for the sending side: an admin handle and a messaging
 * template that converts payloads with Jackson.
 */
@Configuration
public class AmqpConfig {

    /**
     * Builds a {@link RabbitAdmin} on top of the supplied connection factory.
     *
     * @param connectionFactory connection factory handed to the admin
     * @return the admin instance
     */
    @Bean
    RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }

    /**
     * Messaging template used by producers. It delegates to the given
     * {@link RabbitTemplate} and uses {@link #jackson2Converter()} as its
     * message converter.
     *
     * @param rabbitTemplate the underlying AMQP template
     * @return the configured messaging template
     */
    @Bean
    public RabbitMessagingTemplate rabbitMessagingTemplate(RabbitTemplate rabbitTemplate) {
        RabbitMessagingTemplate template = new RabbitMessagingTemplate();
        template.setMessageConverter(jackson2Converter());
        template.setRabbitTemplate(rabbitTemplate);
        return template;
    }

    /**
     * Jackson-based message converter installed on the messaging template.
     *
     * @return a converter with default settings
     */
    @Bean
    public MappingJackson2MessageConverter jackson2Converter() {
        return new MappingJackson2MessageConverter();
    }
}
2:静态数据配置,交换机与queue队列配置,如果需要可按需求再进行扩充
/**
 * Exchange names.
 */
public interface RabbitmqExchange {

    // Contract exchanges, one name per exchange type.
    String CONTRACT_FANOUT = "CONTRACT_FANOUT";
    String CONTRACT_TOPIC = "CONTRACT_TOPIC";
    String CONTRACT_DIRECT = "CONTRACT_DIRECT";
}

/**
 * Queue names.
 */
public interface RabbitmqQueue {

    // NOTE: the identifiers are spelled "CONTRACE" while the values are spelled
    // "CONTRACT". The identifiers are kept as-is because other classes refer to them.
    String CONTRACE_SELF = "CONTRACT_SELF";
    String CONTRACE_TENANT = "CONTRACT_TENANT";
}
3:针对服务性rabbitmq配置,如:contract,tenant同时使用contract的交换机,但使用不同的queue队列进行信息发送
contractExchangeConfig,contract交换机配置/** * 基本使用的为topic, * 此处更多是以案例给出 * @author yjpfj1203 */ @Configuration public class ContractExchangeConfig { // /** // * 合同广播型 // * // * @param rabbitAdmin // * @return // */ // @Bean // FanoutExchange contractFanoutExchange(RabbitAdmin rabbitAdmin) { // FanoutExchange contractFanoutExchange = new FanoutExchange(RabbitmqExchange.CONTRACT_FANOUT); // rabbitAdmin.declareExchange(contractFanoutExchange); // return contractFanoutExchange; // } /** * 合同->匹配型 默认:, durable = true, autoDelete = false * * @param rabbitAdmin * @return */ @Bean TopicExchange contractTopicExchangeDurable(RabbitAdmin rabbitAdmin) { TopicExchange contractTopicExchange = new TopicExchange(RabbitmqExchange.CONTRACT_TOPIC); rabbitAdmin.declareExchange(contractTopicExchange); return contractTopicExchange; } /** * 合同直连型 * * @param rabbitAdmin * @return */ @Bean DirectExchange contractDirectExchange(RabbitAdmin rabbitAdmin) { DirectExchange contractDirectExchange = new DirectExchange(RabbitmqExchange.CONTRACT_DIRECT); rabbitAdmin.declareExchange(contractDirectExchange); return contractDirectExchange; } // @Bean // Binding bindingExchangeContract(Queue queueContract, FanoutExchange exchange, RabbitAdmin rabbitAdmin) { // Binding binding = BindingBuilder.bind(queueContract).to(exchange); // rabbitAdmin.declareBinding(binding); // return binding; // } @Bean Binding bindingExchangeContract(Queue queueContract, TopicExchange exchange, RabbitAdmin rabbitAdmin) { Binding binding = BindingBuilder.bind(queueContract).to(exchange).with(RabbitmqQueue.CONTRACE_SELF); rabbitAdmin.declareBinding(binding); return binding; } @Bean Binding bindingExchangeContract(Queue queueContract, DirectExchange exchange, RabbitAdmin rabbitAdmin) { Binding binding = BindingBuilder.bind(queueContract).to(exchange).with(RabbitmqQueue.CONTRACE_SELF); rabbitAdmin.declareBinding(binding); return binding; } @Bean Binding bindingExchangeTenant(Queue queueTenant, TopicExchange exchange, RabbitAdmin 
rabbitAdmin) { Binding binding = BindingBuilder.bind(queueTenant).to(exchange).with(RabbitmqQueue.CONTRACE_TENANT); rabbitAdmin.declareBinding(binding); return binding; } @Bean Binding bindingExchangeTenant(Queue queueTenant, DirectExchange exchange, RabbitAdmin rabbitAdmin) { Binding binding = BindingBuilder.bind(queueTenant).to(exchange).with(RabbitmqQueue.CONTRACE_TENANT); rabbitAdmin.declareBinding(binding); return binding; } /** * 所有关于contract exchange的queue * * @param rabbitAdmin * @return */ @Bean Queue queueContract(RabbitAdmin rabbitAdmin) { Queue queue = new Queue(RabbitmqQueue.CONTRACE_SELF, true); rabbitAdmin.declareQueue(queue); return queue; } @Bean Queue queueTenant(RabbitAdmin rabbitAdmin) { Queue queue = new Queue(RabbitmqQueue.CONTRACE_TENANT, true); rabbitAdmin.declareQueue(queue); return queue; } }
三:实体类(消息的载体)。实体类与静态配置要保证发送方与接收方都能引用到,可打成jar包供双方依赖
/**
 * Message payload for contract messages.
 */
public class ContractRabbitmq {

    private Long id;
    private String name;
    private List<String> testStrList;
    private Date dateCreated;

    /** @return the contract id */
    public Long getId() {
        return id;
    }

    /** @param id the contract id */
    public void setId(Long id) {
        this.id = id;
    }

    /** @return the contract name */
    public String getName() {
        return name;
    }

    /** @param name the contract name */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the sample string list carried by the message */
    public List<String> getTestStrList() {
        return testStrList;
    }

    /** @param testStrList the sample string list carried by the message */
    public void setTestStrList(List<String> testStrList) {
        this.testStrList = testStrList;
    }

    /** @return the creation date */
    public Date getDateCreated() {
        return dateCreated;
    }

    /** @param dateCreated the creation date */
    public void setDateCreated(Date dateCreated) {
        this.dateCreated = dateCreated;
    }
}

/**
 * Message payload for tenant messages.
 */
public class TenantRabbitmq {

    private Long id;
    private String name;

    /** @return the tenant id */
    public Long getId() {
        return id;
    }

    /** @param id the tenant id */
    public void setId(Long id) {
        this.id = id;
    }

    /** @return the tenant name */
    public String getName() {
        return name;
    }

    /** @param name the tenant name */
    public void setName(String name) {
        this.name = name;
    }
}
四:服务发送的service
@Service public class ContractRabbitmqService implements ConfirmCallback{ @Autowired private RabbitMessagingTemplate rabbitMessagingTemplate; public void sendContractRabbitmqTopic(final ContractRabbitmq ContractRabbitmq) { this.rabbitMessagingTemplate.convertAndSend(RabbitmqExchange.CONTRACT_TOPIC, RabbitmqQueue.CONTRACE_SELF, ContractRabbitmq); } public void sendContractRabbitmqDirect(final ContractRabbitmq ContractRabbitmq) { this.rabbitMessagingTemplate.convertAndSend(RabbitmqExchange.CONTRACT_DIRECT, RabbitmqQueue.CONTRACE_SELF, ContractRabbitmq); } public void sendTenantRabbitmqTopic(final TenantRabbitmq tenantRabbitmq){ this.rabbitMessagingTemplate.convertAndSend(RabbitmqExchange.CONTRACT_TOPIC, RabbitmqQueue.CONTRACE_TENANT, tenantRabbitmq); } public void sendTenantRabbitmqDirect(final TenantRabbitmq tenantRabbitmq){ this.rabbitMessagingTemplate.convertAndSend(RabbitmqExchange.CONTRACT_DIRECT, RabbitmqQueue.CONTRACE_TENANT, tenantRabbitmq); } /** * 回调函数不起作用,暂时没搞清楚 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (ack) { System.out.println("消息确认成功"); } else { System.out.println("消息确认失败"); } } }
五:TestController
/**
 * Test endpoints under {@code /rabbitmq} that publish a fixed sample payload
 * through {@link ContractRabbitmqService}.
 */
@RestController
@RequestMapping(value = "/rabbitmq")
public class RabbitMqTestController {

    @Autowired
    private ContractRabbitmqService contractRabbitmqService;

    /**
     * Publishes the sample contract to the direct exchange.
     *
     * @param content request parameter; not used by the handler
     */
    @RequestMapping(value = "contract/direct", method = RequestMethod.GET)
    public void contractDirect(String content) {
        contractRabbitmqService.sendContractRabbitmqDirect(sampleContract());
    }

    /**
     * Publishes the sample contract to the topic exchange.
     *
     * @param content request parameter; not used by the handler
     */
    @RequestMapping(value = "contract/topic", method = RequestMethod.GET)
    public void contractTopic(String content) {
        contractRabbitmqService.sendContractRabbitmqTopic(sampleContract());
    }

    /**
     * Publishes the sample tenant to the direct exchange.
     *
     * @param content request parameter; not used by the handler
     */
    @RequestMapping(value = "tenant/direct", method = RequestMethod.GET)
    public void tenantDirect(String content) {
        contractRabbitmqService.sendTenantRabbitmqDirect(sampleTenant());
    }

    /**
     * Publishes the sample tenant to the topic exchange.
     *
     * @param content request parameter; not used by the handler
     */
    @RequestMapping(value = "tenant/topic", method = RequestMethod.GET)
    public void tenantTopic(String content) {
        contractRabbitmqService.sendTenantRabbitmqTopic(sampleTenant());
    }

    /** Builds the fixed contract payload sent by the contract endpoints. */
    private ContractRabbitmq sampleContract() {
        ContractRabbitmq sample = new ContractRabbitmq();
        sample.setDateCreated(new Date());
        sample.setId(12L);
        sample.setName("liuhan");

        List<String> values = new ArrayList<>();
        values.add("liu");
        values.add("test str");
        sample.setTestStrList(values);
        return sample;
    }

    /** Builds the fixed tenant payload sent by the tenant endpoints. */
    private TenantRabbitmq sampleTenant() {
        TenantRabbitmq sample = new TenantRabbitmq();
        sample.setId(12L);
        sample.setName("liuhan");
        return sample;
    }
}
六:接收方配置文件
@Configuration @EnableRabbit public class ConsumerConfig implements RabbitListenerConfigurer { @Autowired ReceiverService receiverService; @Bean public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() { DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory(); factory.setMessageConverter(new MappingJackson2MessageConverter()); return factory; } @Bean public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); // factory.setPrefetchCount(5); factory.setAcknowledgeMode(AcknowledgeMode.AUTO); return factory; } @Override public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) { registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory()); } }
七:接收方service
/**
 * Listens on the contract and tenant queues and prints every received payload
 * as JSON.
 */
@Component
public class ReceiverService {

    // One shared serializer instead of a new Gson per message; Gson instances
    // are documented as thread-safe and reusable.
    private static final Gson GSON = new Gson();

    /**
     * Handles messages from the contract queue.
     *
     * @param contract the converted message payload
     */
    @RabbitListener(queues = RabbitmqQueue.CONTRACE_SELF)
    public void receiveContractQueue(ContractRabbitmq contract) {
        System.out.println("Received contract<" + GSON.toJson(contract) + ">");
    }

    /**
     * Handles messages from the tenant queue.
     *
     * @param tenant the converted message payload
     */
    @RabbitListener(queues = RabbitmqQueue.CONTRACE_TENANT)
    public void receiveTenantQueue(TenantRabbitmq tenant) {
        System.out.println("Received Bar<" + GSON.toJson(tenant) + ">");
    }
}
八:test
localhost:9141/rabbitmq/contract/topic
localhost:9141/rabbitmq/contract/direct
localhost:9141/rabbitmq/tenant/direct
localhost:9141/rabbitmq/tenant/topic
接收方打印的信息
Received contract<{"id":12,"name":"liuhan","testStrList":["liu","test str"],"dateCreated":"Sep 17, 2016 10:31:36 PM"}>
Received contract<{"id":12,"name":"liuhan","testStrList":["liu","test str"],"dateCreated":"Sep 17, 2016 10:31:37 PM"}>
Received Bar<{"id":12,"name":"liuhan"}>
Received Bar<{"id":12,"name":"liuhan"}>
相关文章推荐
- springboot+rabbitmq 整合实例
- Spring Boot RabbitMQ快速入门 (1)
- springboot+rabbitmq例子
- RabbitMQ springboot简单搭建
- springboot+rabbitMq整合开发实战二:模拟用户下单的过程
- springboot+rabbitMq整合开发实战一
- Spring Boot RabbitMQ快速入门 (2)
- spring-boot rabbitmq 例子
- 4 微服务实战系列 - SpringBoot RabbitMQ 实战解决项目中实践
- spring boot Rabbitmq集成,延时消息队列实现
- spring boot rabbitmq传递bean(实体类)配置
- springboot+rabbitmq整合示例程
- spring boot+RabbitMQ
- SpringBoot+rabbitMq的配置和使用Demo
- SpringBoot + RabbitMQ 使用Demo
- 集群与负载均衡系列(6)——消息队列之rabbitMQ+spring-boot+spring amqp发送可靠的消息
- SPRING BOOT+ELASTICSEARCH+RABBITMQ
- Spring Boot RabbitMQ 集成
- 【学习笔记】spring boot + zookeeper + dubbo + rabbitMq + mysql + thymeleaf/freemarker + mybaits 代码试验
- springboot+dubbo+zookeeper+mybatis+redis+druid+rabbitmq