SpringBoot消息RabbitMQ的初步使用
2019-05-05 23:37
489 查看
安装RabbitMQ
使用docker安装RabbitMQ
docker pull rabbitmq:3.7.7-management
然后用docker images查看
docker run -d -p 5672:5672 -p 15672:15672 --name 这里随便起个名字
然后就直接
http://Server-IP:15672
Server-IP是机子的ip地址,本地就可以直接使用localhost
springboot中使用RabbitMQ
想要知道在SpringBoot中如何使用RabbitMQ,那么首先要看的肯定是自动配置给我们配置了什么
找到RabbitAutoConfiguration这个类
// // Source code recreated from a .class file by IntelliJ IDEA // (powered by Fernflower decompiler) // package org.springframework.boot.autoconfigure.amqp; import com.rabbitmq.client.Channel; import java.time.Duration; import java.util.List; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; import org.springframework.amqp.core.AmqpAdmin; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionNameStrategy; import org.springframework.amqp.rabbit.connection.RabbitConnectionFactoryBean; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitMessagingTemplate; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.beans.factory.ObjectProvider; import org.springframework.boot.autoconfigure.amqp.RabbitProperties.Ssl; import org.springframework.boot.autoconfigure.amqp.RabbitProperties.Template; import org.springframework.boot.autoconfigure.amqp.RabbitProperties.Cache.Connection; import org.springframework.boot.autoconfigure.amqp.RabbitRetryTemplateCustomizer.Target; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.autoconfigure.condition.ConditionalOnSingleCandidate; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.boot.context.properties.PropertyMapper; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; @Configuration @ConditionalOnClass({RabbitTemplate.class, Channel.class}) @EnableConfigurationProperties({RabbitProperties.class}) @Import({RabbitAnnotationDrivenConfiguration.class}) public class RabbitAutoConfiguration { public RabbitAutoConfiguration() { } @Configuration @ConditionalOnClass({RabbitMessagingTemplate.class}) @ConditionalOnMissingBean({RabbitMessagingTemplate.class}) @Import({RabbitAutoConfiguration.RabbitTemplateConfiguration.class}) protected static class MessagingTemplateConfiguration { protected MessagingTemplateConfiguration() { } @Bean @ConditionalOnSingleCandidate(RabbitTemplate.class) public RabbitMessagingTemplate rabbitMessagingTemplate(RabbitTemplate rabbitTemplate) { return new RabbitMessagingTemplate(rabbitTemplate) 20000 ; } } @Configuration @Import({RabbitAutoConfiguration.RabbitConnectionFactoryCreator.class}) protected static class RabbitTemplateConfiguration { private final RabbitProperties properties; private final ObjectProvider<MessageConverter> messageConverter; private final ObjectProvider<RabbitRetryTemplateCustomizer> retryTemplateCustomizers; public RabbitTemplateConfiguration(RabbitProperties properties, ObjectProvider<MessageConverter> messageConverter, ObjectProvider<RabbitRetryTemplateCustomizer> retryTemplateCustomizers) { this.properties = properties; this.messageConverter = messageConverter; this.retryTemplateCustomizers = retryTemplateCustomizers; } @Bean @ConditionalOnSingleCandidate(ConnectionFactory.class) @ConditionalOnMissingBean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { PropertyMapper map = PropertyMapper.get(); RabbitTemplate template = new RabbitTemplate(connectionFactory); MessageConverter messageConverter = (MessageConverter)this.messageConverter.getIfUnique(); if (messageConverter != null) { template.setMessageConverter(messageConverter); } template.setMandatory(this.determineMandatoryFlag()); Template properties = this.properties.getTemplate(); if (properties.getRetry().isEnabled()) { template.setRetryTemplate((new RetryTemplateFactory((List)this.retryTemplateCustomizers.orderedStream().collect(Collectors.toList()))).createRetryTemplate(properties.getRetry(), Target.SENDER)); } properties.getClass(); map.from(properties::getReceiveTimeout).whenNonNull().as(Duration::toMillis).to(template::setReceiveTimeout); properties.getClass(); map.from(properties::getReplyTimeout).whenNonNull().as(Duration::toMillis).to(template::setReplyTimeout); properties.getClass(); map.from(properties::getExchange).to(template::setExchange); properties.getClass(); map.from(properties::getRoutingKey).to(template::setRoutingKey); properties.getClass(); map.from(properties::getDefaultReceiveQueue).whenNonNull().to(template::setDefaultReceiveQueue); return template; } private boolean determineMandatoryFlag() { Boolean mandatory = this.properties.getTemplate().getMandatory(); return mandatory != null ? mandatory : this.properties.isPublisherReturns(); } @Bean @ConditionalOnSingleCandidate(ConnectionFactory.class) @ConditionalOnProperty( prefix = "spring.rabbitmq", name = {"dynamic"}, matchIfMissing = true ) @ConditionalOnMissingBean public AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) { return new RabbitAdmin(connectionFactory); } } @Configuration @ConditionalOnMissingBean({ConnectionFactory.class}) protected static class RabbitConnectionFactoryCreator { protected RabbitConnectionFactoryCreator() { } @Bean public CachingConnectionFactory rabbitConnectionFactory(RabbitProperties properties, ObjectProvider<ConnectionNameStrategy> connectionNameStrategy) throws Exception { PropertyMapper map = PropertyMapper.get(); CachingConnectionFactory factory = new CachingConnectionFactory((com.rabbitmq.client.ConnectionFactory)this.getRabbitConnectionFactoryBean(properties).getObject()); properties.getClass(); map.from(properties::determineAddresses).to(factory::setAddresses); properties.getClass(); map.from(properties::isPublisherConfirms).to(factory::setPublisherConfirms); properties.getClass(); map.from(properties::isPublisherReturns).to(factory::setPublisherReturns); org.springframework.boot.autoconfigure.amqp.RabbitProperties.Cache.Channel channel = properties.getCache().getChannel(); channel.getClass(); map.from(channel::getSize).whenNonNull().to(factory::setChannelCacheSize); channel.getClass(); map.from(channel::getCheckoutTimeout).whenNonNull().as(Duration::toMillis).to(factory::setChannelCheckoutTimeout); Connection connection = properties.getCache().getConnection(); connection.getClass(); map.from(connection::getMode).whenNonNull().to(factory::setCacheMode); connection.getClass(); map.from(connection::getSize).whenNonNull().to(factory::setConnectionCacheSize); connectionNameStrategy.getClass(); map.from(connectionNameStrategy::getIfUnique).whenNonNull().to(factory::setConnectionNameStrategy); return factory; } private RabbitConnectionFactoryBean getRabbitConnectionFactoryBean(RabbitProperties properties) throws Exception { PropertyMapper map = PropertyMapper.get(); RabbitConnectionFactoryBean factory = new RabbitConnectionFactoryBean(); properties.getClass(); map.from(properties::determineHost).whenNonNull().to(factory::setHost); properties.getClass(); map.from(properties::determinePort).to(factory::setPort); properties.getClass(); map.from(properties::determineUsername).whenNonNull().to(factory::setUsername); properties.getClass(); map.from(properties::determinePassword).whenNonNull().to(factory::setPassword); properties.getClass(); map.from(properties::determineVirtualHost).whenNonNull().to(factory::setVirtualHost); properties.getClass(); map.from(properties::getRequestedHeartbeat).whenNonNull().asInt(Duration::getSeconds).to(factory::setRequestedHeartbeat); Ssl ssl = properties.getSsl(); if (ssl.isEnabled()) { factory.setUseSSL(true); ssl.getClass(); map.from(ssl::getAlgorithm).whenNonNull().to(factory::setSslAlgorithm); ssl.getClass(); map.from(ssl::getKeyStoreType).to(factory::setKeyStoreType); ssl.getClass(); map.from(ssl::getKeyStore).to(factory::setKeyStore); ssl.getClass(); map.from(ssl::getKeyStorePassword).to(factory::setKeyStorePassphrase); ssl.getClass(); map.from(ssl::getTrustStoreType).to(factory::setTrustStoreType); ssl.getClass(); map.from(ssl::getTrustStore).to(factory::setTrustStore); ssl.getClass(); map.from(ssl::getTrustStorePassword).to(factory::setTrustStorePassphrase); ssl.getClass(); map.from(ssl::isValidateServerCertificate).to((validate) -> { factory.setSkipServerCertificateValidation(!validate); }); ssl.getClass(); map.from(ssl::getVerifyHostname).to(factory::setEnableHostnameVerification); } properties.getClass(); map.from(properties::getConnectionTimeout).whenNonNull().asInt(Duration::toMillis).to(factory::setConnectionTimeout); factory.afterPropertiesSet(); return factory; } } }
其中重要的是RabbitTemplate和AmqpAdmin
前者就是我们用来进行消息发送和获得的
而后者就是用来创建exchange queue binding
所以我们可以直接注入使用
@Autowired RabbitTemplate rabbitTemplate; /** * 1.单播(点对点) */ @Test public void contextLoads() { //message需要自己构造一个;定制消息体内容和消息头 //rabbitTemplate.send(exchange,routeKey,message); //Object默认当成消息体,只需要传入要发送的对象,自动序列化发送给rabbitmq, //rabbitTemplate.convertAndSend(exchage,routeKey,Object); Map<String,Object> map = new HashMap<>(); map.put("msg","这是第一个消息"); map.put("data", Arrays.asList("helloworld",123,true)); //对象被默认序列化以后发送出去 rabbitTemplate.convertAndSend("exchange.direct","atschool.news",map)); }
但是这样发送过去的消息不是json格式
MessageConverter messageConverter = (MessageConverter)this.messageConverter.getIfUnique(); if (messageConverter != null) { template.setMessageConverter(messageConverter); }
通过这两行可以看出来,如果我们需要把发送的数据变成json数据格式的
就需要写一个MessageConvertor
@Configuration public class MyAMQPConfig { @Bean public MessageConverter messageConverter(){ return new Jackson2JsonMessageConverter(); } }
这样就可以了
另一个AmqpAdmin的用法和上一个一样,只是方法不用
@Autowired AmqpAdmin amqpAdmin; @Test public void creatExchange(){ amqpAdmin.declareExchange(new DirectExchange("amqpadmin.exchange")); System.out.println("创建完成"); amqpAdmin.declareQueue(new Queue("amqpadmin.queue",true)); 创建绑定规则 amqpAdmin.declareBinding(new Binding("amqpadmin.queue", Binding.DestinationType.QUEUE,"amqpadmin.exchange","amqp.haha",null)); }
我们使用消息队列最重要的当然是要进行自动取出消息在service层中
所以就需要注解来完成这一个功能
package com.xa.springboot02amqp.service; import com.xa.springboot02amqp.bean.Book; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; @Service public class BookService { @RabbitListener(queues = "atshool.news") public void receive(Book book){ System.out.println("收到消息"+book); } @RabbitListener(queues = "atshool") public void receiver02(Message message){ System.out.println(message.getBody()); System.out.println(message.getMessageProperties()); } }
这里 最重要的一步就是一定要在SpringBoot启动类中定义@EnableRabbit 来开启基于注解的RabbitMQ模式
上面的Book是我自定义的一个bean和map的性质是一样的都是可以进行接收和发送
注解中的queue就是从哪个消息队列中取
相关文章推荐
- Spring Boot 2.0.0.M7 使用异步消息服务-AMQP(RabbitMQ)
- SpringBoot使用RabbitMQ做消息中间件
- spring boot / cloud (九) 使用rabbitmq消息中间件
- spring boot / cloud (九) 使用rabbitmq消息中间件
- SpringBoot 系列 | 第十篇:使用RabbitMQ收发消息
- Spring Boot中使用RabbitMQ
- Spring Boot中使用RabbitMQ
- Spring Boot中使用RabbitMQ(mac安装rabbitMQ和Erlang)
- 在Spring Boot框架下使用WebSocket实现消息推送
- 在Spring Boot框架下使用WebSocket实现消息推送
- Spring Boot 中使用 RabbitMQ
- 单元测试之初步使用篇(testng + jmockit + springboot)
- Spring Boot 构建应用——整合消息中间件 RabbitMQ
- RabbitMQ在SpringBoot中使用的一些注意点
- SpringBoot的RabbitMQ消息队列: 六、第五模式"Topics"
- 消息队列 RabbitMQ 与 Spring 整合使用
- Spring Boot使用Redis进行消息的发布订阅
- SpringBoot系列八:SpringBoot整合消息服务(SpringBoot 整合 ActiveMQ、SpringBoot 整合 RabbitMQ、SpringBoot 整合 Kafka)
- Spring Boot与消息(JMS、AMQP、RabbitMQ)
- rabbitmq学习10:使用spring-amqp发送消息及异步接收消息