您的位置:首页 > 编程语言 > Java开发

RabbitMQ学习(五)之主题topic(java)

2016-07-15 15:35 459 查看
转载来自 http://blog.csdn.net/zhu_tianwei/article/details/40887775
direct类型的消息通过绑定键转发到队列,但是存在一些局限性:它不能够基于多重条件进行路由选择,有可能希望不仅根据日志的级别而且想根据日志的来源进行订阅,这就需要主题类型的转发器来实现。

发往主题类型的转发器的消息不能随意的设置选择键(routing_key),必须是由点隔开的一系列的标识符组成。标识符可以是任何东西,但是一般都与消息的某些特性相关。一些合法的选择键的例子:"stock.usd.nyse", "nyse.vmw","quick.orange.rabbit".你可以定义任何数量的标识符,上限为255个字节。

绑定键和选择键的形式一样。主题类型的转发器背后的逻辑和直接类型的转发器很类似:一个附带特殊的选择键将会被转发到绑定键与之匹配的队列中。需要注意的是:关于绑定键有两种特殊的情况。

*可以匹配一个标识符。

#可以匹配0个或多个标识符。



1.发送日志消息SendLogTopic,发送4个消息绑定不同的绑定键, "kernal.info", "cron.warning",  "auth.info", "kernel.critical" 

[java]
view plain
copy
print?





package cn.slimsmart.rabbitmq.demo.topic;  
  
import java.util.UUID;  
  
import com.rabbitmq.client.AMQP;  
import com.rabbitmq.client.Channel;  
import com.rabbitmq.client.Connection;  
import com.rabbitmq.client.ConnectionFactory;  
  
//发送消息端  
public class SendLogTopic {  
    private static final String EXCHANGE_NAME = "topic_logs";  
    public static void main(String[] args) throws Exception {  
        // 创建连接和频道    
        ConnectionFactory factory = new ConnectionFactory();    
        factory.setHost("192.168.101.174");  
    factory.setUsername("admin");  
        factory.setPassword("admin");  
        factory.setPort(AMQP.PROTOCOL.PORT);  
        Connection connection = factory.newConnection();    
        Channel channel = connection.createChannel();    
        // 声明转发器  
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");    
        //定义绑定键     
        String[] routing_keys = new String[] { "kernal.info", "cron.warning",    
                "auth.info", "kernel.critical" };    
        for (String routing_key : routing_keys)    
        {     
            //发送4条不同绑定键的消息  
            String msg = UUID.randomUUID().toString();    
            channel.basicPublish(EXCHANGE_NAME, routing_key, null, msg    
                    .getBytes());    
            System.out.println(" [x] Sent routingKey = "+routing_key+" ,msg = " + msg + ".");    
        }    
    
        channel.close();    
        connection.close();    
    }  
  
}  

2.定义接收kernel.*消息的消费者

[java]
view plain
copy
print?





package cn.slimsmart.rabbitmq.demo.topic;  
  
import com.rabbitmq.client.AMQP;  
import com.rabbitmq.client.Channel;  
import com.rabbitmq.client.Connection;  
import com.rabbitmq.client.ConnectionFactory;  
import com.rabbitmq.client.QueueingConsumer;  
  
//接收kernel.*消息  
public class ReceiveLogsTopicForKernel {  
    private static final String EXCHANGE_NAME = "topic_logs";    
    public static void main(String[] args) throws Exception {  
        // 创建连接和频道    
        ConnectionFactory factory = new ConnectionFactory();    
        factory.setHost("192.168.101.174");  
    factory.setUsername("admin");  
       factory.setPassword("admin");  
    factory.setPort(AMQP.PROTOCOL.PORT);  
        Connection connection = factory.newConnection();    
        Channel channel = connection.createChannel();    
        // 声明转发器    
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");    
        // 随机生成一个队列    
        String queueName = channel.queueDeclare().getQueue();    
            
        //接收所有与kernel相关的消息    
        channel.queueBind(queueName, EXCHANGE_NAME, "kernel.*");    
    
        System.out.println(" [*] Waiting for messages about kernel. To exit press CTRL+C");    
    
        QueueingConsumer consumer = new QueueingConsumer(channel);    
        channel.basicConsume(queueName, true, consumer);    
    
        while (true)    
        {    
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();    
            String message = new String(delivery.getBody());    
            String routingKey = delivery.getEnvelope().getRoutingKey();    
    
            System.out.println(" [x] Received routingKey = " + routingKey    
                    + ",msg = " + message + ".");    
        }    
    }  
}  

3.接收*.critical消息消费者

[java]
view plain
copy
print?





package cn.slimsmart.rabbitmq.demo.topic;  
  
import com.rabbitmq.client.AMQP;  
import com.rabbitmq.client.Channel;  
import com.rabbitmq.client.Connection;  
import com.rabbitmq.client.ConnectionFactory;  
import com.rabbitmq.client.QueueingConsumer;  
  
//接收*.critical消息  
public class ReceiveLogsTopicForCritical {  
      
     private static final String EXCHANGE_NAME = "topic_logs";    
  
    public static void main(String[] args) throws Exception {  
        // 创建连接和频道    
        ConnectionFactory factory = new ConnectionFactory();    
        factory.setHost("192.168.101.174");  
    factory.setUsername("admin");  
        factory.setPassword("admin");  
    factory.setPort(AMQP.PROTOCOL.PORT);  
        Connection connection = factory.newConnection();    
        Channel channel = connection.createChannel();    
        // 声明转发器    
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");    
        // 随机生成一个队列    
        String queueName = channel.queueDeclare().getQueue();    
        // 接收所有与kernel相关的消息    
        channel.queueBind(queueName, EXCHANGE_NAME, "*.critical");    
    
        System.out    
                .println(" [*] Waiting for critical messages. To exit press CTRL+C");    
    
        QueueingConsumer consumer = new QueueingConsumer(channel);    
        channel.basicConsume(queueName, true, consumer);    
    
        while (true)    
        {    
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();    
            String message = new String(delivery.getBody());    
            String routingKey = delivery.getEnvelope().getRoutingKey();    
    
            System.out.println(" [x] Received routingKey = " + routingKey    
                    + ",msg = " + message + ".");    
        }    
    }  
  
}  

启动2个消费者,再启动发送4类消息生产者。观察接收到的消息,都收到对应的消息。可以看出使用topic类型的转发器,成功实现了多重条件选择的订阅。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: