您的位置:首页 > 编程语言 > Java开发

spring boot + rabbitmq实例

2016-09-17 22:33 543 查看

准备工作:

rabbitmq按安装方式即可安装

yml配置(发送方与接收方都要配置,但是发送方要有vhost的写权限, 接收方要有vhost的读权限)

spring:
rabbitmq:
host: rabbitmqIp
port:5672
username: username
password: password
virtual-host: vhost(如果没有配置可用"/")


一:服务端与客户端都引用spring boot对rabbitmq的依赖

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>


二:服务端的关于rabbitmq的配置

1:通用性配置,amqpConfig.java

@Configuration
public class AmqpConfig {

@Bean
RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
return new RabbitAdmin(connectionFactory);
}

/**
* 生产者用
*
* @return
*/
@Bean
public RabbitMessagingTemplate rabbitMessagingTemplate(RabbitTemplate rabbitTemplate) {
RabbitMessagingTemplate rabbitMessagingTemplate = new RabbitMessagingTemplate();
rabbitMessagingTemplate.setMessageConverter(jackson2Converter());
rabbitMessagingTemplate.setRabbitTemplate(rabbitTemplate);
return rabbitMessagingTemplate;
}

@Bean
public MappingJackson2MessageConverter jackson2Converter() {
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
return converter;
}

}


2:静态数据配置,交换机与queue队列配置,如果需要可按需求再进行扩充

/**
* exchange交换机配置
*/
public interface RabbitmqExchange {
/**
* 合同exchange
*/
final String CONTRACT_FANOUT = "CONTRACT_FANOUT";
final String CONTRACT_TOPIC = "CONTRACT_TOPIC";
final String CONTRACT_DIRECT = "CONTRACT_DIRECT";
}

/**
* queue队列配置
*/
public interface RabbitmqQueue {
final String CONTRACE_SELF ="CONTRACT_SELF";
final String CONTRACE_TENANT ="CONTRACT_TENANT";
}


3:针对服务性rabbitmq配置,如:contract,tenant同时使用contract的交换机,但使用不同的queue队列进行信息发送

contractExchangeConfig,contract交换机配置

/**
* 基本使用的为topic,
* 此处更多是以案例给出
* @author yjpfj1203
*/
@Configuration
public class ContractExchangeConfig {

//	/**
//	 * 合同广播型
//	 *
//	 * @param rabbitAdmin
//	 * @return
//	 */
//	@Bean
//	FanoutExchange contractFanoutExchange(RabbitAdmin rabbitAdmin) {
//		FanoutExchange contractFanoutExchange = new FanoutExchange(RabbitmqExchange.CONTRACT_FANOUT);
//		rabbitAdmin.declareExchange(contractFanoutExchange);
//		return contractFanoutExchange;
//	}

/**
* 合同->匹配型 默认:, durable = true, autoDelete = false
*
* @param rabbitAdmin
* @return
*/
@Bean
TopicExchange contractTopicExchangeDurable(RabbitAdmin rabbitAdmin) {
TopicExchange contractTopicExchange = new TopicExchange(RabbitmqExchange.CONTRACT_TOPIC);
rabbitAdmin.declareExchange(contractTopicExchange);
return contractTopicExchange;
}

/**
* 合同直连型
*
* @param rabbitAdmin
* @return
*/
@Bean
DirectExchange contractDirectExchange(RabbitAdmin rabbitAdmin) {
DirectExchange contractDirectExchange = new DirectExchange(RabbitmqExchange.CONTRACT_DIRECT);
rabbitAdmin.declareExchange(contractDirectExchange);
return contractDirectExchange;
}

//	@Bean
//	Binding bindingExchangeContract(Queue queueContract, FanoutExchange exchange, RabbitAdmin rabbitAdmin) {
//		Binding binding = BindingBuilder.bind(queueContract).to(exchange);
//		rabbitAdmin.declareBinding(binding);
//		return binding;
//	}

@Bean
Binding bindingExchangeContract(Queue queueContract, TopicExchange exchange, RabbitAdmin rabbitAdmin) {
Binding binding = BindingBuilder.bind(queueContract).to(exchange).with(RabbitmqQueue.CONTRACE_SELF);
rabbitAdmin.declareBinding(binding);
return binding;
}

@Bean
Binding bindingExchangeContract(Queue queueContract, DirectExchange exchange, RabbitAdmin rabbitAdmin) {
Binding binding = BindingBuilder.bind(queueContract).to(exchange).with(RabbitmqQueue.CONTRACE_SELF);
rabbitAdmin.declareBinding(binding);
return binding;
}

@Bean
Binding bindingExchangeTenant(Queue queueTenant, TopicExchange exchange, RabbitAdmin rabbitAdmin) {
Binding binding = BindingBuilder.bind(queueTenant).to(exchange).with(RabbitmqQueue.CONTRACE_TENANT);
rabbitAdmin.declareBinding(binding);
return binding;
}

@Bean
Binding bindingExchangeTenant(Queue queueTenant, DirectExchange exchange, RabbitAdmin rabbitAdmin) {
Binding binding = BindingBuilder.bind(queueTenant).to(exchange).with(RabbitmqQueue.CONTRACE_TENANT);
rabbitAdmin.declareBinding(binding);
return binding;
}

/**
* 所有关于contract exchange的queue
*
* @param rabbitAdmin
* @return
*/
@Bean
Queue queueContract(RabbitAdmin rabbitAdmin) {
Queue queue = new Queue(RabbitmqQueue.CONTRACE_SELF, true);
rabbitAdmin.declareQueue(queue);
return queue;
}

@Bean
Queue queueTenant(RabbitAdmin rabbitAdmin) {
Queue queue = new Queue(RabbitmqQueue.CONTRACE_TENANT, true);
rabbitAdmin.declareQueue(queue);
return queue;
}

}


三:实体类(消息的载体),(实体类与表态配置)要保证发送方与接收方都能引用到,可打jar包

/**
* 合同消息载体
*/
public class ContractRabbitmq {
private Long id;
private String name;
private List<String> testStrList;
private Date dateCreated;
public Long getId() {
return id;
}
public void setId(Long id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public List<String> getTestStrList() {
return testStrList;
}
public void setTestStrList(List<String> testStrList) {
this.testStrList = testStrList;
}
public Date getDateCreated() {
return dateCreated;
}
public void setDateCreated(Date dateCreated) {
this.dateCreated = dateCreated;
}

}

/**
* tenant消息载体
*/
public class TenantRabbitmq {
private Long id;
private String name;
public Long getId() {
return id;
}
public void setId(Long id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}


四:服务发送的service

@Service
public class ContractRabbitmqService implements ConfirmCallback{

@Autowired
private RabbitMessagingTemplate rabbitMessagingTemplate;

public void sendContractRabbitmqTopic(final ContractRabbitmq ContractRabbitmq) {
this.rabbitMessagingTemplate.convertAndSend(RabbitmqExchange.CONTRACT_TOPIC, RabbitmqQueue.CONTRACE_SELF, ContractRabbitmq);
}

public void sendContractRabbitmqDirect(final ContractRabbitmq ContractRabbitmq) {
this.rabbitMessagingTemplate.convertAndSend(RabbitmqExchange.CONTRACT_DIRECT, RabbitmqQueue.CONTRACE_SELF, ContractRabbitmq);
}

public void sendTenantRabbitmqTopic(final TenantRabbitmq tenantRabbitmq){
this.rabbitMessagingTemplate.convertAndSend(RabbitmqExchange.CONTRACT_TOPIC, RabbitmqQueue.CONTRACE_TENANT, tenantRabbitmq);
}

public void sendTenantRabbitmqDirect(final TenantRabbitmq tenantRabbitmq){
this.rabbitMessagingTemplate.convertAndSend(RabbitmqExchange.CONTRACT_DIRECT, RabbitmqQueue.CONTRACE_TENANT, tenantRabbitmq);
}

/**
* 回调函数不起作用,暂时没搞清楚
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
System.out.println("消息确认成功");
} else {
System.out.println("消息确认失败");
}
}
}


五:TestController

@RestController
@RequestMapping(value = "/rabbitmq")
public class RabbitMqTestController{
@Autowired
private ContractRabbitmqService contractRabbitmqService;

@RequestMapping(value = "contract/direct", method = RequestMethod.GET)
public void contractDirect(String content) {
ContractRabbitmq contract = new ContractRabbitmq();
contract.setDateCreated(new Date());
contract.setId(12L);
contract.setName("liuhan");
List<String> strList = new ArrayList<>();
strList.add("liu");
strList.add("test str");
contract.setTestStrList(strList);
contractRabbitmqService.sendContractRabbitmqDirect(contract);
}

@RequestMapping(value = "contract/topic", method = RequestMethod.GET)
public void contractTopic(String content) {
ContractRabbitmq contract = new ContractRabbitmq();
contract.setDateCreated(new Date());
contract.setId(12L);
contract.setName("liuhan");
List<String> strList = new ArrayList<>();
strList.add("liu");
strList.add("test str");
contract.setTestStrList(strList);
contractRabbitmqService.sendContractRabbitmqTopic(contract);
}

@RequestMapping(value = "tenant/direct", method = RequestMethod.GET)
public void tenantDirect(String content) {
TenantRabbitmq tenant = new TenantRabbitmq();
tenant.setId(12L);
tenant.setName("liuhan");
contractRabbitmqService.sendTenantRabbitmqDirect(tenant);
}

@RequestMapping(value = "tenant/topic", method = RequestMethod.GET)
public void tenantTopic(String content) {
TenantRabbitmq tenant = new TenantRabbitmq();
tenant.setId(12L);
tenant.setName("liuhan");
contractRabbitmqService.sendTenantRabbitmqTopic(tenant);
}
}


六:接收方配置文件

@Configuration
@EnableRabbit
public class ConsumerConfig implements RabbitListenerConfigurer {

@Autowired
ReceiverService receiverService;

@Bean
public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
factory.setMessageConverter(new MappingJackson2MessageConverter());
return factory;
}

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
// factory.setPrefetchCount(5);
factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
return factory;
}

@Override
public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory());
}

}


七:接收方service

@Component
public class ReceiverService {

@RabbitListener(queues = RabbitmqQueue.CONTRACE_SELF)
public void receiveContractQueue(ContractRabbitmq contract) {
System.out.println("Received contract<" + new Gson().toJson(contract) + ">");
}

@RabbitListener(queues = RabbitmqQueue.CONTRACE_TENANT)
public void receiveTenantQueue(TenantRabbitmq tenant) {
System.out.println("Received Bar<" + new Gson().toJson(tenant) + ">");
}
}


8:test

localhost:9141/rabbitmq/contract/topic

localhost:9141/rabbitmq/contract/direct

localhost:9141/rabbitmq/tenant/direct

localhost:9141/rabbitmq/tenant/topic

接收方打印的信息

Received contract<{"id":12,"name":"liuhan","testStrList":["liu","test str"],"dateCreated":"Sep 17, 2016 10:31:36 PM"}>

Received contract<{"id":12,"name":"liuhan","testStrList":["liu","test str"],"dateCreated":"Sep 17, 2016 10:31:37 PM"}>

Received Bar<{"id":12,"name":"liuhan"}>

Received Bar<{"id":12,"name":"liuhan"}>
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: