您的位置:首页 > 编程语言 > Java开发

SpringBoot消息RabbitMQ的初步使用

2019-05-05 23:37 489 查看

安装RabbitMQ

使用docker安装RabbitMQ

docker pull rabbitmq:3.7.7-management

然后用docker images查看

docker run -d  -p 5672:5672 -p 15672:15672   --name 这里随便起个名字

然后就直接
http://Server-IP:15672
Server-IP是机子的ip地址,本地就可以直接使用localhost

springboot中使用RabbitMQ

想要知道在SpringBoot中如何使用RabbitMQ,那么首先要看的肯定是自动配置给我们配置了什么
找到RabbitAutoConfiguration这个类

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
//

package org.springframework.boot.autoconfigure.amqp;

import com.rabbitmq.client.Channel;
import java.time.Duration;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionNameStrategy;
import org.springframework.amqp.rabbit.connection.RabbitConnectionFactoryBean;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitMessagingTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.amqp.RabbitProperties.Ssl;
import org.springframework.boot.autoconfigure.amqp.RabbitProperties.Template;
import org.springframework.boot.autoconfigure.amqp.RabbitProperties.Cache.Connection;
import org.springframework.boot.autoconfigure.amqp.RabbitRetryTemplateCustomizer.Target;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.condition.ConditionalOnSingleCandidate;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.context.properties.PropertyMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;

@Configuration
@ConditionalOnClass({RabbitTemplate.class, Channel.class})
@EnableConfigurationProperties({RabbitProperties.class})
@Import({RabbitAnnotationDrivenConfiguration.class})
public class RabbitAutoConfiguration {
public RabbitAutoConfiguration() {
}

@Configuration
@ConditionalOnClass({RabbitMessagingTemplate.class})
@ConditionalOnMissingBean({RabbitMessagingTemplate.class})
@Import({RabbitAutoConfiguration.RabbitTemplateConfiguration.class})
protected static class MessagingTemplateConfiguration {
protected MessagingTemplateConfiguration() {
}

@Bean
@ConditionalOnSingleCandidate(RabbitTemplate.class)
public RabbitMessagingTemplate rabbitMessagingTemplate(RabbitTemplate rabbitTemplate) {
return new RabbitMessagingTemplate(rabbitTemplate)
20000
;
}
}

@Configuration
@Import({RabbitAutoConfiguration.RabbitConnectionFactoryCreator.class})
protected static class RabbitTemplateConfiguration {
private final RabbitProperties properties;
private final ObjectProvider<MessageConverter> messageConverter;
private final ObjectProvider<RabbitRetryTemplateCustomizer> retryTemplateCustomizers;

public RabbitTemplateConfiguration(RabbitProperties properties, ObjectProvider<MessageConverter> messageConverter, ObjectProvider<RabbitRetryTemplateCustomizer> retryTemplateCustomizers) {
this.properties = properties;
this.messageConverter = messageConverter;
this.retryTemplateCustomizers = retryTemplateCustomizers;
}

@Bean
@ConditionalOnSingleCandidate(ConnectionFactory.class)
@ConditionalOnMissingBean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
PropertyMapper map = PropertyMapper.get();
RabbitTemplate template = new RabbitTemplate(connectionFactory);
MessageConverter messageConverter = (MessageConverter)this.messageConverter.getIfUnique();
if (messageConverter != null) {
template.setMessageConverter(messageConverter);
}

template.setMandatory(this.determineMandatoryFlag());
Template properties = this.properties.getTemplate();
if (properties.getRetry().isEnabled()) {
template.setRetryTemplate((new RetryTemplateFactory((List)this.retryTemplateCustomizers.orderedStream().collect(Collectors.toList()))).createRetryTemplate(properties.getRetry(), Target.SENDER));
}

properties.getClass();
map.from(properties::getReceiveTimeout).whenNonNull().as(Duration::toMillis).to(template::setReceiveTimeout);
properties.getClass();
map.from(properties::getReplyTimeout).whenNonNull().as(Duration::toMillis).to(template::setReplyTimeout);
properties.getClass();
map.from(properties::getExchange).to(template::setExchange);
properties.getClass();
map.from(properties::getRoutingKey).to(template::setRoutingKey);
properties.getClass();
map.from(properties::getDefaultReceiveQueue).whenNonNull().to(template::setDefaultReceiveQueue);
return template;
}

private boolean determineMandatoryFlag() {
Boolean mandatory = this.properties.getTemplate().getMandatory();
return mandatory != null ? mandatory : this.properties.isPublisherReturns();
}

@Bean
@ConditionalOnSingleCandidate(ConnectionFactory.class)
@ConditionalOnProperty(
prefix = "spring.rabbitmq",
name = {"dynamic"},
matchIfMissing = true
)
@ConditionalOnMissingBean
public AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) {
return new RabbitAdmin(connectionFactory);
}
}

@Configuration
@ConditionalOnMissingBean({ConnectionFactory.class})
protected static class RabbitConnectionFactoryCreator {
protected RabbitConnectionFactoryCreator() {
}

@Bean
public CachingConnectionFactory rabbitConnectionFactory(RabbitProperties properties, ObjectProvider<ConnectionNameStrategy> connectionNameStrategy) throws Exception {
PropertyMapper map = PropertyMapper.get();
CachingConnectionFactory factory = new CachingConnectionFactory((com.rabbitmq.client.ConnectionFactory)this.getRabbitConnectionFactoryBean(properties).getObject());
properties.getClass();
map.from(properties::determineAddresses).to(factory::setAddresses);
properties.getClass();
map.from(properties::isPublisherConfirms).to(factory::setPublisherConfirms);
properties.getClass();
map.from(properties::isPublisherReturns).to(factory::setPublisherReturns);
org.springframework.boot.autoconfigure.amqp.RabbitProperties.Cache.Channel channel = properties.getCache().getChannel();
channel.getClass();
map.from(channel::getSize).whenNonNull().to(factory::setChannelCacheSize);
channel.getClass();
map.from(channel::getCheckoutTimeout).whenNonNull().as(Duration::toMillis).to(factory::setChannelCheckoutTimeout);
Connection connection = properties.getCache().getConnection();
connection.getClass();
map.from(connection::getMode).whenNonNull().to(factory::setCacheMode);
connection.getClass();
map.from(connection::getSize).whenNonNull().to(factory::setConnectionCacheSize);
connectionNameStrategy.getClass();
map.from(connectionNameStrategy::getIfUnique).whenNonNull().to(factory::setConnectionNameStrategy);
return factory;
}

private RabbitConnectionFactoryBean getRabbitConnectionFactoryBean(RabbitProperties properties) throws Exception {
PropertyMapper map = PropertyMapper.get();
RabbitConnectionFactoryBean factory = new RabbitConnectionFactoryBean();
properties.getClass();
map.from(properties::determineHost).whenNonNull().to(factory::setHost);
properties.getClass();
map.from(properties::determinePort).to(factory::setPort);
properties.getClass();
map.from(properties::determineUsername).whenNonNull().to(factory::setUsername);
properties.getClass();
map.from(properties::determinePassword).whenNonNull().to(factory::setPassword);
properties.getClass();
map.from(properties::determineVirtualHost).whenNonNull().to(factory::setVirtualHost);
properties.getClass();
map.from(properties::getRequestedHeartbeat).whenNonNull().asInt(Duration::getSeconds).to(factory::setRequestedHeartbeat);
Ssl ssl = properties.getSsl();
if (ssl.isEnabled()) {
factory.setUseSSL(true);
ssl.getClass();
map.from(ssl::getAlgorithm).whenNonNull().to(factory::setSslAlgorithm);
ssl.getClass();
map.from(ssl::getKeyStoreType).to(factory::setKeyStoreType);
ssl.getClass();
map.from(ssl::getKeyStore).to(factory::setKeyStore);
ssl.getClass();
map.from(ssl::getKeyStorePassword).to(factory::setKeyStorePassphrase);
ssl.getClass();
map.from(ssl::getTrustStoreType).to(factory::setTrustStoreType);
ssl.getClass();
map.from(ssl::getTrustStore).to(factory::setTrustStore);
ssl.getClass();
map.from(ssl::getTrustStorePassword).to(factory::setTrustStorePassphrase);
ssl.getClass();
map.from(ssl::isValidateServerCertificate).to((validate) -> {
factory.setSkipServerCertificateValidation(!validate);
});
ssl.getClass();
map.from(ssl::getVerifyHostname).to(factory::setEnableHostnameVerification);
}

properties.getClass();
map.from(properties::getConnectionTimeout).whenNonNull().asInt(Duration::toMillis).to(factory::setConnectionTimeout);
factory.afterPropertiesSet();
return factory;
}
}
}

其中重要的是RabbitTemplate和AmqpAdmin
前者就是我们用来进行消息发送和获得的
而后者就是用来创建exchange queue binding

所以我们可以直接注入使用

@Autowired
RabbitTemplate rabbitTemplate;
/**
* 1.单播(点对点)
*/
@Test
public void contextLoads() {
//message需要自己构造一个;定制消息体内容和消息头
//rabbitTemplate.send(exchange,routeKey,message);

//Object默认当成消息体,只需要传入要发送的对象,自动序列化发送给rabbitmq,
//rabbitTemplate.convertAndSend(exchage,routeKey,Object);

Map<String,Object> map = new HashMap<>();
map.put("msg","这是第一个消息");
map.put("data", Arrays.asList("helloworld",123,true));
//对象被默认序列化以后发送出去
rabbitTemplate.convertAndSend("exchange.direct","atschool.news",map));
}

但是这样发送过去的消息不是json格式

MessageConverter messageConverter = (MessageConverter)this.messageConverter.getIfUnique();
if (messageConverter != null) {
template.setMessageConverter(messageConverter);
}

通过这两行可以看出来,如果我们需要把发送的数据变成json数据格式的
就需要写一个MessageConvertor

@Configuration
public class MyAMQPConfig {

@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
}

这样就可以了
另一个AmqpAdmin的用法和上一个一样,只是方法不用

@Autowired
AmqpAdmin amqpAdmin;
@Test
public void creatExchange(){

amqpAdmin.declareExchange(new DirectExchange("amqpadmin.exchange"));
System.out.println("创建完成");

amqpAdmin.declareQueue(new Queue("amqpadmin.queue",true));

创建绑定规则
amqpAdmin.declareBinding(new Binding("amqpadmin.queue", Binding.DestinationType.QUEUE,"amqpadmin.exchange","amqp.haha",null));

}

我们使用消息队列最重要的当然是要进行自动取出消息在service层中
所以就需要注解来完成这一个功能

package com.xa.springboot02amqp.service;

import com.xa.springboot02amqp.bean.Book;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

@Service
public class BookService {

@RabbitListener(queues = "atshool.news")
public void receive(Book book){
System.out.println("收到消息"+book);
}

@RabbitListener(queues = "atshool")
public void receiver02(Message message){
System.out.println(message.getBody());
System.out.println(message.getMessageProperties());
}

}

这里 最重要的一步就是一定要在SpringBoot启动类中定义@EnableRabbit 来开启基于注解的RabbitMQ模式
上面的Book是我自定义的一个bean和map的性质是一样的都是可以进行接收和发送
注解中的queue就是从哪个消息队列中取

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: