您的位置:首页 > 其它

RabbitMQ使用场景练习:发布/订阅(三)

2016-12-22 15:27 274 查看
发布/订阅
     即实现单点发送消息,多点接收。使用fanout转发器,广播到所有它知道的队列上 

注意要点

fanout转发器中不需要routingKey,指定也无效 

创建fanout转发器:

Java代码  


channel.exchangeDeclare("fanout_logs", "fanout");  

channel.basicPublish("fanout_logs", "" , null, SerializationUtils.serialize(object));  

临时队列的创建:

Java代码  


//创建一个临时队列,返回队列名称  

String queueName=channel.queueDeclare().getQueue();  

转发器与队列的绑定:

Java代码  


//绑定临时队列和转发器logs  

hannel.queueBind(queueName, "fanout_logs", "");  

消息发送类

Java代码  


package com.demo.mq.rabbitmq.example03;  

  

import java.io.IOException;  

import java.io.Serializable;  

  

import org.apache.commons.lang3.SerializationUtils;  

  

import com.demo.mq.rabbitmq.MqManager;  

import com.rabbitmq.client.Channel;  

import com.rabbitmq.client.Connection;  

  

/** 

 * 发送消息类 

 * @author sheungxin 

 * 

 */  

public class Send{  

  

    /** 

     * 发送消息,fanout转发器,广播到所有在此转发器上的队列 

     * @param object 消息主体 

     * @throws IOException 

     */  

    public static void sendAToB(Serializable object) throws Exception{  

        Connection conn=MqManager.newConnection();  

        Channel channel=conn.createChannel();  

        channel.exchangeDeclare("fanout_logs", "fanout");  

        channel.basicPublish("fanout_logs", "" , null, SerializationUtils.serialize(object));  

        System.out.println("A Send :'"+object+"'");  

        channel.close();  

        conn.close();  

    }  

      

    public static void main(String[] args) throws Exception {  

        for(int i=0;i<4;i++){  

            sendAToB("Hello World "+i+"!");  

        }  

    }  

}  

消息接收类

Java代码  


package com.demo.mq.rabbitmq.example03;  

  

import java.io.IOException;  

import org.apache.commons.lang3.SerializationUtils;  

import com.demo.mq.rabbitmq.MqManager;  

import com.rabbitmq.client.AMQP;  

import com.rabbitmq.client.Channel;  

import com.rabbitmq.client.Connection;  

import com.rabbitmq.client.Consumer;  

import com.rabbitmq.client.DefaultConsumer;  

import com.rabbitmq.client.Envelope;  

  

/** 

 * 接收消息类 

 * @author sheungxin 

 * 

 */  

public class Recv {  

      

    /** 

     * 用于接收消息,创建一个临时队列,绑定在转发器fanout上 

     * @throws Exception 

     */  

    public static void recvAToB() throws Exception{  

        Connection conn=MqManager.newConnection();  

        Channel channel=conn.createChannel();  

        channel.exchangeDeclare("fanout_logs", "fanout");  

        //创建一个临时队列  

        String queueName=channel.queueDeclare().getQueue();  

        //绑定临时队列和转发器logs  

        channel.queueBind(queueName, "fanout_logs", "");  

        Consumer consumer=new DefaultConsumer(channel){  

            @Override  

            public void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException{  

                String mes=SerializationUtils.deserialize(body);  

                System.out.println("B Received :'"+mes+"'...");  

            }  

        };  

        //关闭自动应答机制,默认开启;这时候需要手动进行应该  

        channel.basicConsume(queueName, true, consumer);  

    }  

      

    public static void main(String[] args) throws Exception {  

        recvAToB();  

    }  

  

}  
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: