您的位置:首页 > 编程语言 > Java开发

spring boot 整合rabbitmq 的 Fanout Exchange广播模式

2017-12-25 14:12 302 查看
创建队列和交换机
package com.yijiupi.login.queue;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * RabbitMQ 广播模式,不处理Rout key广播到所有与之绑定的队列上
 * @author ZLY
 */
@Configuration
public class RabbitMQFanoutConfig {
    
    private static final Logger LOGGER = LoggerFactory.getLogger( RabbitMQFanoutConfig.class );

    /**
     * 创建第一一个广播队列
     * @author ZLY
     * @return Queue
     */
    @Bean
    public Queue theFirstOneBroadcastQueue() {
        LOGGER.info( "创建了theFirstOneBroadcastQueue队列" );
        return new Queue( "theFirstOneBroadcastQueue" );
    }

    /**
     * 创建第二个广播队列
     * @author ZLY
     * @return Queue
     */
    @Bean
    public Queue theSecondOneBroadcastQueue() {
        LOGGER.info( "创建了theSecondOneBroadcastQueue队列" );
        return new Queue( "theSecondOneBroadcastQueue" );
    }

    /**
     *创建一个名为fanoutExchange的广播路由器
     *@author ZLY
     *@return FanoutExchange
     * */
    @Bean
    public FanoutExchange fanoutExchange() {
        LOGGER.info( "创建了fanoutExchange交换机" );
        return new FanoutExchange( "fanoutExchange" );
    }

    /**
     * 将theFirstOneBroadcastQueue队列绑定到路由fanoutExchange
     * @author ZLY
     * @return Binding
     * */
    @Bean
    public Binding theFirstOneBinding() {
        LOGGER.info( "将theFirstOneBroadcastQueue队列绑定到交换机fanoutExchange" );
        return BindingBuilder.bind( theFirstOneBroadcastQueue() ).to( fanoutExchange() );
    }

    /**
     * 将theSecondOneBroadcastQueue队列绑定到交换机fanoutExchange
     * @author ZLY
     * @return Binding
     * **/
    @Bean
    public Binding theSecondOneBinding() {
        LOGGER.info( "将theSecondOneBroadcastQueue队列绑定到交换机fanoutExchange" );
        return BindingBuilder.bind( theSecondOneBroadcastQueue() ).to( fanoutExchange() );
    }
}
消费者
/**
     * 第一个rabbitmq广播模式消费者theFirstOneBroadcastCustomer监听theFirstOneBroadcastQueue队列中的消息并且消费
     * @author ZLY
     * @param String
     * */
    @RabbitListener( queues = "theFirstOneBroadcastQueue" )
    @RabbitHandler
    public void  theFirstOneBroadcastCustomer( String receptionString ) {
        String NOW_TIME = TimeUtil.getNowTime();
        LOGGER.info( "第一个广播模式的消费者在"+NOW_TIME+"从theFirstOneBroadcastQueue取得的消息是"+receptionString );
    }
    
    /**
     * 第二个广播模式的消费者theSecondOneBroadcastCustomer监听theSecondOneBroadcastQueue队列中的消息并且消费
     * */
    @RabbitListener ( queues = "theSecondOneBroadcastQueue" )
    @RabbitHandler
    public void theSecondOneBroadcastCustomer( String receptionString ) {
        String NOW_TIME = TimeUtil.getNowTime();
        LOGGER.info( "第二个广播模式的消费者在"+NOW_TIME+"从theSecondOneBroadcastQueue取得的消息是"+receptionString );
    }

生产者

/**
     * 将controller中的数据发送到交换机fanoutExchange中
     * @author ZLY
     * @param String
     * */
    public void sendMassageToFanoutExchange( String userInput ){
        LOGGER.info( "发送给交换机fanoutExchange的消息内容是"+userInput );
        //中间是设置路由规则,由于是广播模式,这个规则会被抛弃,但是这个字段一定要写上,
        //如果不写上会造成交换机把要转发的内容当做是路由规则直接抛弃,导致消费者监听到的队列中没有数据
        amqpTemplate.convertAndSend( "fanoutExchange", "这是要被抛弃的字段", userInput );
    }
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息