您的位置:首页 > 编程语言 > Java开发

RabbitMQ Java官方教程(四)----Routing

2018-01-24 17:17 197 查看

RabbitMQ Java官方教程(四)----Routing

Routing(定位发送)

在前面的教学中我们创建了一个简单的日志系统。我们可以广播日志消息给很多个接收者。

在本次教程中,我们将给它添加一个新的特性——我们将使它可以只订阅消息的一部分,而不是全部接收。例如,我们将只是把关键的错误消息直接发送到日志文件(以节省磁盘空间),同时我们还能够自啊控制台上打印出所有消息。

 

Bindings(绑定)

在之前的例子中我们已经创建了一个绑定关系。代码是这样的:

channel.queueBind(queueName, EXCHANGE_NAME,
"");
所谓绑定,是建立交换器和队列之间的关系。这个绑定关系可以简单的理解为:这个队列对来自这个交换器的消息感兴趣。

绑定过程中可以使用一个routingKey(个人理解为特定的路径表示)参数。避免与basic_publish参数混淆,我们将它称为绑定键。下面是我们如何用键创建绑定关系:

channel.queueBind(queueName, EXCHANGE_NAME,
"black");
绑定键的意义依赖于交换器的类型,像之前用的fanout交换器就会直接忽略这个值。

 

指向交换器

我们之前教学中的日志系统是广播所有的消息给所有的消费者。我们现在要扩展这个功能,允许程序按照轻重缓急过滤消息。就比如说,我们想要一个程序,它将只把严重性的错误日志消息写入磁盘,而不在警告或普通日志信息上浪费磁盘空间。

我们之前用的fanout交换器,它的灵活性太差了。它只是无脑的广播消息。

我们将用direct类型的交换器代替它。direct交换器的路径匹配规则很简单——就是将消息传递到绑定键和routingKey完全匹配的队列。

下面是一个图示:

                        


在这个设置中,我们看到direct交换器绑定了两个队列。第一个队列是用orange最为绑定键,而第二个队列有连个绑定关系,一个键是black还有一个green。

在这个设置里,一个推送给routing key为orange的交换器的消息将被定向发送给队列Q1.routing key是black或者green的将被定向发送给Q2。剩下的所有消息都将直接被舍弃。

 

多重绑定

                          


 

用相同的绑定键绑定多个队列是完全可以的。在这个例子中,我们在交换器X和Q1之间添加black绑定键。在这种情况中,direct交换器就类似于之前的fanout交换器了,相当于将消息广播到了所有的匹配队列。带有routingkey为black的消息消息被发送到Q1和Q2。

 

发送日志

我们将使用这种模式到我们的日志系统中。用direct交换器代替之前的fanout类型交换器。我们将提供一个状态来作为routing
key(路由密钥)。这样,接收程序就能够选择它想要接收的严重程度。让我们首先看一下发送日志。

一如既往的,我们先要建立一个交换器:

channel.exchangeDeclare(EXCHANGE_NAME,
"direct");
然后我们准备发送一条消息:

channel.basicPublish(EXCHANGE_NAME, severity,
null, message.getBytes());
为了简化问题,我们假设状态有“信息”“警告”“错误”。

 

订阅

接收消息就像前面的教程一样,但有一个例外——我们将为我们感兴趣的每个严重程度创建一个新的绑定关系。

String queueName=channel.queueDeclare().getQueue();
for(String severity : argv){
  channel.queueBind(queueName, EXCHANGE_NAME, severity);
}
 

整合程序

                           

 

 EmitLogDirect.java class:

import com.rabbitmq.client.*;
import java.io.IOException;
public class EmitLogDirect {
 
    private static final String
EXCHANGE_NAME = "direct_logs";
 
    public static void main(String[]
argv)
                  throws java.io.IOException {
 
        ConnectionFactory factory =
new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
 
        channel.exchangeDeclare(EXCHANGE_NAME,
"direct");
 
        String severity = getSeverity(argv);
        String message = getMessage(argv);
 
        channel.basicPublish(EXCHANGE_NAME, severity,
null, message.getBytes());
        System.out.println(" [x] Sent '" + severity +
"':'" + message +
"'");
 
        channel.close();
        connection.close();
    }
    //..
}
 

ReceiveLogsDirect.java:

import com.rabbitmq.client.*;
import java.io.IOException;
public class ReceiveLogsDirect {
 
  private static final String
EXCHANGE_NAME = "direct_logs";
 
  public static void main(String[]
argv) throws Exception {
    ConnectionFactory factory =
new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
 
    channel.exchangeDeclare(EXCHANGE_NAME,
"direct");
    String queueName = channel.queueDeclare().getQueue();
 
    if (argv.length <
1){
      System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
      System.exit(1);
    }
 
    for(String severity : argv){
      channel.queueBind(queueName, EXCHANGE_NAME, severity);
    }
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
 
    Consumer consumer =
new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String
consumerTag, Envelope envelope,
                                 AMQP.BasicProperties properties,
byte[] body) throws IOException {
        String message =
new String(body,
"UTF-8");
        System.out.println(" [x] Received '" + envelope.getRoutingKey() +
"':'" + message +
"'");
      }
    };
    channel.basicConsume(queueName,
true, consumer);
  }
}
 

(源码:EmitLogDirect.java 和
ReceiveLogsDirect.java )

下一篇教程我们将了解如何根据模式侦听消息。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  消息队列 java RabbitMQ