RabbitMQ
2017-11-26 12:09
281 查看
参考:https://www.cnblogs.com/luxiaoxun/p/3918054.html
1.RabbitMQ简述
RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现。作用是:异步,解耦,缓冲,消息分发
AMQP :Advanced Message Queue,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。它是一种二进制协议。默认启动端口 5672。
主要分为三个部分:生产者,交换机和队列,消费者。
需要注意消息持久化,目的为了防止 RabbitMQ 宕机;考虑 ACK 机制,目的为了如果消费者对消息的处理失败了,那么后续要如何处理。
具体特点:
⑴可靠性(Reliability)
RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认。
⑵灵活的路由(Flexible Routing)
在消息进入队列之前,通过 Exchange 来路由消息的。对于典型的路由功能,RabbitMQ 已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,可以将多个 Exchange 绑定在一起,也通过插件机制实现自己的 Exchange 。
⑶消息集群(Clustering)
多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker 。
⑷高可用(Highly Available Queues)
队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。
⑸多种协议(Multi-protocol)
RabbitMQ 支持多种消息队列协议,比如 STOMP、MQTT 等等。
⑹多语言客户端(Many Clients)
RabbitMQ 几乎支持所有常用语言,比如 Java、.NET、Ruby 等等。
⑺管理界面(Management UI)
RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker 的许多方面。
⑻跟踪机制(Tracing)
如果消息异常,RabbitMQ 提供了消息跟踪机制,使用者可以找出发生了什么。
⑼插件机制(Plugin System)
RabbitMQ 提供了许多插件,来从多方面进行扩展,也可以编写自己的插件。
2.RabbitMQ概念模型
左侧 P 代表 生产者,也就是往 RabbitMQ 发消息的程序。
中间即是 RabbitMQ,其中包括了 交换机 和 队列。
右侧 C 代表 消费者,也就是往 RabbitMQ 拿消息的程序。
其中比较重要的概念有 4 个,分别为:虚拟主机,交换机,队列,和绑定。
虚拟主机:一个虚拟主机持有一组交换机、队列和绑定。为什么需要多个虚拟主机呢?很简单,RabbitMQ当中,用户只能在虚拟主机的粒度进行权限控制。 因此,如果需要禁止A组访问B组的交换机/队列/绑定,必须为A和B分别创建一个虚拟主机。每一个RabbitMQ服务器都有一个默认的虚拟主机“/”。
交换机:Exchange 用于转发消息,但是它不会做存储 ,如果没有 Queue bind 到 Exchange 的话,它会直接丢弃掉 Producer 发送过来的消息。
这里有一个比较重要的概念:路由键 。消息到交换机的时候,交互机会转发到对应的队列中,那么究竟转发到哪个队列,就要根据该路由键。
绑定:也就是交换机需要和队列相绑定,这其中如上图所示,是多对多的关系。
3.exchange交换机机制
producer并不知道消息具体分发到哪个队列中去,当exchange收到message时,必须准确知道该如何分发。是append到一定规则的queue,还是append到多个queue中,还是被丢弃?这些规则都是通过exchange的4种type去定义的。
在绑定(Binding)Exchange与Queue的同时,一般会指定一个binding key;消费者将消息发送给Exchange时,一般会指定一个routing key;当binding key与routing key相匹配时,消息将会被路由到对应的Queue中,在绑定多个Queue到同一个Exchange的时候,这些Binding允许使用相同的binding
key。
binding key 并不是在所有情况下都生效,它依赖于Exchange Type,比如fanout类型的Exchange就会无视binding key,而是将消息路由到所有绑定到该Exchange的Queue。
exchange和queue通过routing-key关联,这两者之间的关系是就是binding。如下图所示,X表示交换机,红色表示队列,交换机通过一个routing-key去binding一个queue,routing-key有什么作用呢?看Direct exchange类型交换机。
'
⑴Directed Exchange
路由键exchange,该交换机收到消息后会把消息发送到指定routing-key的queue中。那消息交换机是怎么知道的呢?其实,producer deliver消息的时候会把routing-key add到 message header中。routing-key只是一个messgae的attribute。
Default Exchange
这种是特殊的Direct Exchange,是rabbitmq内部默认的一个交换机。该交换机的name是空字符串,所有queue都默认binding 到该交换机上。所有binding到该交换机上的queue,routing-key都和queue的name一样。
⑵Topic Exchang
通配符交换机,exchange会把消息发送到一个或者多个满足通配符规则的routing-key的queue。其中*表号匹配一个word,#匹配多个word和路径,路径之间通过.隔开。如满足a.*.c的routing-key有a.hello.c;满足#.hello的routing-key有a.b.c.helo。
⑶Fanout Exchange
扇形交换机,该交换机会把消息发送到所有binding到该交换机上的queue。这种是publisher/subcribe模式。用来做广播最好。
所有该exchagne上指定的routing-key都会被ignore掉。
⑷Header Exchange
设置header attribute参数类型的交换机。
4.ACK机制
采用no-ack的方式进行确认,也就是说,每次Consumer接到数据后,而不管是否处理完 成,RabbitMQ Server会立即把这个Message标记为完成,然后从queue中删除了。这样仅能保证每次Consumer接到数据。
为了保证数据能被正确处理,我们不能采用no-ack。而应该是在处理完数据后发送ack。
5.代码分析
消费者1:
package test2;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownSignalException;
@SuppressWarnings("deprecation")
public class Consumer {
private static final String QUEUE_NAME = "log2";
public static void main(String[] args) throws IOException, TimeoutException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String queueName = QUEUE_NAME;
//producerTest.XT xt = producerTest.XT.FANOUT;
// producerTest.XT xt = producerTest.XT.DIRECT;
producerTest.XT xt = producerTest.XT.TOPIC;
switch (xt) {
/* case DEFAULT:
//队列的相关参数需要与第一次定义该队列时相同,否则会出错,使用channel.queueDeclarePassive()可只被动绑定已有队列,而不创建
channel.queueDeclare(queueName, true, false, true, null);
break;*/
case FANOUT:
//接收端也声明一个fanout交换机
channel.exchangeDeclare(producerTest.XCHG_NAME, "fanout", true, true, null);
//channel.exchangeDeclarePassive() 可以使用该函数使用一个已经建立的exchange
//声明一个临时队列,该队列会在使用完比后自动销毁
queueName = channel.queueDeclare().getQueue();
//将队列绑定到交换机,参数3无意义此时
channel.queueBind(queueName, producerTest.XCHG_NAME, "");
break;
case DIRECT:
channel.exchangeDeclare(producerTest.XCHG_NAME, "direct", true, true, null);
queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, producerTest.XCHG_NAME, "info"); //绑定一个routing key,可以绑定多个
channel.queueBind(queueName, producerTest.XCHG_NAME, "warning");
break;
case TOPIC:
channel.exchangeDeclare(producerTest.XCHG_NAME, "topic", true, true, null);
queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, producerTest.XCHG_NAME, "warning.#"); //监听两种模式 #匹配一个或多个单词 *匹配一个单词
channel.queueBind(queueName, producerTest.XCHG_NAME, "*.blue");
break;
}
// 在同一时间不要给一个worker一个以上的消息。
// 不要将一个新的消息分发给worker知道它处理完了并且返回了前一个消息的通知标志(acknowledged)
// 替代的,消息将会分发给下一个不忙的worker。
channel.basicQos(1); //server push消息时的队列长度
//用来缓存服务器推送过来的消息
QueueingConsumer consumer = new QueueingConsumer(channel);
//为channel声明一个consumer,服务器会推送消息
//参数1:队列名称
//参数2:是否发送ack包,不发送ack消息会持续在服务端保存,直到收到ack。 可以通过channel.basicAck手动回复ack
//参数3:消费者
channel.basicConsume(queueName, false, consumer);
//channel.basicGet() //使用该函数主动去服务器检索是否有新消息,而不是等待服务器推送
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
System.out.println("消费者1Received " + new String(delivery.getBody()));
//回复ack包,如果不回复,消息不会在服务器删除
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
//channel.basicReject(); channel.basicNack(); //可以通过这两个函数拒绝消息,可以指定消息在服务器删除还是继续投递给其他消费者
}
}
}
消费者2:
package test2;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownSignalException;
public class Consumer2 {
private static final String QUEUE_NAME = "log2";
public static void main(String[] args) throws IOException, TimeoutException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String queueName = QUEUE_NAME;
//producerTest.XT xt = producerTest.XT.FANOUT;
// producerTest.XT xt = producerTest.XT.DIRECT;
producerTest.XT xt = producerTest.XT.TOPIC;
switch (xt) {
/* case DEFAULT:
//队列的相关参数需要与第一次定义该队列时相同,否则会出错,使用channel.queueDeclarePassive()可只被动绑定已有队列,而不创建
channel.queueDeclare(queueName, true, false, true, null);
break;*/
case FANOUT:
//接收端也声明一个fanout交换机
channel.exchangeDeclare(producerTest.XCHG_NAME, "fanout", true, true, null);
//channel.exchangeDeclarePassive() 可以使用该函数使用一个已经建立的exchange
//声明一个临时队列,该队列会在使用完比后自动销毁
queueName = channel.queueDeclare().getQueue();
//将队列绑定到交换机,参数3无意义此时
channel.queueBind(queueName, producerTest.XCHG_NAME, "");
break;
case DIRECT:
channel.exchangeDeclare(producerTest.XCHG_NAME, "direct",
e213
true, true, null);
queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, producerTest.XCHG_NAME, "info"); //绑定一个routing key,可以绑定多个
channel.queueBind(queueName, producerTest.XCHG_NAME, "warning222");
break;
case TOPIC:
channel.exchangeDeclare(producerTest.XCHG_NAME, "topic", true, true, null);
queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, producerTest.XCHG_NAME, "warning.#"); //监听两种模式 #匹配一个或多个单词 *匹配一个单词
channel.queueBind(queueName, producerTest.XCHG_NAME, "*.blue");
break;
}
// 在同一时间不要给一个worker一个以上的消息。
// 不要将一个新的消息分发给worker知道它处理完了并且返回了前一个消息的通知标志(acknowledged)
// 替代的,消息将会分发给下一个不忙的worker。
channel.basicQos(1); //server push消息时的队列长度
//用来缓存服务器推送过来的消息
QueueingConsumer consumer = new QueueingConsumer(channel);
//为channel声明一个consumer,服务器会推送消息
//参数1:队列名称
//参数2:是否发送ack包,不发送ack消息会持续在服务端保存,直到收到ack。 可以通过channel.basicAck手动回复ack
//参数3:消费者
channel.basicConsume(queueName, false, consumer);
//channel.basicGet() //使用该函数主动去服务器检索是否有新消息,而不是等待服务器推送
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
System.out.println("消费者2Received " + new String(delivery.getBody()));
//回复ack包,如果不回复,消息不会在服务器删除
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
//channel.basicReject(); channel.basicNack(); //可以通过这两个函数拒绝消息,可以指定消息在服务器删除还是继续投递给其他消费者
}
}
}
生产者:
package test2;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;
import javax.xml.parsers.FactoryConfigurationError;
import com.rabbitmq.client.*;
public class producerTest {
//定义枚举,交换类型
public enum XT {
DIRECT, TOPIC, HEADERS, FANOUT
}
private static final String QUEUE_NAME = "log2";
private static boolean GetInputString() {
message = scanner.nextLine();
if (message.length() == 0) return false;
return true;
}
private static Scanner scanner = new Scanner(System.in);
private static String message = "";
public static String XCHG_NAME = "xchg6";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection(); //声明一个连接
Channel channel = connection.createChannel(); //声明消息通道
// XT xt = XT.DIRECT;
XT xt = XT.TOPIC;
//XT xt = XT.FANOUT;
switch (xt) {
case FANOUT:
//广播给所有队列 接收方也必须通过fanout交换机获取消息,所有连接到该交换机的consumer均可获取消息
//如果producer在发布消息时没有consumer在监听,消息将被丢弃
//定义一个交换机
//参数1:交换机名称
//参数2:交换机类型
//参数3:交换机持久性,如果为true则服务器重启时不会丢失
//参数4:交换机在不被使用时是否删除
//参数5:交换机的其他属性
channel.exchangeDeclare(XCHG_NAME, "fanout", true, true, null);
while (GetInputString()) {
//向server发布一条消息
//参数1:exchange名字,若为空则使用默认的exchange
//参数2:routing key
//参数3:其他的属性
//参数4:消息体
//RabbitMQ默认有一个exchange,叫default exchange,它用一个空字符串表示,它是direct exchange类型,
//任何发往这个exchange的消息都会被路由到routing key的名字对应的队列上,如果没有对应的队列,则消息会被丢弃
//发送一条广播消息,参数2此时无意义
channel.basicPublish(XCHG_NAME, "", null, message.getBytes());
System.out.println("Send " + message);
}
break;
case DIRECT:
//定义一个交换机
//参数1:交换机名称
//参数2:交换机类型
//参数3:交换机持久性,如果为true则服务器重启时不会丢失
//参数4:交换机在不被使用时是否删除
//参数5:交换机的其他属性
//向所有绑定了相应routing key的队列发送消息
//如果producer在发布消息时没有consumer在监听,消息将被丢弃
//如果有多个consumer监听了相同的routing key 则他们都会受到消息
channel.exchangeDeclare(XCHG_NAME, "direct", true, true, null);
while (GetInputString()) {
//input like : info message
String[] temp = message.split(",");
channel.basicPublish(XCHG_NAME, temp[0], null, temp[1].getBytes());
System.out.println("Send " + message);
}
break;
case TOPIC:
//与direct模式有类似之处,都使用routing key作为路由
//不同之处在于direct模式只能指定固定的字符串,而topic可以指定一个字符串模式
channel.exchangeDeclare(XCHG_NAME, "topic", true, true, null);
while (GetInputString()) {
//input like : topic message
String[] temp = message.split(",");
channel.basicPublish(XCHG_NAME, temp[0], null, temp[1].getBytes());
System.out.println("Send " + message);
}
break;
}
channel.close();
connection.close();
}
}
1.RabbitMQ简述
RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现。作用是:异步,解耦,缓冲,消息分发
AMQP :Advanced Message Queue,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。它是一种二进制协议。默认启动端口 5672。
主要分为三个部分:生产者,交换机和队列,消费者。
需要注意消息持久化,目的为了防止 RabbitMQ 宕机;考虑 ACK 机制,目的为了如果消费者对消息的处理失败了,那么后续要如何处理。
具体特点:
⑴可靠性(Reliability)
RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认。
⑵灵活的路由(Flexible Routing)
在消息进入队列之前,通过 Exchange 来路由消息的。对于典型的路由功能,RabbitMQ 已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,可以将多个 Exchange 绑定在一起,也通过插件机制实现自己的 Exchange 。
⑶消息集群(Clustering)
多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker 。
⑷高可用(Highly Available Queues)
队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。
⑸多种协议(Multi-protocol)
RabbitMQ 支持多种消息队列协议,比如 STOMP、MQTT 等等。
⑹多语言客户端(Many Clients)
RabbitMQ 几乎支持所有常用语言,比如 Java、.NET、Ruby 等等。
⑺管理界面(Management UI)
RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker 的许多方面。
⑻跟踪机制(Tracing)
如果消息异常,RabbitMQ 提供了消息跟踪机制,使用者可以找出发生了什么。
⑼插件机制(Plugin System)
RabbitMQ 提供了许多插件,来从多方面进行扩展,也可以编写自己的插件。
2.RabbitMQ概念模型
左侧 P 代表 生产者,也就是往 RabbitMQ 发消息的程序。
中间即是 RabbitMQ,其中包括了 交换机 和 队列。
右侧 C 代表 消费者,也就是往 RabbitMQ 拿消息的程序。
其中比较重要的概念有 4 个,分别为:虚拟主机,交换机,队列,和绑定。
虚拟主机:一个虚拟主机持有一组交换机、队列和绑定。为什么需要多个虚拟主机呢?很简单,RabbitMQ当中,用户只能在虚拟主机的粒度进行权限控制。 因此,如果需要禁止A组访问B组的交换机/队列/绑定,必须为A和B分别创建一个虚拟主机。每一个RabbitMQ服务器都有一个默认的虚拟主机“/”。
交换机:Exchange 用于转发消息,但是它不会做存储 ,如果没有 Queue bind 到 Exchange 的话,它会直接丢弃掉 Producer 发送过来的消息。
这里有一个比较重要的概念:路由键 。消息到交换机的时候,交互机会转发到对应的队列中,那么究竟转发到哪个队列,就要根据该路由键。
绑定:也就是交换机需要和队列相绑定,这其中如上图所示,是多对多的关系。
3.exchange交换机机制
producer并不知道消息具体分发到哪个队列中去,当exchange收到message时,必须准确知道该如何分发。是append到一定规则的queue,还是append到多个queue中,还是被丢弃?这些规则都是通过exchange的4种type去定义的。
在绑定(Binding)Exchange与Queue的同时,一般会指定一个binding key;消费者将消息发送给Exchange时,一般会指定一个routing key;当binding key与routing key相匹配时,消息将会被路由到对应的Queue中,在绑定多个Queue到同一个Exchange的时候,这些Binding允许使用相同的binding
key。
binding key 并不是在所有情况下都生效,它依赖于Exchange Type,比如fanout类型的Exchange就会无视binding key,而是将消息路由到所有绑定到该Exchange的Queue。
exchange和queue通过routing-key关联,这两者之间的关系是就是binding。如下图所示,X表示交换机,红色表示队列,交换机通过一个routing-key去binding一个queue,routing-key有什么作用呢?看Direct exchange类型交换机。
'
⑴Directed Exchange
路由键exchange,该交换机收到消息后会把消息发送到指定routing-key的queue中。那消息交换机是怎么知道的呢?其实,producer deliver消息的时候会把routing-key add到 message header中。routing-key只是一个messgae的attribute。
Default Exchange
这种是特殊的Direct Exchange,是rabbitmq内部默认的一个交换机。该交换机的name是空字符串,所有queue都默认binding 到该交换机上。所有binding到该交换机上的queue,routing-key都和queue的name一样。
⑵Topic Exchang
通配符交换机,exchange会把消息发送到一个或者多个满足通配符规则的routing-key的queue。其中*表号匹配一个word,#匹配多个word和路径,路径之间通过.隔开。如满足a.*.c的routing-key有a.hello.c;满足#.hello的routing-key有a.b.c.helo。
⑶Fanout Exchange
扇形交换机,该交换机会把消息发送到所有binding到该交换机上的queue。这种是publisher/subcribe模式。用来做广播最好。
所有该exchagne上指定的routing-key都会被ignore掉。
⑷Header Exchange
设置header attribute参数类型的交换机。
4.ACK机制
采用no-ack的方式进行确认,也就是说,每次Consumer接到数据后,而不管是否处理完 成,RabbitMQ Server会立即把这个Message标记为完成,然后从queue中删除了。这样仅能保证每次Consumer接到数据。
为了保证数据能被正确处理,我们不能采用no-ack。而应该是在处理完数据后发送ack。
5.代码分析
消费者1:
package test2;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownSignalException;
@SuppressWarnings("deprecation")
public class Consumer {
private static final String QUEUE_NAME = "log2";
public static void main(String[] args) throws IOException, TimeoutException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String queueName = QUEUE_NAME;
//producerTest.XT xt = producerTest.XT.FANOUT;
// producerTest.XT xt = producerTest.XT.DIRECT;
producerTest.XT xt = producerTest.XT.TOPIC;
switch (xt) {
/* case DEFAULT:
//队列的相关参数需要与第一次定义该队列时相同,否则会出错,使用channel.queueDeclarePassive()可只被动绑定已有队列,而不创建
channel.queueDeclare(queueName, true, false, true, null);
break;*/
case FANOUT:
//接收端也声明一个fanout交换机
channel.exchangeDeclare(producerTest.XCHG_NAME, "fanout", true, true, null);
//channel.exchangeDeclarePassive() 可以使用该函数使用一个已经建立的exchange
//声明一个临时队列,该队列会在使用完比后自动销毁
queueName = channel.queueDeclare().getQueue();
//将队列绑定到交换机,参数3无意义此时
channel.queueBind(queueName, producerTest.XCHG_NAME, "");
break;
case DIRECT:
channel.exchangeDeclare(producerTest.XCHG_NAME, "direct", true, true, null);
queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, producerTest.XCHG_NAME, "info"); //绑定一个routing key,可以绑定多个
channel.queueBind(queueName, producerTest.XCHG_NAME, "warning");
break;
case TOPIC:
channel.exchangeDeclare(producerTest.XCHG_NAME, "topic", true, true, null);
queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, producerTest.XCHG_NAME, "warning.#"); //监听两种模式 #匹配一个或多个单词 *匹配一个单词
channel.queueBind(queueName, producerTest.XCHG_NAME, "*.blue");
break;
}
// 在同一时间不要给一个worker一个以上的消息。
// 不要将一个新的消息分发给worker知道它处理完了并且返回了前一个消息的通知标志(acknowledged)
// 替代的,消息将会分发给下一个不忙的worker。
channel.basicQos(1); //server push消息时的队列长度
//用来缓存服务器推送过来的消息
QueueingConsumer consumer = new QueueingConsumer(channel);
//为channel声明一个consumer,服务器会推送消息
//参数1:队列名称
//参数2:是否发送ack包,不发送ack消息会持续在服务端保存,直到收到ack。 可以通过channel.basicAck手动回复ack
//参数3:消费者
channel.basicConsume(queueName, false, consumer);
//channel.basicGet() //使用该函数主动去服务器检索是否有新消息,而不是等待服务器推送
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
System.out.println("消费者1Received " + new String(delivery.getBody()));
//回复ack包,如果不回复,消息不会在服务器删除
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
//channel.basicReject(); channel.basicNack(); //可以通过这两个函数拒绝消息,可以指定消息在服务器删除还是继续投递给其他消费者
}
}
}
消费者2:
package test2;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownSignalException;
public class Consumer2 {
private static final String QUEUE_NAME = "log2";
public static void main(String[] args) throws IOException, TimeoutException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String queueName = QUEUE_NAME;
//producerTest.XT xt = producerTest.XT.FANOUT;
// producerTest.XT xt = producerTest.XT.DIRECT;
producerTest.XT xt = producerTest.XT.TOPIC;
switch (xt) {
/* case DEFAULT:
//队列的相关参数需要与第一次定义该队列时相同,否则会出错,使用channel.queueDeclarePassive()可只被动绑定已有队列,而不创建
channel.queueDeclare(queueName, true, false, true, null);
break;*/
case FANOUT:
//接收端也声明一个fanout交换机
channel.exchangeDeclare(producerTest.XCHG_NAME, "fanout", true, true, null);
//channel.exchangeDeclarePassive() 可以使用该函数使用一个已经建立的exchange
//声明一个临时队列,该队列会在使用完比后自动销毁
queueName = channel.queueDeclare().getQueue();
//将队列绑定到交换机,参数3无意义此时
channel.queueBind(queueName, producerTest.XCHG_NAME, "");
break;
case DIRECT:
channel.exchangeDeclare(producerTest.XCHG_NAME, "direct",
e213
true, true, null);
queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, producerTest.XCHG_NAME, "info"); //绑定一个routing key,可以绑定多个
channel.queueBind(queueName, producerTest.XCHG_NAME, "warning222");
break;
case TOPIC:
channel.exchangeDeclare(producerTest.XCHG_NAME, "topic", true, true, null);
queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, producerTest.XCHG_NAME, "warning.#"); //监听两种模式 #匹配一个或多个单词 *匹配一个单词
channel.queueBind(queueName, producerTest.XCHG_NAME, "*.blue");
break;
}
// 在同一时间不要给一个worker一个以上的消息。
// 不要将一个新的消息分发给worker知道它处理完了并且返回了前一个消息的通知标志(acknowledged)
// 替代的,消息将会分发给下一个不忙的worker。
channel.basicQos(1); //server push消息时的队列长度
//用来缓存服务器推送过来的消息
QueueingConsumer consumer = new QueueingConsumer(channel);
//为channel声明一个consumer,服务器会推送消息
//参数1:队列名称
//参数2:是否发送ack包,不发送ack消息会持续在服务端保存,直到收到ack。 可以通过channel.basicAck手动回复ack
//参数3:消费者
channel.basicConsume(queueName, false, consumer);
//channel.basicGet() //使用该函数主动去服务器检索是否有新消息,而不是等待服务器推送
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
System.out.println("消费者2Received " + new String(delivery.getBody()));
//回复ack包,如果不回复,消息不会在服务器删除
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
//channel.basicReject(); channel.basicNack(); //可以通过这两个函数拒绝消息,可以指定消息在服务器删除还是继续投递给其他消费者
}
}
}
生产者:
package test2;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;
import javax.xml.parsers.FactoryConfigurationError;
import com.rabbitmq.client.*;
public class producerTest {
//定义枚举,交换类型
public enum XT {
DIRECT, TOPIC, HEADERS, FANOUT
}
private static final String QUEUE_NAME = "log2";
private static boolean GetInputString() {
message = scanner.nextLine();
if (message.length() == 0) return false;
return true;
}
private static Scanner scanner = new Scanner(System.in);
private static String message = "";
public static String XCHG_NAME = "xchg6";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection(); //声明一个连接
Channel channel = connection.createChannel(); //声明消息通道
// XT xt = XT.DIRECT;
XT xt = XT.TOPIC;
//XT xt = XT.FANOUT;
switch (xt) {
case FANOUT:
//广播给所有队列 接收方也必须通过fanout交换机获取消息,所有连接到该交换机的consumer均可获取消息
//如果producer在发布消息时没有consumer在监听,消息将被丢弃
//定义一个交换机
//参数1:交换机名称
//参数2:交换机类型
//参数3:交换机持久性,如果为true则服务器重启时不会丢失
//参数4:交换机在不被使用时是否删除
//参数5:交换机的其他属性
channel.exchangeDeclare(XCHG_NAME, "fanout", true, true, null);
while (GetInputString()) {
//向server发布一条消息
//参数1:exchange名字,若为空则使用默认的exchange
//参数2:routing key
//参数3:其他的属性
//参数4:消息体
//RabbitMQ默认有一个exchange,叫default exchange,它用一个空字符串表示,它是direct exchange类型,
//任何发往这个exchange的消息都会被路由到routing key的名字对应的队列上,如果没有对应的队列,则消息会被丢弃
//发送一条广播消息,参数2此时无意义
channel.basicPublish(XCHG_NAME, "", null, message.getBytes());
System.out.println("Send " + message);
}
break;
case DIRECT:
//定义一个交换机
//参数1:交换机名称
//参数2:交换机类型
//参数3:交换机持久性,如果为true则服务器重启时不会丢失
//参数4:交换机在不被使用时是否删除
//参数5:交换机的其他属性
//向所有绑定了相应routing key的队列发送消息
//如果producer在发布消息时没有consumer在监听,消息将被丢弃
//如果有多个consumer监听了相同的routing key 则他们都会受到消息
channel.exchangeDeclare(XCHG_NAME, "direct", true, true, null);
while (GetInputString()) {
//input like : info message
String[] temp = message.split(",");
channel.basicPublish(XCHG_NAME, temp[0], null, temp[1].getBytes());
System.out.println("Send " + message);
}
break;
case TOPIC:
//与direct模式有类似之处,都使用routing key作为路由
//不同之处在于direct模式只能指定固定的字符串,而topic可以指定一个字符串模式
channel.exchangeDeclare(XCHG_NAME, "topic", true, true, null);
while (GetInputString()) {
//input like : topic message
String[] temp = message.split(",");
channel.basicPublish(XCHG_NAME, temp[0], null, temp[1].getBytes());
System.out.println("Send " + message);
}
break;
}
channel.close();
connection.close();
}
}
1.扇形分发: 生产者: 23213 Send 23213 消费者1: 消费者1Received 23213 消费者2: 消费者2Received 23213 2.direct 分发(记得改下交换机名字) 生产者: info,heidou Send info,heidou 消费者1: 消费者1Received heidou 消费者2: 消费者2Received heidou 生产者: warning,123 Send warning,123 消费者1: 消费者1Received 123 消费者2: 没有匹配上所以无输出 3.TOPIC分发: 生产者: warning.heidou,123 Send warning.heidou,123 消费者1: 消费者1Received 123 消费者2: 消费者2Received 123
相关文章推荐
- rabbitmq开启web管理后台
- 中间件系列六 RabbitMQ之Topic exchange 用法
- 跟我学系列之RabbitMQ深入研究
- RabbitMQ 安装与简单使用
- RabbitMQ(六):使用主题进行消息分发
- linux 安装 RabbitMQ 3.6.13, Erlang 19.0
- rabbitmq安装与配置
- RabbitMQ3.7.2入门到进阶之订阅模式Publish/Subscribe
- RabbitMQ基础概念详细介绍
- rabbitmq的三种exchange使用fanout+direct+topic
- RabbitMQ的安装,配置,监控
- RabbitMQ
- RabbitMQ总结
- RabbitMQ 安装教程
- Python操作RabbitMQ
- RabbitMQ 安装教程
- 消息队列之RabbitMQ
- RabbitMQ系列教程之二:工作队列(Work Queues)(转载)
- RabbitMQ,ActiveMQ,ZeroMQ,Kafka几种MQ的比较
- RabbitMQ的傻瓜式讲解(杨永杰版本)