您的位置:首页 > 编程语言 > Java开发

SpringBoot(十二):SpringBoot整合RabbitMQ

2018-10-26 14:17 429 查看

如果对RabbitMQ不熟悉的,建议先看RabbitMQ系列教程。

一、环境准备
RabbitMQ 3.7.4
SpringBoot 1.5.10.RELEASE
        <!-- Spring Boot starter for AMQP messaging; used here to talk to RabbitMQ. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>


application.yml

# Connection settings for the RabbitMQ broker used by the examples.
spring:
  rabbitmq:
    host: 192.168.239.128
    port: 5672
# Optional settings, left commented out (disabled) in this example:
#    username: test
#    password: 123456
#    virtual-host: /vhost_test
#    publisher-confirms: true

二、简单队列

 

 

RabbitMQConfiguration.java 声明队列

package cn.saytime.config;

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfiguration {

    private static final String QUEUE_SIMPLE_NAME = "test_simple_queue";

    @Bean
    public Queue queue(){
        return new Queue(QUEUE_SIMPLE_NAME, false, false, false, null);
    }

}

消费者

package cn.saytime.listener.simple;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 消费者
 */
@RabbitListener(queues = "test_simple_queue")
@Component
public class SimpleRecv {

    @RabbitHandler
    public void process(String message) {
        System.out.println("[x] rev : " + message);
    }

}

测试生产者

package cn.saytime;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitMQApplicationTests {

    @Autowired
    private AmqpTemplate amqpTemplate;

    @Test
    public void testSimpleQueue() {
        String message = "Hello RabbitMQ !";
        amqpTemplate.convertAndSend("test_simple_queue", message);
        System.out.println("[x] send " + message + " ok");
    }

}

执行test:

[x] send Hello RabbitMQ ! ok
[x] rev : Hello RabbitMQ !

三、工作队列

 

 

公平分发模式在 Spring AMQP 中是默认模式,也是日常工作中最常用的方式;轮询模式用得较少。两者的区别在于 prefetch:默认值为 1(每个消费者同一时间最多持有 1 条未确认的消息),如果设置为 0(不限制预取数量)就是轮询模式。

3.1 公平分发模式
RabbitMQConfiguration.java 声明队列

package cn.saytime.config;

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfiguration {

    private static final String QUEUE_WORK_NAME = "test_work_queue";

    @Bean
    public Queue workQueue(){
        return new Queue(QUEUE_WORK_NAME, false, false, false, null);
    }

}


消费者1

package cn.saytime.listener.workfair;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 消费者
 */
@RabbitListener(queues = "test_workfair_queue")
@Component
public class WorkRecv {

    @RabbitHandler
    public void process(String message){
        System.out.println("[1] rev : " + message);
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

消费者2

package cn.saytime.listener.workfair;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 消费者
 */
@RabbitListener(queues = "test_workfair_queue")
@Component
public class WorkRecv2 {

    @RabbitHandler
    public void process(String message){
        System.out.println("[2] rev : " + message);
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

测试生产者

package cn.saytime;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitMQApplicationTests {

    @Autowired
    private AmqpTemplate amqpTemplate;

    @Test
    public void testWorkFairQueue(){
        for (int i = 0; i < 20; i++) {
            String message = "Hello RabbitMQ " + i;
            // 发送消息
            amqpTemplate.convertAndSend("test_workfair_queue", message);
            System.out.println(" [x] Sent '" + message + "'");
            try {
                Thread.sleep(i*100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

}

执行test:

 [x] Sent 'Hello RabbitMQ 0'
 [x] Sent 'Hello RabbitMQ 1'
[2] rev : Hello RabbitMQ 1
[1] rev : Hello RabbitMQ 0
 [x] Sent 'Hello RabbitMQ 2'
 [x] Sent 'Hello RabbitMQ 3'
 [x] Sent 'Hello RabbitMQ 4'
 [x] Sent 'Hello RabbitMQ 5'
[1] rev : Hello RabbitMQ 2
 [x] Sent 'Hello RabbitMQ 6'
[2] rev : Hello RabbitMQ 3
[1] rev : Hello RabbitMQ 4
 [x] Sent 'Hello RabbitMQ 7'
 [x] Sent 'Hello RabbitMQ 8'
[1] rev : Hello RabbitMQ 5
 [x] Sent 'Hello RabbitMQ 9'
[2] rev : Hello RabbitMQ 6
[1] rev : Hello RabbitMQ 7
 [x] Sent 'Hello RabbitMQ 10'
[1] rev : Hello RabbitMQ 8
 [x] Sent 'Hello RabbitMQ 11'
[2] rev : Hello RabbitMQ 9
[1] rev : Hello RabbitMQ 10
 [x] Sent 'Hello RabbitMQ 12'
[1] rev : Hello RabbitMQ 11
 [x] Sent 'Hello RabbitMQ 13'
[2] rev : Hello RabbitMQ 12
[1] rev : Hello RabbitMQ 13
 [x] Sent 'Hello RabbitMQ 14'
[1] rev : Hello RabbitMQ 14
 [x] Sent 'Hello RabbitMQ 15'
[1] rev : Hello RabbitMQ 15
 [x] Sent 'Hello RabbitMQ 16'
[2] rev : Hello RabbitMQ 16
 [x] Sent 'Hello RabbitMQ 17'
[1] rev : Hello RabbitMQ 17
 [x] Sent 'Hello RabbitMQ 18'
[2] rev : Hello RabbitMQ 18
 [x] Sent 'Hello RabbitMQ 19'
[1] rev : Hello RabbitMQ 19

公平分发模式测试正常。

3.2 轮询分发模式
修改application.yml

spring:
  rabbitmq:
    host: 192.168.239.128
    port: 5672
#    username: test
#    password: 123456
#    virtual-host: /vhost_test
#    publisher-confirms: true
    # "listener" must sit at the same indentation as host/port to be a valid key of "rabbitmq".
    listener:
      simple:
        # 0 removes the per-consumer prefetch limit; the article uses this for round-robin dispatch.
        prefetch: 0

 [x] Sent 'Hello RabbitMQ 0'
 [x] Sent 'Hello RabbitMQ 1'
[1] rev : Hello RabbitMQ 0
[2] rev : Hello RabbitMQ 1
 [x] Sent 'Hello RabbitMQ 2'
 [x] Sent 'Hello RabbitMQ 3'
 [x] Sent 'Hello RabbitMQ 4'
 [x] Sent 'Hello RabbitMQ 5'
[1] rev : Hello RabbitMQ 2
 [x] Sent 'Hello RabbitMQ 6'
[2] rev : Hello RabbitMQ 3
[1] rev : Hello RabbitMQ 4
 [x] Sent 'Hello RabbitMQ 7'
 [x] Sent 'Hello RabbitMQ 8'
[1] rev : Hello RabbitMQ 5
 [x] Sent 'Hello RabbitMQ 9'
[2] rev : Hello RabbitMQ 6
[1] rev : Hello RabbitMQ 7
 [x] Sent 'Hello RabbitMQ 10'
[1] rev : Hello RabbitMQ 8
 [x] Sent 'Hello RabbitMQ 11'
[2] rev : Hello RabbitMQ 9
[1] rev : Hello RabbitMQ 10
 [x] Sent 'Hello RabbitMQ 12'
[1] rev : Hello RabbitMQ 11
 [x] Sent 'Hello RabbitMQ 13'
[2] rev : Hello RabbitMQ 12
[1] rev : Hello RabbitMQ 13
 [x] Sent 'Hello RabbitMQ 14'
[1] rev : Hello RabbitMQ 14
 [x] Sent 'Hello RabbitMQ 15'
[1] rev : Hello RabbitMQ 15
 [x] Sent 'Hello RabbitMQ 16'
[2] rev : Hello RabbitMQ 16
 [x] Sent 'Hello RabbitMQ 17'
[1] rev : Hello RabbitMQ 17
 [x] Sent 'Hello RabbitMQ 18'
[2] rev : Hello RabbitMQ 18
 [x] Sent 'Hello RabbitMQ 19'
[1] rev : Hello RabbitMQ 19

可以看到消费者1消费偶数,消费者2消费奇数,表示轮询分发正常。

四、订阅模式
 

 

定义交换机、队列、以及队列与交换机的绑定关系。

package cn.saytime.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfiguration {

    private static final String EXCHANGE_FANNOUT_NAME = "test_exchange_fanout";
    private static final String QUEUE_PS_SMS_NAME = "test_queue_fanout_sms";
    private static final String QUEUE_PS_EMAIL_NAME = "test_queue_fanout_email";

    @Bean("fanoutExchange")
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange(EXCHANGE_FANNOUT_NAME);
    }
    @Bean
    public Queue fanoutSmsQueue(){
        return new Queue(QUEUE_PS_SMS_NAME, false, false, false, null);
    }
    @Bean
    public Queue fanoutEmailQueue(){
        return new Queue(QUEUE_PS_EMAIL_NAME, false, false, false, null);
    }
    @Bean
    public Binding smsQueueExchangeBinding(FanoutExchange fanoutExchange, Queue fanoutSmsQueue){
        return BindingBuilder.bind(fanoutSmsQueue).to(fanoutExchange);
    }
    @Bean
    public Binding emailQueueExchangeBinding(FanoutExchange fanoutExchange, Queue fanoutEmailQueue){
        return BindingBuilder.bind(fanoutEmailQueue).to(fanoutExchange);
    }

}

消费者1 sms

package cn.saytime.listener.ps;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * sms消费者
 */
@RabbitListener(queues = "test_queue_fanout_sms")
@Component
public class SmsRecv {

    @RabbitHandler
    public void process(String message){
        System.out.println("[sms] rev : " + message);
    }

}

消费者2 email

package cn.saytime.listener.ps;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * email消费者
 */
@RabbitListener(queues = "test_queue_fanout_email")
@Component
public class EmailRecv {

    @RabbitHandler
    public void process(String message){
        System.out.println("[email] rev : " + message);
    }

}

测试消息:

package cn.saytime;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitMQApplicationTests {

    @Autowired
    private AmqpTemplate amqpTemplate;

    @Test
    public void testFanoutQueue() {
        String message = "Hello, fanout message ";
        // 发送消息
        amqpTemplate.convertAndSend("test_exchange_fanout", "", message);
        System.out.println(" [x] Sent '" + message + "'");
    }

}

执行test结果:

 [x] Sent 'Hello, fanout message '
[sms] rev : Hello, fanout message 
[email] rev : Hello, fanout message 
五、路由模式
跟订阅模式一样,只不过Configuration里面配置的是DirectExchange,并设定路由键。

绑定关系:

// Bind the queue to the exchange using the routing key "info".
BindingBuilder.bind(queue).to(exchange).with("info");

发送消息:

// One-way publish with a routing key. convertSendAndReceive is the request/reply
// variant and would block waiting for a reply, which this example never sends.
amqpTemplate.convertAndSend(exchange, routingkey, message);

六、主题模式
跟路由模式一样,只不过路由键可以模糊匹配,配置的是TopicExchange.
 

阅读更多
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: