您的位置:首页 > 编程语言 > Java开发

springboot(十二)整合rabbitmq

2017-03-17 18:28 567 查看
引入jar包

<!-- Spring Boot starter for AMQP (RabbitMQ) messaging. No <version> is given here, so it is presumably managed by the Spring Boot parent POM / BOM (confirm in your own pom.xml). -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>


1 在resources下的rabbitmq目录中创建rabbitmq.properties(路径需与下文配置类中的classpath:rabbitmq/rabbitmq.properties保持一致)

 

# Broker address: use the AMQP access port (5672), not 15672 -- 15672 is the port of the HTTP API and the management UI.
spring.rabbitmq.addresses=localhost:5672
# Demo credentials. NOTE(review): replace with real, externally supplied credentials outside of a local demo.
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
# Must be true if publisher-confirm callbacks are to be delivered.
spring.rabbitmq.publisherconfirms=true


2创建rabbitmq对象RabbitMq

package com.demo.model;

import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

/**
 * Created by huguoju on 2017/3/2.
 * RabbitMQ connection settings, bound from the {@code spring.rabbitmq.*} keys of
 * {@code classpath:rabbitmq/rabbitmq.properties}. Getters and setters are generated by Lombok.
 *
 * NOTE(review): the {@code locations} attribute of {@code @ConfigurationProperties} was deprecated in
 * Spring Boot 1.4 and removed in 1.5; on newer versions the file has to be registered with
 * {@code @PropertySource} instead -- confirm against the Spring Boot version in use.
 */
@Configuration
@Getter
@Setter
@ConfigurationProperties(locations = "classpath:rabbitmq/rabbitmq.properties",prefix = "spring.rabbitmq")
public class RabbitMq{

    /** Broker address list, e.g. {@code localhost:5672}. */
    private String addresses;
    /** User name used to open the connection. */
    private String username;
    /** Password used to open the connection. */
    private String password;
    /**
     * Whether publisher confirms are enabled. Defaults to {@code false} so that a missing property
     * yields a non-null value; callers unbox this into a primitive {@code boolean}, which would
     * otherwise throw a NullPointerException.
     */
    private Boolean publisherconfirms = Boolean.FALSE;
}


3生产者配置

   3.1通用性基础配置

   

package com.demo.rabbitmq.sender;

import com.demo.model.RabbitMq;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;

/**
* Created by huguoju on 2017/3/2.
* 创建消息生产者
*/
@Configuration
@Slf4j
public class AmqpConfig {
@Autowired
private RabbitMq rabbitMq;

/**
* 连接rabbitmq
* @return
*/
@Bean
public ConnectionFactory connectionFactory(){
CachingConnectionFactory connectionFactory=new CachingConnectionFactory();
connectionFactory.setAddresses(rabbitMq.getAddresses());
connectionFactory.setUsername(rabbitMq.getUsername());
connectionFactory.setPassword(rabbitMq.getPassword());
/**
* 对于每一个RabbitTemplate只支持一个ReturnCallback。
* 对于返回消息,模板的mandatory属性必须被设定为true,
* 它同样要求CachingConnectionFactory的publisherReturns属性被设定为true。
* 如果客户端通过调用setReturnCallback(ReturnCallback callback)注册了RabbitTemplate.ReturnCallback,那么返回将被发送到客户端。
* 这个回调函数必须实现下列方法:
*void returnedMessage(Message message, intreplyCode, String replyText,String exchange, String routingKey);
*/
// connectionFactory.setPublisherReturns(true);
/**
* 同样一个RabbitTemplate只支持一个ConfirmCallback。
* 对于发布确认,template要求CachingConnectionFactory的publisherConfirms属性设置为true。
* 如果客户端通过setConfirmCallback(ConfirmCallback callback)注册了RabbitTemplate.ConfirmCallback,那么确认消息将被发送到客户端。
* 这个回调函数必须实现以下方法:
* void confirm(CorrelationData correlationData, booleanack);
*/
connectionFactory.setPublisherConfirms(rabbitMq.getPublisherconfirms());
return connectionFactory;
}

/**
* rabbitAdmin代理类
* @return
*/
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
return new RabbitAdmin(connectionFactory);
}

/**
* 创建rabbitTemplate 消息模板类
* prototype原型模式:每次获取Bean的时候会有一个新的实例
*  因为要设置回调类,所以应是prototype类型,如果是singleton类型,则回调类为最后一次设置
* @return
*/
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public RabbitTemplate rabbitTemplate(){
RabbitTemplate rabbitTemplate=new RabbitTemplate(connectionFactory());
// rabbitTemplate.setMandatory(true);//返回消息必须设置为true
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());//数据转换为json存入消息队列
//  rabbitTemplate.setReplyAddress(replyQueue().getName());
//  rabbitTemplate.setReplyTimeout(100000000);
//发布确认
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
//消息发送到queue时就执行
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
log.debug(correlationData+"//////");
if (!b){
log.debug("发送到queue失败");
throw new RuntimeException("send error " + s);
}
}
});
return rabbitTemplate;
}
}
 3.2创建exchange

  

package com.demo.rabbitmq.sender;

/**
 * Created by huguoju on 2017/3/2.
 * Names of the exchanges used by the contract messaging configuration.
 * Interface fields are implicitly {@code public static final}.
 */
public interface RabbitMqExchange {
    /** Name of the fanout exchange. */
    String CONTRACT_FANOUT = "CONTRACT_FANOUT";

    /** Name of the topic exchange. */
    String CONTRACT_TOPIC = "CONTRACT_TOPIC";

    /** Name of the direct exchange. */
    String CONTRACT_DIRECT = "CONTRACT_DIRECT";
}
3.3创建queue

   

package com.demo.rabbitmq.sender;

/**
 * Created by huguoju on 2017/3/2.
 * Names of the message queues. The same values are also used as routing keys when the queues
 * are bound to an exchange.
 */
public interface RabbitMqQueue {
    /** Name of the contract queue. */
    String CONTRACT_SELF = "CONTRACT_SELF";

    /** Name of the tenant queue. */
    String CONTRACT_TENANT = "CONTRACT_TENANT";

    /**
     * Misspelled alias of {@link #CONTRACT_SELF}, kept so existing references still compile.
     * It remains a compile-time constant and can therefore still be used in annotations.
     *
     * @deprecated use {@link #CONTRACT_SELF}
     */
    @Deprecated
    String CONTRACE_SELF = CONTRACT_SELF;

    /**
     * Misspelled alias of {@link #CONTRACT_TENANT}, kept so existing references still compile.
     *
     * @deprecated use {@link #CONTRACT_TENANT}
     */
    @Deprecated
    String CONTRACE_TENANT = CONTRACT_TENANT;
}


3.4针对rabbitmq服务性的配置,配置queue和交换机并绑定

package com.demo.rabbitmq.sender;

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Created by huguoju on 2017/3/2.
 * Declares the exchanges and queues on the broker and binds the queues to the topic exchange.
 */
@Configuration
public class ContractExchangeConfig {
    @Autowired
    private RabbitAdmin rabbitAdmin;

    /*
     * Fanout exchange (alternative, disabled): routing keys are ignored. Queues are simply bound to
     * the exchange, and every message sent to it is forwarded to all bound queues -- much like a
     * subnet broadcast where each host receives a copy. Fanout is the fastest exchange type.
     */
    //    @Bean
    //    FanoutExchange contractFanoutExchange(){
    //        FanoutExchange fanoutExchange=new FanoutExchange(RabbitMqExchange.CONTRACT_FANOUT);
    //        rabbitAdmin.declareExchange(fanoutExchange);
    //        return fanoutExchange;
    //    }

    /**
     * Topic exchange: the routing key is matched against a pattern the queue was bound with.
     * "#" matches zero or more words, "*" matches exactly one word; so "audit.#" matches
     * "audit.irs.corporate", whereas "audit.*" only matches "audit.irs".
     * Defaults: durable = true, autoDelete = false.
     *
     * @return the declared topic exchange
     */
    @Bean
    TopicExchange contractTopicExchangeDurable() {
        TopicExchange topic = new TopicExchange(RabbitMqExchange.CONTRACT_TOPIC);
        rabbitAdmin.declareExchange(topic);
        return topic;
    }

    /**
     * Direct exchange: a message is forwarded only to queues whose binding key equals its routing
     * key exactly. A queue bound with "dog" receives messages keyed "dog", but neither
     * "dog.puppy" nor "dog.guard".
     *
     * @return the declared direct exchange
     */
    @Bean
    DirectExchange contractDirectExchange() {
        DirectExchange direct = new DirectExchange(RabbitMqExchange.CONTRACT_DIRECT);
        rabbitAdmin.declareExchange(direct);
        return direct;
    }

    /** Declares the contract queue. */
    @Bean
    Queue queueContract() {
        return declareQueue(RabbitMqQueue.CONTRACE_SELF);
    }

    /** Declares the tenant queue. */
    @Bean
    Queue queueTenant() {
        return declareQueue(RabbitMqQueue.CONTRACE_TENANT);
    }

    // Alternative (disabled): bind the contract queue to the fanout exchange.
    //    @Bean
    //    Binding bindingExchangeContract(Queue queueContract,FanoutExchange exchange){
    //        Binding binding=BindingBuilder.bind(queueContract).to(exchange);
    //        rabbitAdmin.declareBinding(binding);
    //        return binding;
    //    }

    /** Binds the contract queue to the topic exchange, using the queue name as routing key. */
    @Bean
    Binding bindingExchangeContract(Queue queueContract, TopicExchange exchange) {
        return declareTopicBinding(queueContract, exchange, RabbitMqQueue.CONTRACE_SELF);
    }

    // Alternative (disabled): bind the contract queue to the direct exchange.
    //    @Bean
    //    Binding bindingExchangeContract(Queue queueContract,DirectExchange exchange){
    //        Binding binding=BindingBuilder.bind(queueContract).to(exchange).with(RabbitMqQueue.CONTRACE_SELF);
    //        rabbitAdmin.declareBinding(binding);
    //        return binding;
    //    }

    /** Binds the tenant queue to the topic exchange, using the queue name as routing key. */
    @Bean
    Binding bindingExchangeTenant(Queue queueTenant, TopicExchange exchange) {
        return declareTopicBinding(queueTenant, exchange, RabbitMqQueue.CONTRACE_TENANT);
    }

    // Alternative (disabled): bind the tenant queue to the direct exchange.
    //    @Bean
    //    Binding bindingExchangeTenant(Queue queueTenant, DirectExchange exchange) {
    //        Binding binding = BindingBuilder.bind(queueTenant).to(exchange).with(RabbitMqQueue.CONTRACE_TENANT);
    //        rabbitAdmin.declareBinding(binding);
    //        return binding;
    //    }

    /** Creates a queue with the given name (second constructor argument {@code true}) and declares it via the admin. */
    private Queue declareQueue(String name) {
        Queue declared = new Queue(name, true);
        rabbitAdmin.declareQueue(declared);
        return declared;
    }

    /** Builds a queue-to-topic-exchange binding with the given routing key and declares it via the admin. */
    private Binding declareTopicBinding(Queue queue, TopicExchange topic, String routingKey) {
        Binding declared = BindingBuilder.bind(queue).to(topic).with(routingKey);
        rabbitAdmin.declareBinding(declared);
        return declared;
    }

}


3.5创建消息体

package com.demo.rabbitmq.sender;

import lombok.Builder;
import lombok.Data;
import lombok.Getter;

import java.util.Date;
import java.util.List;

/**
* Created by huguoju on 2017/3/3.
* Payload carried by contract messages.
* {@code @Builder} cannot be used here: JSON deserialization needs setter methods,
* and a builder-only class does not have any. {@code @Data} generates them.
*/
// Disabled on purpose, see the class comment above:
//@Builder
//@Getter
@Data
public class ContractRabbitMq {
private String id;
private String name;
private List<String> testList;
private Date createDate;
}

package com.demo.rabbitmq.sender;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

/**
 * Created by huguoju on 2017/3/3.
 * Payload carried by tenant messages.
 *
 * The builder is kept for producers. A no-argument constructor and setters are generated as well,
 * because the consumer receives this type through JSON deserialization, which cannot instantiate
 * a class that only offers a builder. {@code @AllArgsConstructor} is required so that
 * {@code @Builder} still has a constructor to call once {@code @NoArgsConstructor} is present.
 */
@Builder
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
public class TenantRabbitMq {
    private String id;
    private String name;
}


4 消费者配置。实际使用时,消费者应该和生产者不在同一个项目里;这里只是演示,所以放在了一个项目里。很多公用的文件在实际开发中可以打成jar包共用

4.1消息的监听的代理类

  

package com.demo.rabbitmq.consumer;

import com.demo.model.RabbitMq;
import com.demo.rabbitmq.sender.RabbitMqExchange;
import com.demo.rabbitmq.sender.RabbitMqQueue;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;

/**
 * Created by huguoju on 2017/3/3.
 * Consumer-side configuration: enables {@code @RabbitListener} processing and registers the
 * handler-method factory and listener container factory used by the listeners.
 */
@Configuration
@EnableRabbit
public class ConsumerConfig implements RabbitListenerConfigurer {
    @Autowired
    ReceiverService receiverService;
    @Autowired
    private RabbitMq rabbitMq;
    @Autowired
    private ConnectionFactory connectionFactory;

    /**
     * Handler-method factory whose message converter is a {@link MappingJackson2MessageConverter}.
     *
     * @return the configured factory
     */
    @Bean
    public DefaultMessageHandlerMethodFactory handlerMethodFactory() {
        MappingJackson2MessageConverter jsonConverter = new MappingJackson2MessageConverter();
        DefaultMessageHandlerMethodFactory methodFactory = new DefaultMessageHandlerMethodFactory();
        methodFactory.setMessageConverter(jsonConverter);
        return methodFactory;
    }

    // Alternative (disabled): a hand-built listener container with manual acknowledgement.
    //    @Bean
    //    public SimpleMessageListenerContainer messageContainer() {
    //        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
    //        container.setQueues(queueContract());
    //      //  container.setExposeListenerChannel(true);
    //        container.setMaxConcurrentConsumers(1);
    //        container.setConcurrentConsumers(1);
    //        container.setAcknowledgeMode(AcknowledgeMode.MANUAL); // acknowledge messages manually
    //        container.setMessageListener(new MessageListener() {
    //
    //            @Override
    //            public void onMessage(Message message) {
    //                byte[] body = message.getBody();
    //                System.out.println("receive msg : " + new String(body));
    //               // channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); // confirm the message was consumed
    //            }
    //        });
    //        return container;
    //    }

    /**
     * Listener container factory using the injected connection factory and
     * {@link AcknowledgeMode#AUTO} acknowledgement.
     *
     * @return the configured container factory
     */
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory();
        containerFactory.setAcknowledgeMode(AcknowledgeMode.AUTO);
        containerFactory.setConnectionFactory(connectionFactory);
        return containerFactory;
    }

    /**
     * Installs {@link #handlerMethodFactory()} on the listener endpoint registrar.
     *
     * @param rabbitListenerEndpointRegistrar the registrar to configure
     */
    @Override
    public void configureRabbitListeners(RabbitListenerEndpointRegistrar rabbitListenerEndpointRegistrar) {
        DefaultMessageHandlerMethodFactory methodFactory = handlerMethodFactory();
        rabbitListenerEndpointRegistrar.setMessageHandlerMethodFactory(methodFactory);
    }
}
4.2消费者监听

package com.demo.rabbitmq.consumer;

import com.demo.rabbitmq.sender.ContractRabbitMq;
import com.demo.rabbitmq.sender.RabbitMqQueue;
import com.demo.rabbitmq.sender.TenantRabbitMq;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * Created by huguoju on 2017/3/3.
 * Message consumers: one listener per queue, each printing the received payload as JSON.
 */
@Component
public class ReceiverService {
    /** Shared serializer; one instance is reused instead of creating a new mapper for every message. */
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    /**
     * Handles messages from the contract queue.
     *
     * @param contract the converted message payload
     */
    @RabbitListener(queues = RabbitMqQueue.CONTRACE_SELF)
    public void receiveContractQueue(ContractRabbitMq contract) {
        printReceived("contract", contract);
    }

    /**
     * Handles messages from the tenant queue.
     *
     * @param tenant the converted message payload
     */
    @RabbitListener(queues = RabbitMqQueue.CONTRACE_TENANT)
    public void receiveTenantQueue(TenantRabbitMq tenant) {
        printReceived("tenant", tenant);
    }

    /** Prints {@code Received <label><json>} to standard output; serialization failures are reported, not rethrown. */
    private static void printReceived(String label, Object payload) {
        try {
            System.out.println("Received " + label + "<" + OBJECT_MAPPER.writeValueAsString(payload) + ">");
        } catch (IOException e) {
            // Best effort: a payload that cannot be rendered must not fail message handling.
            e.printStackTrace();
        }
    }
}


以上就完成了。

创建测试controller(其中注入的ContractRabbitmqService是封装消息发送的service,本文没有贴出它的代码,需要自行实现其中的sendContractRabbitmqTopic方法)

package com.demo.controller;

import com.demo.rabbitmq.sender.ContractRabbitMq;
import com.demo.service.rabbitMq.ContractRabbitmqService;
import com.google.common.collect.Lists;
import io.swagger.annotations.Api;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;

/**
 * Created by huguoju on 2017/3/6.
 * Test endpoint that publishes a sample contract message.
 */
@RestController
@RequestMapping("rabbitmq")
@Api(value = "测试rabbitmq",tags = "测试rabbitmq")
public class RabbitMqController {

    @Autowired
    public ContractRabbitmqService contractRabbitmqService;

    /**
     * Builds a fixed sample contract message and hands it to the sending service.
     * Reachable with both POST and GET under {@code rabbitmq/contract/topic}.
     */
    @RequestMapping(value = "contract/topic",method = {RequestMethod.POST,RequestMethod.GET})
    public void contractTopic() {
        contractRabbitmqService.sendContractRabbitmqTopic(sampleContract());
    }

    /** Creates the sample payload with hard-coded id, name, list entries and the current time. */
    private static ContractRabbitMq sampleContract() {
        ContractRabbitMq sample = new ContractRabbitMq();
        sample.setId("15");
        sample.setName("测试");
        sample.setTestList(Lists.newArrayList("111", "222"));
        sample.setCreateDate(new Date());
        return sample;
    }
}


关于springboot一些例子
https://my.oschina.net/didispace/blog/752981
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  整合rabbitmq