rabbitmq
2016-03-27 08:37
302 查看
1. RabbitMQ简介
l MQ全称为MessageQueue,消息队列是应用程序和应用程序之间的通信方法。l RabbitMQ是一个开源的,在AMQP基础上完整的,可复用的企业消息系统。
l 支持主流的操作系统,Linux、Windows、MacOX等。
l 多种开发语言支持,Java、Python、Ruby、.NET、PHP、C/C++、node.js等
1.1. AMQP
1.2. Erlang
RabbitMQ是用Erlang语言开发的。1.3. 其他MQ产品
2. 官网下载
下载:
教程:
3. 安装
具体参考《RabbitMQ-3.4.1安装手册.docx》。3.1. 安装步骤
1. 安装环境Erlang2. 安装RabbitMQ
3.2. 安装Erlang注意事项
1. 计算机名不能为中文2. 用户名不能是中文
3. 安装路径不能有中文,推荐使用默认安装路径
4. 需要使用管理员账号进行安装
注意:
如果安装不成功,不要浪费太多时间在安装。
1. 找同学,开一个账号
2. 我在的时候,连接老师的RabbitMQ服务
3. 在最后一天,我们会使用Linux安装
3.3. 启用管理工具:
命令行工具目录:在命令行输入:rabbitmq-plugins enable rabbitmq_management
安装成功:http://127.0.0.1:15672/
默认账号密码
guest
3.4. 安装成功
3.5. 功能介绍
程序和RabbitMQ交互使用的是:5672端口。
4. 创taotao用户
4.1. 创建用户
4.2. 创建 Virtual Hosts
4.3. 设置用户vhosts
4.4. 效果
5. 消息队列
学习是前5种。
5.1. 简单的消息队列
P:消息的生产者;
C:消息的消费者;
红色框:消息队列;
5.1.1. 创建连接
5.1.2. 生产者向队列中发送消息
5.1.3. 消费者
5.2. work模式
同一个消息只能被一个客户端所获取。
5.2.1. 生产者
发送100条消息到队列,并且每次发送消息后sleep i*10 毫秒;5.2.2. 消费者1
从队列中获取消息,每次获取到消息后,休眠10毫秒,这个消费者消费消息比较快。publicstaticvoid main(String[]
argv)
throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel =
connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME,
false,
false,
false,
null);
// 同一时刻服务器只会发一条消息给消费者
//channel.basicQos(1);
// 定义队列的消费者
QueueingConsumer consumer =
new QueueingConsumer(channel);
// 监听队列,手动返回完成
channel.basicConsume(QUEUE_NAME,
false,
consumer);
// 获取消息
while (true) {
QueueingConsumer.Delivery delivery =
consumer.nextDelivery();
String message =
new String(delivery.getBody());
System.out.println(" [x] Received '" +
message +
"'");
//休眠
Thread.sleep(10);
// 返回确认状态
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),
false);
}
}
5.2.3. 消费者2
从队列中获取消息,每次获取到消息后,休眠1000毫秒,这个消费者消费消息比较慢。publicstaticvoid main(String[]
argv)
throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel =
connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME,
false,
false,
false,
null);
// 同一时刻服务器只会发一条消息给消费者
//
channel.basicQos(1);
// 定义队列的消费者
QueueingConsumer consumer =
new QueueingConsumer(channel);
//
监听队列,手动返回完成状态
channel.basicConsume(QUEUE_NAME,
false,
consumer);
// 获取消息
while (true) {
QueueingConsumer.Delivery delivery =
consumer.nextDelivery();
String message =
new String(delivery.getBody());
System.out.println(" [x] Received '" +
message +
"'");
// 休眠1秒
Thread.sleep(1000);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),
false);
}
}
5.2.4. 测试结果
结果:消费者1和消费者2获取到的消息是一样多,各50个。 这样是否合理? -- 不合理的。5.3. 自动和手动反馈消息消费状态
自动:从服务端获取到消息后,就认为该消息已经成功消费,无论客户端是否出异常。
手动:
从服务端获取消息后,服务端要标记为该不可用状态,等待客户端的反馈,如果客户端一直没有反馈,该消息一直被标记为不可用,同时认为该消费者在消费消息中
如果接收到客户端的反馈,服务端就将该消息删除。
如何选择手动和自动? -- 根据不同的业务需求做出不同选择。
5.4. work模式中的能者多劳
实际这种情况更加的合理。
测试结果:
消费者1:67条消息
消费者2:33条消息
5.5. 订阅模式
订阅:订阅微信公众号,微信公众号群发消息,所有的订阅者都能够收到消息
生产者可以绑定到交换机和队列
消费者只能绑定到队列
5.5.1. 队列绑定到交换机
5.5.2. 生产者
5.5.3. 消费者1
publicclass Recv {privatefinalstatic String
QUEUE_NAME =
"test_queue_ps_1";
privatefinalstatic String
EXCHANGE_NAME =
"test_exchange_fanout";
publicstaticvoid
main(String[] argv)
throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel =
connection.createChannel();
15b5a
// 声明队列
channel.queueDeclare(QUEUE_NAME,
false,
false,
false,
null);
// 绑定队列到交换机
channel.queueBind(QUEUE_NAME,
EXCHANGE_NAME,
"");
// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
// 定义队列的消费者
QueueingConsumer consumer =
new QueueingConsumer(channel);
// 监听队列,手动返回完成
channel.basicConsume(QUEUE_NAME,
false,
consumer);
// 获取消息
while (true) {
QueueingConsumer.Delivery delivery =
consumer.nextDelivery();
String message =
new String(delivery.getBody());
System.out.println(" [x] Received '" +
message +
"'");
Thread.sleep(10);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),
false);
}
}
}
5.5.4. 消费者2
publicclass Recv2 {privatefinalstatic String
QUEUE_NAME =
"test_queue_ps_2";
privatefinalstatic String
EXCHANGE_NAME =
"test_exchange_fanout";
publicstaticvoid main(String[]
argv)
throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel =
connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME,
false,
false,
false,
null);
// 绑定队列到交换机
channel.queueBind(QUEUE_NAME,
EXCHANGE_NAME,
"");
// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
// 定义队列的消费者
QueueingConsumer consumer =
new QueueingConsumer(channel);
// 监听队列,手动返回完成
channel.basicConsume(QUEUE_NAME,
false,
consumer);
// 获取消息
while (true) {
QueueingConsumer.Delivery delivery =
consumer.nextDelivery();
String message =
new String(delivery.getBody());
System.out.println(" [x] Received '" +
message +
"'");
Thread.sleep(10);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),
false);
}
}
}
5.5.5. 测试
结果:消费者1和消费者2同时都能获取到消息。注意:订阅模式和work模式的区别。
1、 work模式将消息发送到队列
2、 订阅模式将消息发送到交换机
3、 work模式是1个队列2个消费者,订阅模式是2个队列2个消费者
5.6. 路由模式
路由模式:
可以在队列绑定到交换机时指定一个规则,根据不同的消息中规则,选择是否接受该消息。
5.6.1. 生产者
5.6.2. 消费者1
publicclass Recv {privatefinalstatic String
QUEUE_NAME =
"test_queue_rirect_1";
privatefinalstatic String
EXCHANGE_NAME =
"test_exchange_direct";
publicstaticvoid main(String[]
argv)
throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel =
connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME,
false,
false,
false,
null);
// 绑定队列到交换机
channel.queueBind(QUEUE_NAME,
EXCHANGE_NAME,
"key");
// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
// 定义队列的消费者
QueueingConsumer consumer =
new QueueingConsumer(channel);
// 监听队列,手动返回完成
channel.basicConsume(QUEUE_NAME,
false,
consumer);
// 获取消息
while (true) {
QueueingConsumer.Delivery delivery =
consumer.nextDelivery();
String message =
new String(delivery.getBody());
System.out.println(" [x] Received '" +
message +
"'");
Thread.sleep(10);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),
false);
}
}
}
5.6.3. 消费者2
publicclass Recv2 {privatefinalstatic String
QUEUE_NAME =
"test_queue_rirect_2";
privatefinalstatic String
EXCHANGE_NAME =
"test_exchange_direct";
publicstaticvoid main(String[]
argv)
throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel =
connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME,
false,
false,
false,
null);
// 绑定队列到交换机
channel.queueBind(QUEUE_NAME,
EXCHANGE_NAME,
"key2");
// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
// 定义队列的消费者
QueueingConsumer consumer =
new QueueingConsumer(channel);
// 监听队列,手动返回完成
channel.basicConsume(QUEUE_NAME,
false,
consumer);
// 获取消息
while (true) {
QueueingConsumer.Delivery delivery =
consumer.nextDelivery();
String message =
new String(delivery.getBody());
System.out.println(" [x] Received '" +
message +
"'");
Thread.sleep(10);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),
false);
}
}
}
5.7. 通配符匹配模式
5.7.1. 生产者
5.7.2. 消费者1
publicclass Recv {privatefinalstatic String
QUEUE_NAME =
"test_queue_topic_1";
privatefinalstatic String
EXCHANGE_NAME =
"test_exchange_topic";
publicstaticvoid main(String[]
argv)
throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel =
connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME,
false,
false,
false,
null);
// 绑定队列到交换机
channel.queueBind(QUEUE_NAME,
EXCHANGE_NAME,
"item.UPDATE");
// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
// 定义队列的消费者
QueueingConsumer consumer =
new QueueingConsumer(channel);
// 监听队列,手动返回完成
channel.basicConsume(QUEUE_NAME,
false,
consumer);
// 获取消息
while (true) {
QueueingConsumer.Delivery delivery =
consumer.nextDelivery();
String message =
new String(delivery.getBody());
System.out.println(" [x] Received '" +
message +
"'");
Thread.sleep(10);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),
false);
}
}
}
5.7.3. 消费者2
publicclass Recv2 {privatefinalstatic String
QUEUE_NAME =
"test_queue_topic_2";
privatefinalstatic String
EXCHANGE_NAME =
"test_exchange_topic";
publicstaticvoid main(String[]
argv)
throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel =
connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME,
false,
false,
false,
null);
// 绑定队列到交换机
channel.queueBind(QUEUE_NAME,
EXCHANGE_NAME,
"item.#");
// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
// 定义队列的消费者
QueueingConsumer consumer =
new QueueingConsumer(channel);
// 监听队列,手动返回完成
channel.basicConsume(QUEUE_NAME,
false,
consumer);
// 获取消息
while (true) {
QueueingConsumer.Delivery delivery =
consumer.nextDelivery();
String message =
new String(delivery.getBody());
System.out.println(" [x] Received '" +
message +
"'");
Thread.sleep(10);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),
false);
}
}
}
5.7.4. 测试
消费者根据绑定关系中的key规则来接受不同的消息。5.8. 在界面中完成队列和交换机的绑定
6. Spring-AMQP
6.1. 使用
6.1.1. 导入依赖
依赖关系:
6.1.2. 定义连接工厂
6.1.3. 定义队列
6.1.4. 定义交换机
定义交换机并且完成队列和交换机的绑定。6.1.5. 定义模板
6.1.6. 在Spring容器中获取模板发送消息
注意:消息是发送到交换机。
6.1.7. 定义监听
6.1.8. Foo消费者
6.2. MQ的管理
管理包括:队列声明、交换机申明等。
6.3. 队列的持久化
RabbitMQ的队列有2种,一种是内存队列,一种是持久化队列1、 内存队列
a) 优点:速度快,效率高
b) 缺点:宕机,消息丢失
2、 持久化队列
a) 优点:消息可以持久化保存,宕机或断电后消息不丢失
b) 缺点:比内存存储速度慢,性能差
相关文章推荐
- NumPy学习笔记1
- 代码大全_第4部分_语句
- java学习之---java虚拟机浅谈
- 我的第一个 Rails 站点:极简优雅的笔记工具-Raysnote
- 【学习笔记】对OOP思想及基本设计原则的理解
- 敏捷开发方法综述
- [mysql] mysqldump 导出数据库表
- expect
- 用 openSSL 生成 公钥 私钥
- awk
- Android 网络传输 async-http-client
- Some issue in my Interiew
- android adb
- QML设计登陆界面
- dist-upgrade
- 有统计的百度分享
- 近期找工作的一些琐碎事
- myEclipse 创建服务器
- 20160323servlet学习笔记常见状态码消息头的含义
- 绘制水印