RabbitMQ学习(五)之主题topic(java)
2016-07-15 15:35
459 查看
转载来自 http://blog.csdn.net/zhu_tianwei/article/details/40887775
direct类型的消息通过绑定键转发到队列,但是存在一些局限性:它不能够基于多重条件进行路由选择,有可能希望不仅根据日志的级别而且想根据日志的来源进行订阅,这就需要主题类型的转发器来实现。
发往主题类型的转发器的消息不能随意的设置选择键(routing_key),必须是由点隔开的一系列的标识符组成。标识符可以是任何东西,但是一般都与消息的某些特性相关。一些合法的选择键的例子:"stock.usd.nyse", "nyse.vmw","quick.orange.rabbit".你可以定义任何数量的标识符,上限为255个字节。
绑定键和选择键的形式一样。主题类型的转发器背后的逻辑和直接类型的转发器很类似:一个附带特殊的选择键将会被转发到绑定键与之匹配的队列中。需要注意的是:关于绑定键有两种特殊的情况。
*可以匹配一个标识符。
#可以匹配0个或多个标识符。
1.发送日志消息SendLogTopic,发送4个消息绑定不同的绑定键, "kernal.info", "cron.warning", "auth.info", "kernel.critical"
[java]
view plain
copy
print?
package cn.slimsmart.rabbitmq.demo.topic;
import java.util.UUID;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
//发送消息端
public class SendLogTopic {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws Exception {
// 创建连接和频道
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.101.174");
factory.setUsername("admin");
factory.setPassword("admin");
factory.setPort(AMQP.PROTOCOL.PORT);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明转发器
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
//定义绑定键
String[] routing_keys = new String[] { "kernal.info", "cron.warning",
"auth.info", "kernel.critical" };
for (String routing_key : routing_keys)
{
//发送4条不同绑定键的消息
String msg = UUID.randomUUID().toString();
channel.basicPublish(EXCHANGE_NAME, routing_key, null, msg
.getBytes());
System.out.println(" [x] Sent routingKey = "+routing_key+" ,msg = " + msg + ".");
}
channel.close();
connection.close();
}
}
2.定义接收kernel.*消息的消费者
[java]
view plain
copy
print?
package cn.slimsmart.rabbitmq.demo.topic;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
//接收kernel.*消息
public class ReceiveLogsTopicForKernel {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws Exception {
// 创建连接和频道
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.101.174");
factory.setUsername("admin");
factory.setPassword("admin");
factory.setPort(AMQP.PROTOCOL.PORT);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明转发器
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
// 随机生成一个队列
String queueName = channel.queueDeclare().getQueue();
//接收所有与kernel相关的消息
channel.queueBind(queueName, EXCHANGE_NAME, "kernel.*");
System.out.println(" [*] Waiting for messages about kernel. To exit press CTRL+C");
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, consumer);
while (true)
{
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
String routingKey = delivery.getEnvelope().getRoutingKey();
System.out.println(" [x] Received routingKey = " + routingKey
+ ",msg = " + message + ".");
}
}
}
3.接收*.critical消息消费者
[java]
view plain
copy
print?
package cn.slimsmart.rabbitmq.demo.topic;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
//接收*.critical消息
public class ReceiveLogsTopicForCritical {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws Exception {
// 创建连接和频道
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.101.174");
factory.setUsername("admin");
factory.setPassword("admin");
factory.setPort(AMQP.PROTOCOL.PORT);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明转发器
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
// 随机生成一个队列
String queueName = channel.queueDeclare().getQueue();
// 接收所有与kernel相关的消息
channel.queueBind(queueName, EXCHANGE_NAME, "*.critical");
System.out
.println(" [*] Waiting for critical messages. To exit press CTRL+C");
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, consumer);
while (true)
{
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
String routingKey = delivery.getEnvelope().getRoutingKey();
System.out.println(" [x] Received routingKey = " + routingKey
+ ",msg = " + message + ".");
}
}
}
启动2个消费者,再启动发送4类消息生产者。观察接收到的消息,都收到对应的消息。可以看出使用topic类型的转发器,成功实现了多重条件选择的订阅。
direct类型的消息通过绑定键转发到队列,但是存在一些局限性:它不能够基于多重条件进行路由选择,有可能希望不仅根据日志的级别而且想根据日志的来源进行订阅,这就需要主题类型的转发器来实现。
发往主题类型的转发器的消息不能随意的设置选择键(routing_key),必须是由点隔开的一系列的标识符组成。标识符可以是任何东西,但是一般都与消息的某些特性相关。一些合法的选择键的例子:"stock.usd.nyse", "nyse.vmw","quick.orange.rabbit".你可以定义任何数量的标识符,上限为255个字节。
绑定键和选择键的形式一样。主题类型的转发器背后的逻辑和直接类型的转发器很类似:一个附带特殊的选择键将会被转发到绑定键与之匹配的队列中。需要注意的是:关于绑定键有两种特殊的情况。
*可以匹配一个标识符。
#可以匹配0个或多个标识符。
1.发送日志消息SendLogTopic,发送4个消息绑定不同的绑定键, "kernal.info", "cron.warning", "auth.info", "kernel.critical"
[java]
view plain
copy
print?
package cn.slimsmart.rabbitmq.demo.topic;
import java.util.UUID;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
//发送消息端
public class SendLogTopic {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws Exception {
// 创建连接和频道
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.101.174");
factory.setUsername("admin");
factory.setPassword("admin");
factory.setPort(AMQP.PROTOCOL.PORT);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明转发器
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
//定义绑定键
String[] routing_keys = new String[] { "kernal.info", "cron.warning",
"auth.info", "kernel.critical" };
for (String routing_key : routing_keys)
{
//发送4条不同绑定键的消息
String msg = UUID.randomUUID().toString();
channel.basicPublish(EXCHANGE_NAME, routing_key, null, msg
.getBytes());
System.out.println(" [x] Sent routingKey = "+routing_key+" ,msg = " + msg + ".");
}
channel.close();
connection.close();
}
}
2.定义接收kernel.*消息的消费者
[java]
view plain
copy
print?
package cn.slimsmart.rabbitmq.demo.topic;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
//接收kernel.*消息
public class ReceiveLogsTopicForKernel {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws Exception {
// 创建连接和频道
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.101.174");
factory.setUsername("admin");
factory.setPassword("admin");
factory.setPort(AMQP.PROTOCOL.PORT);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明转发器
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
// 随机生成一个队列
String queueName = channel.queueDeclare().getQueue();
//接收所有与kernel相关的消息
channel.queueBind(queueName, EXCHANGE_NAME, "kernel.*");
System.out.println(" [*] Waiting for messages about kernel. To exit press CTRL+C");
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, consumer);
while (true)
{
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
String routingKey = delivery.getEnvelope().getRoutingKey();
System.out.println(" [x] Received routingKey = " + routingKey
+ ",msg = " + message + ".");
}
}
}
3.接收*.critical消息消费者
[java]
view plain
copy
print?
package cn.slimsmart.rabbitmq.demo.topic;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
//接收*.critical消息
public class ReceiveLogsTopicForCritical {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws Exception {
// 创建连接和频道
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.101.174");
factory.setUsername("admin");
factory.setPassword("admin");
factory.setPort(AMQP.PROTOCOL.PORT);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明转发器
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
// 随机生成一个队列
String queueName = channel.queueDeclare().getQueue();
// 接收所有与kernel相关的消息
channel.queueBind(queueName, EXCHANGE_NAME, "*.critical");
System.out
.println(" [*] Waiting for critical messages. To exit press CTRL+C");
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, consumer);
while (true)
{
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
String routingKey = delivery.getEnvelope().getRoutingKey();
System.out.println(" [x] Received routingKey = " + routingKey
+ ",msg = " + message + ".");
}
}
}
启动2个消费者,再启动发送4类消息生产者。观察接收到的消息,都收到对应的消息。可以看出使用topic类型的转发器,成功实现了多重条件选择的订阅。
相关文章推荐
- java序列化接口Serializable的使用
- RabbitMQ学习(四)之路由(java)
- Maven、gradle、Ant、Eclipse IDE 之间的关系
- RabbitMQ学习(三)之发布/订阅(java)
- Java annotation 自定义注释@interface的用法
- JDK1.6 安装--错误(找不到或无法加载主类 com.sun.tools.javac.Main)
- WebBrowser.java: Display HTML file in JEditorPane
- [转载] How to Install OpenJDK 8 in Ubuntu 14.04 & 12.04 LTS
- 请问Eclipse和MyEclipse在能、使用上究竟有什么不同?(除了免费与收费的区别外)
- RabbitMQ学习(二)之工作队列(java)
- JAVA Servlet
- Fragment抛出java.lang.IllegalStateException: Fragment not attached to Activity
- 算法日记(Java实现)第20160715(1)期——POJ1001/POJ1002
- RabbitMQ学习(一)之helloword(java)
- 学习笔记之Struts2—浅析接收参数
- javax.mail发送邮件
- spring源码分析之spring-core-env
- java基础数据类型(你还记得几个)
- java基本运算符
- [新手必读] 既然有了Swing, 那为什么还要SWT?