您的位置:首页 > 编程语言 > Java开发

springboot+RabbitMq之topic和fanout

2018-11-22 00:06 459 查看

1.创建一个springboot项目,导入如下maven坐标

[code] <dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.0.0.RELEASE</version>
</dependency>

2. application.properties 配置

[code]spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

3. topic 通配符交换器使用

    3.1生产者 

[code]package com.czxy.controller;

import com.czxy.config.TopicRabbitConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * REST endpoints that publish demo messages to the topic exchange named by
 * {@link TopicRabbitConfig#EXCHANGE_NAME}.
 *
 * <p>Messages are always published with a concrete routing key. The wildcard
 * patterns ({@code topic.#}, {@code topic.*}) are binding keys and are only
 * interpreted by the broker on the binding side.
 */
@RestController
public class ProducerController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Publishes a message with routing key {@code topic.message}.
     *
     * <p>With the bindings declared in {@link TopicRabbitConfig}, this key
     * matches both the exact binding ({@code topic.message}) and the wildcard
     * binding ({@code topic.#}).
     */
    @GetMapping("/send1")
    public void send1() {
        String context = "hi, i am message 1";
        System.out.println("Sender : " + context);
        this.rabbitTemplate.convertAndSend(
                TopicRabbitConfig.EXCHANGE_NAME, TopicRabbitConfig.ROUTING_KEY1, context);
    }

    /**
     * Publishes a message with routing key {@code topic.messages}.
     *
     * <p>This key matches only the wildcard binding ({@code topic.#}) declared
     * in {@link TopicRabbitConfig}; it does not match the exact
     * {@code topic.message} binding.
     */
    @GetMapping("/send2")
    public void send2() {
        String context = "hi, i am messages 2";
        System.out.println("Sender : " + context);
        // Use a concrete routing key: publishing with the pattern "topic.#" itself
        // would send the literal word "#", which is not how wildcards are meant to be used.
        this.rabbitTemplate.convertAndSend(
                TopicRabbitConfig.EXCHANGE_NAME, TopicRabbitConfig.ROUTING_KEY2, context);
    }

}

   3.2 消费者

[code]package com.czxy.controller;

import com.czxy.config.FanoutRabbitConfig;
import com.czxy.config.TopicRabbitConfig;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Message listeners for the two queues declared in {@link TopicRabbitConfig}.
 * Each listener prints the received payload to standard output with a suffix
 * identifying which queue delivered it.
 */
@Component
public class Consumer {

    /**
     * Handles messages arriving on {@link TopicRabbitConfig#QUEUE_NAME1}.
     *
     * @param message the message payload as text
     */
    @RabbitListener(queues = TopicRabbitConfig.QUEUE_NAME1)
    public void consumeMessage(String message) {
        final String line = message + "topic1";
        System.out.println(line);
    }

    /**
     * Handles messages arriving on {@link TopicRabbitConfig#QUEUE_NAME2}.
     *
     * @param message the message payload as text
     */
    @RabbitListener(queues = TopicRabbitConfig.QUEUE_NAME2)
    public void consumeMessage1(String message) {
        final String line = message + "topic2";
        System.out.println(line);
    }

}

 3.3 配置类

[code]package com.czxy.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Declares the queues, the topic exchange and the bindings used by the topic
 * exchange demo, and exposes their names as constants.
 */
@Configuration
public class TopicRabbitConfig {

    /** Queue names. */
    public static final String QUEUE_NAME1 = "topic.message";
    public static final String QUEUE_NAME2 = "topic.messages";

    /** Name of the topic exchange. */
    public static final String EXCHANGE_NAME = "TopicExchange";

    /** Exact keys followed by wildcard patterns. */
    public static final String ROUTING_KEY1 = "topic.message";
    public static final String ROUTING_KEY2 = "topic.messages";
    public static final String ROUTING_KEYx = "topic.#";
    public static final String ROUTING_KEYy = "topic.*";

    /** Declares the queue named {@link #QUEUE_NAME1}. */
    @Bean
    public Queue queue1() {
        return new Queue(QUEUE_NAME1);
    }

    /** Declares the queue named {@link #QUEUE_NAME2}. */
    @Bean
    public Queue queue2() {
        return new Queue(QUEUE_NAME2);
    }

    /** Declares the topic exchange named {@link #EXCHANGE_NAME}. */
    @Bean
    public TopicExchange exchange() {
        return new TopicExchange(EXCHANGE_NAME);
    }

    /**
     * Binds {@code queue1} to the exchange with the exact key
     * {@link #ROUTING_KEY1}.
     */
    @Bean
    public Binding binding1(Queue queue1, TopicExchange exchange) {
        final Binding exactBinding =
                BindingBuilder.bind(queue1).to(exchange).with(ROUTING_KEY1);
        return exactBinding;
    }

    /**
     * Binds {@code queue2} to the exchange with the wildcard pattern
     * {@link #ROUTING_KEYx}.
     */
    @Bean
    public Binding binding2(Queue queue2, TopicExchange exchange) {
        final Binding wildcardBinding =
                BindingBuilder.bind(queue2).to(exchange).with(ROUTING_KEYx);
        return wildcardBinding;
    }

}

4. fanout 广播模式交换器使用

 4.1生产者

[code]package com.czxy.controller;

import com.czxy.config.TopicRabbitConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * REST endpoint that publishes a demo message to the exchange named
 * {@code fanoutExchange}.
 */
@RestController
public class ProducerController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Publishes a fixed text message to {@code fanoutExchange}. An empty
     * routing key is passed because a fanout exchange does not need one.
     */
    @GetMapping("/send")
    public void send() {
        final String payload = "hi, fanout msg ";
        System.out.println("Sender : " + payload);
        rabbitTemplate.convertAndSend("fanoutExchange", "", payload);
    }
}

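  4.2 消费者(注:下方代码实际为 FanoutRabbitConfig 配置类,与 4.3 重复;消费者应使用 @RabbitListener 分别监听 fanout.a、fanout.b、fanout.c 三个队列)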

[code]package com.czxy.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Declares three queues, one fanout exchange, and the bindings that attach
 * each queue to that exchange.
 */
@Configuration
public class FanoutRabbitConfig {

    /** Queue names. */
    public static final String FINDOUT1 = "fanout.a";
    public static final String FINDOUT2 = "fanout.b";
    public static final String FINDOUT3 = "fanout.c";

    /** Name of the fanout exchange. */
    public static final String FindoutExange = "fanoutExchange";

    /** Declares the queue named {@link #FINDOUT1}. */
    @Bean
    public Queue AMessage() {
        return new Queue(FINDOUT1);
    }

    /** Declares the queue named {@link #FINDOUT2}. */
    @Bean
    public Queue BMessage() {
        return new Queue(FINDOUT2);
    }

    /** Declares the queue named {@link #FINDOUT3}. */
    @Bean
    public Queue CMessage() {
        return new Queue(FINDOUT3);
    }

    /** Declares the fanout exchange named {@link #FindoutExange}. */
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange(FindoutExange);
    }

    // Each binding below attaches one queue to the fanout exchange; no routing key is supplied.

    @Bean
    Binding bindingExchangeA(Queue AMessage, FanoutExchange fanoutExchange) {
        final Binding bindingA = BindingBuilder.bind(AMessage).to(fanoutExchange);
        return bindingA;
    }

    @Bean
    Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) {
        final Binding bindingB = BindingBuilder.bind(BMessage).to(fanoutExchange);
        return bindingB;
    }

    @Bean
    Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) {
        final Binding bindingC = BindingBuilder.bind(CMessage).to(fanoutExchange);
        return bindingC;
    }

}

  4.3配置类

[code]package com.czxy.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Spring configuration for the fanout demo: three queues, a single fanout
 * exchange, and one binding per queue.
 */
@Configuration
public class FanoutRabbitConfig {

    /** Names of the three queues. */
    public static final String FINDOUT1 = "fanout.a";
    public static final String FINDOUT2 = "fanout.b";
    public static final String FINDOUT3 = "fanout.c";

    /** Name under which the fanout exchange is declared. */
    public static final String FindoutExange = "fanoutExchange";

    /** Creates the queue whose name is {@link #FINDOUT1}. */
    @Bean
    public Queue AMessage() {
        return new Queue(FINDOUT1);
    }

    /** Creates the queue whose name is {@link #FINDOUT2}. */
    @Bean
    public Queue BMessage() {
        return new Queue(FINDOUT2);
    }

    /** Creates the queue whose name is {@link #FINDOUT3}. */
    @Bean
    public Queue CMessage() {
        return new Queue(FINDOUT3);
    }

    /** Creates the fanout exchange whose name is {@link #FindoutExange}. */
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange(FindoutExange);
    }

    /** Binds the {@code AMessage} queue to the fanout exchange (no routing key given). */
    @Bean
    Binding bindingExchangeA(Queue AMessage, FanoutExchange fanoutExchange) {
        final Binding result = BindingBuilder.bind(AMessage).to(fanoutExchange);
        return result;
    }

    /** Binds the {@code BMessage} queue to the fanout exchange (no routing key given). */
    @Bean
    Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) {
        final Binding result = BindingBuilder.bind(BMessage).to(fanoutExchange);
        return result;
    }

    /** Binds the {@code CMessage} queue to the fanout exchange (no routing key given). */
    @Bean
    Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) {
        final Binding result = BindingBuilder.bind(CMessage).to(fanoutExchange);
        return result;
    }

}

 

阅读更多
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: