您的位置:首页 > 其它

RabbitMQ使用场景练习:Headers(六)

2016-12-22 15:30 239 查看
Headers转发器
     消息发送时可以在header中定义一些键值对,接收消息队列与headers转发器绑定时可以指定键值对,all、any两种方式(队列绑定转发器时指定的键值对与headers中存储的键值对匹配),匹配上即可接收到消息 

注意要点

headers转发器:

Java代码  


//声明headers转发器  

channel.exchangeDeclare("header_exchange", BuiltinExchangeType.HEADERS);  

发布消息时增加header头信息:

Java代码  


//定义headers存储的键值对  

Map<String, Object> headers=new HashMap<String, Object>();  

headers.put("key", "123456");  

headers.put("token", "654321");  

//把键值对放在properties  

Builder properties=new BasicProperties.Builder();  

properties.headers(headers);  

properties.deliveryMode(2);//持久化  

channel.basicPublish("header_exchange", "" , properties.build(), SerializationUtils.serialize(object));  

转发器与队列的绑定(指定header),通过键值对匹配,有any、all两种:

Java代码  


 //指定headers的匹配类型(all、any)、键值对  

Map<String, Object> headers=new HashMap<String, Object>();  

headers.put("x-match", "all");//all any(只要有一个键值对匹配即可)  

headers.put("key", "123456");  

//headers.put("token", "6543211");  

//绑定临时队列和转发器header_exchange  

channel.queueBind(queueName, "header_exchange", "", headers);  

消息发送类

Java代码  


package com.demo.mq.rabbitmq.example07;  

  

import java.io.IOException;  

import java.io.Serializable;  

import java.util.HashMap;  

import java.util.Map;  

import org.apache.commons.lang3.SerializationUtils;  

import com.demo.mq.rabbitmq.MqManager;  

import com.rabbitmq.client.AMQP.BasicProperties;  

import com.rabbitmq.client.AMQP.BasicProperties.Builder;  

import com.rabbitmq.client.BuiltinExchangeType;  

import com.rabbitmq.client.Channel;  

import com.rabbitmq.client.Connection;  

  

/** 

 * 发送消息类 

 * @author sheungxin 

 * 

 */  

public class Send{  

  

    /** 

     * 发送消息,HEADERS转发器,通过HEADERS中键值对匹配相应的queue 

     * @param object 消息主体 

     * @throws IOException 

     */  

    public static void sendAToB(Serializable object) throws Exception{  

        Connection conn=MqManager.newConnection();  

        Channel channel=conn.createChannel();  

        //声明headers转发器  

        channel.exchangeDeclare("header_exchange", BuiltinExchangeType.HEADERS);  

        //定义headers存储的键值对  

        Map<String, Object> headers=new HashMap<String, Object>();  

        headers.put("key", "123456");  

        headers.put("token", "654321");  

        //把键值对放在properties  

        Builder properties=new BasicProperties.Builder();  

        properties.headers(headers);  

        properties.deliveryMode(2);//持久化  

        channel.basicPublish("header_exchange", "" , properties.build(), SerializationUtils.serialize(object));  

        System.out.println("Send '"+object+"'");  

        channel.close();  

        conn.close();  

    }  

      

    public static void main(String[] args) throws Exception {  

        sendAToB("Hello World !");  

    }  

}  

消息接收类

Java代码  


package com.demo.mq.rabbitmq.example07;  

  

import java.io.IOException;  

import java.util.HashMap;  

import java.util.Map;  

  

import org.apache.commons.lang3.SerializationUtils;  

  

import com.demo.mq.rabbitmq.MqManager;  

import com.rabbitmq.client.AMQP;  

import com.rabbitmq.client.BuiltinExchangeType;  

import com.rabbitmq.client.Channel;  

import com.rabbitmq.client.Connection;  

import com.rabbitmq.client.Consumer;  

import com.rabbitmq.client.DefaultConsumer;  

import com.rabbitmq.client.Envelope;  

  

/** 

 * 接收消息类 

 * @author sheungxin 

 * 

 */  

public class Recv {  

      

    /** 

     * 用于接收消息,创建一个临时队列,绑定在转发器HEADERS上,并模糊指定键值对 

     * @param queue 

     * @throws Exception 

     */  

    public static void recvAToB() throws Exception{  

        Connection conn=MqManager.newConnection();  

        Channel channel=conn.createChannel();  

        channel.exchangeDeclare("header_exchange", BuiltinExchangeType.HEADERS);  

        //创建一个临时队列  

        String queueName=channel.queueDeclare().getQueue();  

        //指定headers的匹配类型(all、any)、键值对  

        Map<String, Object> headers=new HashMap<String, Object>();  

        headers.put("x-match", "all");//all any(只要有一个键值对匹配即可)  

        headers.put("key", "123456");  

//      headers.put("token", "6543211");  

        //绑定临时队列和转发器header_exchange  

        channel.queueBind(queueName, "header_exchange", "", headers);  

        System.out.println("Received ...");  

        Consumer consumer=new DefaultConsumer(channel){  

            @Override  

            public void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException{  

                String mes=SerializationUtils.deserialize(body);  

                System.out.println(envelope.getRoutingKey()+":Received :'"+mes+"' done");  

            }  

        };  

        //关闭自动应答机制,默认开启;这时候需要手动进行应该  

        channel.basicConsume(queueName, true, consumer);  

    }  

      

    public static void main(String[] args) throws Exception {  

        recvAToB();  

    }  

  

}  
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: