您的位置:首页 > 其它

RabbitMQ

2017-11-26 12:09 281 查看
参考:https://www.cnblogs.com/luxiaoxun/p/3918054.html

1.RabbitMQ简述

RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现。作用是:异步,解耦,缓冲,消息分发

AMQP :Advanced Message Queue,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。它是一种二进制协议。默认启动端口 5672。

主要分为三个部分:生产者,交换机和队列,消费者。

需要注意消息持久化,目的为了防止 RabbitMQ 宕机;考虑 ACK 机制,目的为了如果消费者对消息的处理失败了,那么后续要如何处理。

具体特点:

⑴可靠性(Reliability)

RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认。

⑵灵活的路由(Flexible Routing)

在消息进入队列之前,通过 Exchange 来路由消息的。对于典型的路由功能,RabbitMQ 已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,可以将多个 Exchange 绑定在一起,也通过插件机制实现自己的 Exchange 。

⑶消息集群(Clustering)

多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker 。

⑷高可用(Highly Available Queues)

队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。

⑸多种协议(Multi-protocol)

RabbitMQ 支持多种消息队列协议,比如 STOMP、MQTT 等等。

⑹多语言客户端(Many Clients)

RabbitMQ 几乎支持所有常用语言,比如 Java、.NET、Ruby 等等。

⑺管理界面(Management UI)

RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker 的许多方面。

⑻跟踪机制(Tracing)

如果消息异常,RabbitMQ 提供了消息跟踪机制,使用者可以找出发生了什么。

⑼插件机制(Plugin System)

RabbitMQ 提供了许多插件,来从多方面进行扩展,也可以编写自己的插件。

2.RabbitMQ概念模型



左侧 P 代表 生产者,也就是往 RabbitMQ 发消息的程序。

中间即是 RabbitMQ,其中包括了 交换机 和 队列。

右侧 C 代表 消费者,也就是往 RabbitMQ 拿消息的程序。

其中比较重要的概念有 4 个,分别为:虚拟主机,交换机,队列,和绑定。

虚拟主机:一个虚拟主机持有一组交换机、队列和绑定。为什么需要多个虚拟主机呢?很简单,RabbitMQ当中,用户只能在虚拟主机的粒度进行权限控制。 因此,如果需要禁止A组访问B组的交换机/队列/绑定,必须为A和B分别创建一个虚拟主机。每一个RabbitMQ服务器都有一个默认的虚拟主机“/”。

交换机:Exchange 用于转发消息,但是它不会做存储 ,如果没有 Queue bind 到 Exchange 的话,它会直接丢弃掉 Producer 发送过来的消息。

这里有一个比较重要的概念:路由键 。消息到交换机的时候,交互机会转发到对应的队列中,那么究竟转发到哪个队列,就要根据该路由键。

绑定:也就是交换机需要和队列相绑定,这其中如上图所示,是多对多的关系。
3.exchange交换机机制

producer并不知道消息具体分发到哪个队列中去,当exchange收到message时,必须准确知道该如何分发。是append到一定规则的queue,还是append到多个queue中,还是被丢弃?这些规则都是通过exchange的4种type去定义的。

在绑定(Binding)Exchange与Queue的同时,一般会指定一个binding key;消费者将消息发送给Exchange时,一般会指定一个routing key;当binding key与routing key相匹配时,消息将会被路由到对应的Queue中,在绑定多个Queue到同一个Exchange的时候,这些Binding允许使用相同的binding
key。
binding key 并不是在所有情况下都生效,它依赖于Exchange Type,比如fanout类型的Exchange就会无视binding key,而是将消息路由到所有绑定到该Exchange的Queue。

exchange和queue通过routing-key关联,这两者之间的关系是就是binding。如下图所示,X表示交换机,红色表示队列,交换机通过一个routing-key去binding一个queue,routing-key有什么作用呢?看Direct exchange类型交换机。



'
⑴Directed Exchange

路由键exchange,该交换机收到消息后会把消息发送到指定routing-key的queue中。那消息交换机是怎么知道的呢?其实,producer deliver消息的时候会把routing-key add到 message header中。routing-key只是一个messgae的attribute。

Default Exchange

这种是特殊的Direct Exchange,是rabbitmq内部默认的一个交换机。该交换机的name是空字符串,所有queue都默认binding 到该交换机上。所有binding到该交换机上的queue,routing-key都和queue的name一样。

⑵Topic Exchang

通配符交换机,exchange会把消息发送到一个或者多个满足通配符规则的routing-key的queue。其中*表号匹配一个word,#匹配多个word和路径,路径之间通过.隔开。如满足a.*.c的routing-key有a.hello.c;满足#.hello的routing-key有a.b.c.helo。

⑶Fanout Exchange

扇形交换机,该交换机会把消息发送到所有binding到该交换机上的queue。这种是publisher/subcribe模式。用来做广播最好。

所有该exchagne上指定的routing-key都会被ignore掉。

⑷Header Exchange

设置header attribute参数类型的交换机。

4.ACK机制

采用no-ack的方式进行确认,也就是说,每次Consumer接到数据后,而不管是否处理完 成,RabbitMQ Server会立即把这个Message标记为完成,然后从queue中删除了。这样仅能保证每次Consumer接到数据。

为了保证数据能被正确处理,我们不能采用no-ack。而应该是在处理完数据后发送ack。

5.代码分析

消费者1:

package test2;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownSignalException;

@SuppressWarnings("deprecation")
public class Consumer {
private static final String QUEUE_NAME = "log2";
public static void main(String[] args) throws IOException, TimeoutException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String queueName = QUEUE_NAME;

//producerTest.XT xt = producerTest.XT.FANOUT;
// producerTest.XT xt = producerTest.XT.DIRECT;
producerTest.XT xt = producerTest.XT.TOPIC;

switch (xt) {
/* case DEFAULT:
//队列的相关参数需要与第一次定义该队列时相同,否则会出错,使用channel.queueDeclarePassive()可只被动绑定已有队列,而不创建
channel.queueDeclare(queueName, true, false, true, null);
break;*/
case FANOUT:
//接收端也声明一个fanout交换机
channel.exchangeDeclare(producerTest.XCHG_NAME, "fanout", true, true, null);
//channel.exchangeDeclarePassive() 可以使用该函数使用一个已经建立的exchange
//声明一个临时队列,该队列会在使用完比后自动销毁
queueName = channel.queueDeclare().getQueue();
//将队列绑定到交换机,参数3无意义此时
channel.queueBind(queueName, producerTest.XCHG_NAME, "");
break;
case DIRECT:
channel.exchangeDeclare(producerTest.XCHG_NAME, "direct", true, true, null);
queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, producerTest.XCHG_NAME, "info"); //绑定一个routing key,可以绑定多个
channel.queueBind(queueName, producerTest.XCHG_NAME, "warning");
break;
case TOPIC:
channel.exchangeDeclare(producerTest.XCHG_NAME, "topic", true, true, null);
queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, producerTest.XCHG_NAME, "warning.#"); //监听两种模式 #匹配一个或多个单词 *匹配一个单词
channel.queueBind(queueName, producerTest.XCHG_NAME, "*.blue");
break;
}
// 在同一时间不要给一个worker一个以上的消息。
// 不要将一个新的消息分发给worker知道它处理完了并且返回了前一个消息的通知标志(acknowledged)
// 替代的,消息将会分发给下一个不忙的worker。
channel.basicQos(1); //server push消息时的队列长度

//用来缓存服务器推送过来的消息
QueueingConsumer consumer = new QueueingConsumer(channel);

//为channel声明一个consumer,服务器会推送消息
//参数1:队列名称
//参数2:是否发送ack包,不发送ack消息会持续在服务端保存,直到收到ack。 可以通过channel.basicAck手动回复ack
//参数3:消费者
channel.basicConsume(queueName, false, consumer);
//channel.basicGet() //使用该函数主动去服务器检索是否有新消息,而不是等待服务器推送

while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
System.out.println("消费者1Received " + new String(delivery.getBody()));

//回复ack包,如果不回复,消息不会在服务器删除
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
//channel.basicReject(); channel.basicNack(); //可以通过这两个函数拒绝消息,可以指定消息在服务器删除还是继续投递给其他消费者

}
}
}
消费者2:
package test2;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownSignalException;

public class Consumer2 {
private static final String QUEUE_NAME = "log2";
public static void main(String[] args) throws IOException, TimeoutException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String queueName = QUEUE_NAME;

//producerTest.XT xt = producerTest.XT.FANOUT;
// producerTest.XT xt = producerTest.XT.DIRECT;
producerTest.XT xt = producerTest.XT.TOPIC;

switch (xt) {
/* case DEFAULT:
//队列的相关参数需要与第一次定义该队列时相同,否则会出错,使用channel.queueDeclarePassive()可只被动绑定已有队列,而不创建
channel.queueDeclare(queueName, true, false, true, null);
break;*/
case FANOUT:
//接收端也声明一个fanout交换机
channel.exchangeDeclare(producerTest.XCHG_NAME, "fanout", true, true, null);
//channel.exchangeDeclarePassive() 可以使用该函数使用一个已经建立的exchange
//声明一个临时队列,该队列会在使用完比后自动销毁
queueName = channel.queueDeclare().getQueue();
//将队列绑定到交换机,参数3无意义此时
channel.queueBind(queueName, producerTest.XCHG_NAME, "");
break;
case DIRECT:
channel.exchangeDeclare(producerTest.XCHG_NAME, "direct",
e213
true, true, null);
queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, producerTest.XCHG_NAME, "info"); //绑定一个routing key,可以绑定多个
channel.queueBind(queueName, producerTest.XCHG_NAME, "warning222");
break;
case TOPIC:
channel.exchangeDeclare(producerTest.XCHG_NAME, "topic", true, true, null);
queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, producerTest.XCHG_NAME, "warning.#"); //监听两种模式 #匹配一个或多个单词 *匹配一个单词
channel.queueBind(queueName, producerTest.XCHG_NAME, "*.blue");
break;
}
// 在同一时间不要给一个worker一个以上的消息。
// 不要将一个新的消息分发给worker知道它处理完了并且返回了前一个消息的通知标志(acknowledged)
// 替代的,消息将会分发给下一个不忙的worker。
channel.basicQos(1); //server push消息时的队列长度

//用来缓存服务器推送过来的消息
QueueingConsumer consumer = new QueueingConsumer(channel);

//为channel声明一个consumer,服务器会推送消息
//参数1:队列名称
//参数2:是否发送ack包,不发送ack消息会持续在服务端保存,直到收到ack。 可以通过channel.basicAck手动回复ack
//参数3:消费者
channel.basicConsume(queueName, false, consumer);
//channel.basicGet() //使用该函数主动去服务器检索是否有新消息,而不是等待服务器推送

while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
System.out.println("消费者2Received " + new String(delivery.getBody()));

//回复ack包,如果不回复,消息不会在服务器删除
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
//channel.basicReject(); channel.basicNack(); //可以通过这两个函数拒绝消息,可以指定消息在服务器删除还是继续投递给其他消费者

}
}
}
生产者:
package test2;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;

import javax.xml.parsers.FactoryConfigurationError;

import com.rabbitmq.client.*;

public class producerTest {
//定义枚举,交换类型
public enum XT {
DIRECT, TOPIC, HEADERS, FANOUT
}
private static final String QUEUE_NAME = "log2";
private static boolean GetInputString() {
message = scanner.nextLine();
if (message.length() == 0) return false;
return true;
}

private static Scanner scanner = new Scanner(System.in);
private static String message = "";
public static String XCHG_NAME = "xchg6";

public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");

Connection connection = factory.newConnection(); //声明一个连接
Channel channel = connection.createChannel(); //声明消息通道

// XT xt = XT.DIRECT;
XT xt = XT.TOPIC;
//XT xt = XT.FANOUT;

switch (xt) {
case FANOUT:
//广播给所有队列 接收方也必须通过fanout交换机获取消息,所有连接到该交换机的consumer均可获取消息
//如果producer在发布消息时没有consumer在监听,消息将被丢弃

//定义一个交换机
//参数1:交换机名称
//参数2:交换机类型
//参数3:交换机持久性,如果为true则服务器重启时不会丢失
//参数4:交换机在不被使用时是否删除
//参数5:交换机的其他属性
channel.exchangeDeclare(XCHG_NAME, "fanout", true, true, null);

while (GetInputString()) {
//向server发布一条消息
//参数1:exchange名字,若为空则使用默认的exchange
//参数2:routing key
//参数3:其他的属性
//参数4:消息体
//RabbitMQ默认有一个exchange,叫default exchange,它用一个空字符串表示,它是direct exchange类型,
//任何发往这个exchange的消息都会被路由到routing key的名字对应的队列上,如果没有对应的队列,则消息会被丢弃
//发送一条广播消息,参数2此时无意义
channel.basicPublish(XCHG_NAME, "", null, message.getBytes());
System.out.println("Send " + message);
}
break;
case DIRECT:
//定义一个交换机
//参数1:交换机名称
//参数2:交换机类型
//参数3:交换机持久性,如果为true则服务器重启时不会丢失
//参数4:交换机在不被使用时是否删除
//参数5:交换机的其他属性
//向所有绑定了相应routing key的队列发送消息
//如果producer在发布消息时没有consumer在监听,消息将被丢弃
//如果有多个consumer监听了相同的routing key 则他们都会受到消息
channel.exchangeDeclare(XCHG_NAME, "direct", true, true, null);
while (GetInputString()) {
//input like : info message
String[] temp = message.split(",");
channel.basicPublish(XCHG_NAME, temp[0], null, temp[1].getBytes());
System.out.println("Send " + message);
}
break;
case TOPIC:
//与direct模式有类似之处,都使用routing key作为路由
//不同之处在于direct模式只能指定固定的字符串,而topic可以指定一个字符串模式
channel.exchangeDeclare(XCHG_NAME, "topic", true, true, null);
while (GetInputString()) {
//input like : topic message
String[] temp = message.split(",");
channel.basicPublish(XCHG_NAME, temp[0], null, temp[1].getBytes());
System.out.println("Send " + message);
}
break;
}
channel.close();
connection.close();
}

}
1.扇形分发:
生产者:
23213
Send 23213
消费者1:
消费者1Received 23213
消费者2:
消费者2Received 23213

2.direct 分发(记得改下交换机名字)
生产者:
info,heidou
Send info,heidou
消费者1:
消费者1Received heidou
消费者2:
消费者2Received heidou

生产者:
warning,123
Send warning,123
消费者1:
消费者1Received 123
消费者2:
没有匹配上所以无输出

3.TOPIC分发:
生产者:
warning.heidou,123
Send warning.heidou,123
消费者1:
消费者1Received 123
消费者2:
消费者2Received 123
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: