您的位置:首页 > 其它

rabbitmq

2016-03-27 08:37 302 查看

1.  RabbitMQ简介

l MQ全称为MessageQueue,消息队列是应用程序和应用程序之间的通信方法。

l RabbitMQ是一个开源的,在AMQP基础上完整的,可复用的企业消息系统。

l 支持主流的操作系统,Linux、Windows、MacOX等。

l 多种开发语言支持,Java、Python、Ruby、.NET、PHP、C/C++、node.js等

 

1.1. AMQP

1.2. Erlang

RabbitMQ是用Erlang语言开发的。

1.3. 其他MQ产品

 

2.  官网下载

 

下载:

 

教程:

3.  安装

具体参考《RabbitMQ-3.4.1安装手册.docx》。

 

3.1. 安装步骤

1.      安装环境Erlang

2.      安装RabbitMQ

 

 

3.2. 安装Erlang注意事项

1.      计算机名不能为中文

 

2.      用户名不能是中文

3.      安装路径不能有中文,推荐使用默认安装路径

4.      需要使用管理员账号进行安装

 

注意:

如果安装不成功,不要浪费太多时间在安装。

1.      找同学,开一个账号

2.      我在的时候,连接老师的RabbitMQ服务

3.      在最后一天,我们会使用Linux安装

 

3.3. 启用管理工具:

命令行工具目录:

在命令行输入:rabbitmq-plugins enable rabbitmq_management

 

安装成功:http://127.0.0.1:15672/

默认账号密码

guest

 

3.4. 安装成功

 

3.5. 功能介绍

 

 

程序和RabbitMQ交互使用的是:5672端口。

4.  创taotao用户

4.1. 创建用户

 

4.2. 创建 Virtual Hosts

 

4.3. 设置用户vhosts

 

4.4. 效果

 

5.  消息队列

 

学习是前5种。

 

5.1. 简单的消息队列

 

P:消息的生产者;

C:消息的消费者;

红色框:消息队列;

5.1.1.     创建连接

5.1.2.     生产者向队列中发送消息

5.1.3.     消费者

5.2. work模式

 

同一个消息只能被一个客户端所获取。

5.2.1.     生产者

发送100条消息到队列,并且每次发送消息后sleep i*10 毫秒;

 

5.2.2.     消费者1

从队列中获取消息,每次获取到消息后,休眠10毫秒,这个消费者消费消息比较快。

 

    publicstaticvoid main(String[]
argv)
throws Exception {
 
       
// 获取到连接以及mq通道
       
Connection connection = ConnectionUtil.getConnection();
       
Channel channel =
connection.createChannel();
 
       
// 声明队列
       
channel.queueDeclare(QUEUE_NAME,
false,
false,
false,
null);
 
       
// 同一时刻服务器只会发一条消息给消费者
       
//channel.basicQos(1);
 
       
// 定义队列的消费者
       
QueueingConsumer consumer =
new QueueingConsumer(channel);
       
// 监听队列,手动返回完成
       
channel.basicConsume(QUEUE_NAME,
false,
consumer);
 
       
// 获取消息
       
while (true) {
           
QueueingConsumer.Delivery delivery =
consumer.nextDelivery();
           
String message =
new String(delivery.getBody());
           
System.out.println(" [x] Received '" +
message +
"'");
           
//休眠
           
Thread.sleep(10);
           
// 返回确认状态
           
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),
false);
       
}
    }
 

5.2.3.     消费者2

从队列中获取消息,每次获取到消息后,休眠1000毫秒,这个消费者消费消息比较慢。

 

    publicstaticvoid main(String[]
argv)
throws Exception {
 
       
// 获取到连接以及mq通道
       
Connection connection = ConnectionUtil.getConnection();
       
Channel channel =
connection.createChannel();
 
       
// 声明队列
       
channel.queueDeclare(QUEUE_NAME,
false,
false,
false,
null);
 
       
// 同一时刻服务器只会发一条消息给消费者
//       
channel.basicQos(1);
 
       
// 定义队列的消费者
       
QueueingConsumer consumer =
new QueueingConsumer(channel);
        //
监听队列,手动返回完成状态
       
channel.basicConsume(QUEUE_NAME,
false,
consumer);
 
       
// 获取消息
       
while (true) {
           
QueueingConsumer.Delivery delivery =
consumer.nextDelivery();
           
String message =
new String(delivery.getBody());
           
System.out.println(" [x] Received '" +
message +
"'");
           
// 休眠1秒
           
Thread.sleep(1000);
 
           
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),
false);
       
}
    }

5.2.4.     测试结果

结果:消费者1和消费者2获取到的消息是一样多,各50个。  这样是否合理?  --  不合理的。

 

5.3. 自动和手动反馈消息消费状态

自动:

从服务端获取到消息后,就认为该消息已经成功消费,无论客户端是否出异常。

 

手动:

从服务端获取消息后,服务端要标记为该不可用状态,等待客户端的反馈,如果客户端一直没有反馈,该消息一直被标记为不可用,同时认为该消费者在消费消息中

如果接收到客户端的反馈,服务端就将该消息删除。

 

 

如何选择手动和自动?  --  根据不同的业务需求做出不同选择。

 

5.4. work模式中的能者多劳

 

 

实际这种情况更加的合理。

 

测试结果:

消费者1:67条消息

消费者2:33条消息

 

5.5. 订阅模式

 

 

订阅:订阅微信公众号,微信公众号群发消息,所有的订阅者都能够收到消息

 

生产者可以绑定到交换机和队列

 

消费者只能绑定到队列

 

5.5.1.     队列绑定到交换机

 

5.5.2.     生产者

5.5.3.     消费者1

publicclass Recv {
 
    privatefinalstatic String
QUEUE_NAME =
"test_queue_ps_1";
 
    privatefinalstatic String
EXCHANGE_NAME =
"test_exchange_fanout";
 
    publicstaticvoid
main(String[] argv)
throws Exception {
 
       
// 获取到连接以及mq通道
       
Connection connection = ConnectionUtil.getConnection();
       
Channel channel =
connection.createChannel();

15b5a
 
       
// 声明队列
       
channel.queueDeclare(QUEUE_NAME,
false,
false,
false,
null);
 
       
// 绑定队列到交换机
       
channel.queueBind(QUEUE_NAME,
EXCHANGE_NAME,
"");
 
       
// 同一时刻服务器只会发一条消息给消费者
       
channel.basicQos(1);
 
       
// 定义队列的消费者
       
QueueingConsumer consumer =
new QueueingConsumer(channel);
       
// 监听队列,手动返回完成
       
channel.basicConsume(QUEUE_NAME,
false,
consumer);
 
       
// 获取消息
       
while (true) {
           
QueueingConsumer.Delivery delivery =
consumer.nextDelivery();
           
String message =
new String(delivery.getBody());
           
System.out.println(" [x] Received '" +
message +
"'");
           
Thread.sleep(10);
 
           
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),
false);
       
}
    }
}

5.5.4.     消费者2

publicclass Recv2 {
 
    privatefinalstatic String
QUEUE_NAME =
"test_queue_ps_2";
 
    privatefinalstatic String
EXCHANGE_NAME =
"test_exchange_fanout";
 
    publicstaticvoid main(String[]
argv)
throws Exception {
 
       
// 获取到连接以及mq通道
       
Connection connection = ConnectionUtil.getConnection();
       
Channel channel =
connection.createChannel();
 
       
// 声明队列
       
channel.queueDeclare(QUEUE_NAME,
false,
false,
false,
null);
 
       
// 绑定队列到交换机
       
channel.queueBind(QUEUE_NAME,
EXCHANGE_NAME,
"");
 
       
// 同一时刻服务器只会发一条消息给消费者
       
channel.basicQos(1);
 
       
// 定义队列的消费者
       
QueueingConsumer consumer =
new QueueingConsumer(channel);
       
// 监听队列,手动返回完成
       
channel.basicConsume(QUEUE_NAME,
false,
consumer);
 
       
// 获取消息
       
while (true) {
           
QueueingConsumer.Delivery delivery =
consumer.nextDelivery();
           
String message =
new String(delivery.getBody());
           
System.out.println(" [x] Received '" +
message +
"'");
           
Thread.sleep(10);
 
           
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),
false);
       
}
    }
}
 

5.5.5.     测试

结果:消费者1和消费者2同时都能获取到消息。

 

注意:订阅模式和work模式的区别。

1、  work模式将消息发送到队列

2、  订阅模式将消息发送到交换机

3、  work模式是1个队列2个消费者,订阅模式是2个队列2个消费者

 

5.6. 路由模式

 

路由模式:

可以在队列绑定到交换机时指定一个规则,根据不同的消息中规则,选择是否接受该消息。

 

5.6.1.     生产者

5.6.2.     消费者1

publicclass Recv {
 
    privatefinalstatic String
QUEUE_NAME =
"test_queue_rirect_1";
 
    privatefinalstatic String
EXCHANGE_NAME =
"test_exchange_direct";
 
    publicstaticvoid main(String[]
argv)
throws Exception {
 
       
// 获取到连接以及mq通道
       
Connection connection = ConnectionUtil.getConnection();
       
Channel channel =
connection.createChannel();
 
       
// 声明队列
       
channel.queueDeclare(QUEUE_NAME,
false,
false,
false,
null);
 
       
// 绑定队列到交换机
       
channel.queueBind(QUEUE_NAME,
EXCHANGE_NAME,
"key");
 
       
// 同一时刻服务器只会发一条消息给消费者
       
channel.basicQos(1);
 
       
// 定义队列的消费者
       
QueueingConsumer consumer =
new QueueingConsumer(channel);
       
// 监听队列,手动返回完成
       
channel.basicConsume(QUEUE_NAME,
false,
consumer);
 
       
// 获取消息
       
while (true) {
           
QueueingConsumer.Delivery delivery =
consumer.nextDelivery();
           
String message =
new String(delivery.getBody());
           
System.out.println(" [x] Received '" +
message +
"'");
           
Thread.sleep(10);
 
           
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),
false);
       
}
    }
}

5.6.3.     消费者2

publicclass Recv2 {
 
    privatefinalstatic String
QUEUE_NAME =
"test_queue_rirect_2";
 
    privatefinalstatic String
EXCHANGE_NAME =
"test_exchange_direct";
 
    publicstaticvoid main(String[]
argv)
throws Exception {
 
       
// 获取到连接以及mq通道
       
Connection connection = ConnectionUtil.getConnection();
       
Channel channel =
connection.createChannel();
 
       
// 声明队列
       
channel.queueDeclare(QUEUE_NAME,
false,
false,
false,
null);
 
       
// 绑定队列到交换机
       
channel.queueBind(QUEUE_NAME,
EXCHANGE_NAME,
"key2");
 
       
// 同一时刻服务器只会发一条消息给消费者
       
channel.basicQos(1);
 
       
// 定义队列的消费者
       
QueueingConsumer consumer =
new QueueingConsumer(channel);
       
// 监听队列,手动返回完成
       
channel.basicConsume(QUEUE_NAME,
false,
consumer);
 
       
// 获取消息
       
while (true) {
           
QueueingConsumer.Delivery delivery =
consumer.nextDelivery();
           
String message =
new String(delivery.getBody());
           
System.out.println(" [x] Received '" +
message +
"'");
           
Thread.sleep(10);
 
           
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),
false);
       
}
    }
}

5.7. 通配符匹配模式

 

 

5.7.1.     生产者

5.7.2.     消费者1

publicclass Recv {
 
    privatefinalstatic String
QUEUE_NAME =
"test_queue_topic_1";
 
    privatefinalstatic String
EXCHANGE_NAME =
"test_exchange_topic";
 
    publicstaticvoid main(String[]
argv)
throws Exception {
 
       
// 获取到连接以及mq通道
       
Connection connection = ConnectionUtil.getConnection();
       
Channel channel =
connection.createChannel();
 
       
// 声明队列
       
channel.queueDeclare(QUEUE_NAME,
false,
false,
false,
null);
 
       
// 绑定队列到交换机
       
channel.queueBind(QUEUE_NAME,
EXCHANGE_NAME,
"item.UPDATE");
 
       
// 同一时刻服务器只会发一条消息给消费者
       
channel.basicQos(1);
 
       
// 定义队列的消费者
       
QueueingConsumer consumer =
new QueueingConsumer(channel);
       
// 监听队列,手动返回完成
       
channel.basicConsume(QUEUE_NAME,
false,
consumer);
 
       
// 获取消息
       
while (true) {
           
QueueingConsumer.Delivery delivery =
consumer.nextDelivery();
           
String message =
new String(delivery.getBody());
         
  System.out.println(" [x] Received '" +
message +
"'");
           
Thread.sleep(10);
 
           
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),
false);
       
}
    }
}
 

5.7.3.     消费者2

publicclass Recv2 {
 
    privatefinalstatic String
QUEUE_NAME =
"test_queue_topic_2";
 
    privatefinalstatic String
EXCHANGE_NAME =
"test_exchange_topic";
 
    publicstaticvoid main(String[]
argv)
throws Exception {
 
       
// 获取到连接以及mq通道
       
Connection connection = ConnectionUtil.getConnection();
       
Channel channel =
connection.createChannel();
 
       
// 声明队列
       
channel.queueDeclare(QUEUE_NAME,
false,
false,
false,
null);
 
       
// 绑定队列到交换机
       
channel.queueBind(QUEUE_NAME,
EXCHANGE_NAME,
"item.#");
 
       
// 同一时刻服务器只会发一条消息给消费者
       
channel.basicQos(1);
 
       
// 定义队列的消费者
       
QueueingConsumer consumer =
new QueueingConsumer(channel);
       
// 监听队列,手动返回完成
       
channel.basicConsume(QUEUE_NAME,
false,
consumer);
 
       
// 获取消息
       
while (true) {
           
QueueingConsumer.Delivery delivery =
consumer.nextDelivery();
           
String message =
new String(delivery.getBody());
           
System.out.println(" [x] Received '" +
message +
"'");
           
Thread.sleep(10);
 
           
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),
false);
       
}
    }
}
 

 

5.7.4.     测试

消费者根据绑定关系中的key规则来接受不同的消息。

 

5.8. 在界面中完成队列和交换机的绑定

6.  Spring-AMQP

 

 

 

6.1. 使用

6.1.1.     导入依赖

 

依赖关系:

6.1.2.     定义连接工厂

6.1.3.     定义队列

6.1.4.     定义交换机

定义交换机并且完成队列和交换机的绑定。

 

6.1.5.     定义模板

6.1.6.     在Spring容器中获取模板发送消息

 

注意:消息是发送到交换机。

6.1.7.     定义监听

6.1.8.     Foo消费者

6.2. MQ的管理

 

管理包括:队列声明、交换机申明等。

6.3. 队列的持久化

RabbitMQ的队列有2种,一种是内存队列,一种是持久化队列

1、  内存队列

a)        优点:速度快,效率高

b)        缺点:宕机,消息丢失

2、  持久化队列

a)        优点:消息可以持久化保存,宕机或断电后消息不丢失

b)        缺点:比内存存储速度慢,性能差

 

 

 
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: