RabbitMQ 入门
2015-08-30 11:59
477 查看
简介
RabbitMQ是一个Message Broker,核心思想就是接受消息,转发消息。实现的协议:AMQP。
术语(Jargon)
P,Producing,制造和发送信息的一方。
Queue,消息队列。
C,Consuming,接收消息的一方。
Simple Demo
![](file:///C:/Users/cacard/AppData/Local/Temp/enhtmlclip/Image(10).png)
发送方
ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); //(如果没有就)创建Queue String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes());//以byte的方式发布 System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close();
[/code]
接收方
ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null);//看一下Queue是否存在 QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(QUEUE_NAME, true, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery();//阻塞,直到接收到一条消息 String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); }
订阅/发布Demo
发送消息给多个订阅者
核心思想:消息发送给exchange,每个接收方创建匿名Queue绑定到exchange,exchange发送消息给每个接收方。
Exchanges
在RabbitMQ完整的模型中,消息只能发送给一个exchange。
exchange一方面接收消息,另一方面push给queues。
![](file:///C:/Users/cacard/AppData/Local/Temp/enhtmlclip/Image(11).png)
exchange类型
> rabbitmqctl list_exchanges
direct
topic
headers
fanout 广播消息给已知队列
![](file:///C:/Users/cacard/AppData/Local/Temp/enhtmlclip/Image(12).png)
发送方
String EXCHANGE_NAME = "logs"; ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 设置 exchange 类型 channel.exchangeDeclare(EXCHANGE_NAME /*exchange名称*/, "fanout"/*类型*/); // 发布消息时,指定 exchange 名称 channel.basicPublish( EXCHANGE_NAME , "", null, message.getBytes()); channel.close(); connection.close();
接收方(可多个同时运行)
ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 设置exchange名称和类型 channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); // 创建一个临时的、带有随机名称的Queue,用来与 exchange 绑定 String queueName = channel.queueDeclare().getQueue(); hannel.queueBind(queueName, EXCHANGE_NAME, ""); // 绑定 System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); }
Install
@Windows
1 先安装Erlang。
2 官方网下载 .exe。
管理
命令行管理 http://www.rabbitmq.com/man/rabbitmqctl.1.man.html
WebUI管理
> rabbitmq-plugins enable rabbitmq_management
重启后访问 http://localhost:15672/ guest,guest
角色
management
policymaker
monitoring
administrator
添加用户并分配角色
> rabbitmqctl add_user name pass
> rabbitmqctl set_user_tags name administrator
插件管理
启用插件
> rabbitmq-plugins enable plugin-name
配置文件
etc\rabbitmq.config
工作队列:Working Queue
工作队列这个概念与简单的发送/接收消息的区别就是:接收方接收到消息后,可能需要花费更长的时间来处理消息,这个过程就叫一个Work/Task。
几个概念
分配:多个接收端接收同一个Queue时,如何分配?
消息确认:Server端如何确定接收方的Work已经对消息进行了完整的处理?
消息持久化:发送方、服务端Queue如何对未处理的消息进行磁盘持久化?
Round-robin分配
多个接收端接收同一个Queue时,采用了Round-robin分配算法,即轮叫调度——依次分配给各个接收方。
消息确认
默认开启了消息确认(接收方接收到消息后,立即向服务器发回确认)。消息接收方处理完消息后,向服务器发送消息确认,服务器再删除该消息。
对于耗时的work,可以先关闭自动消息确认,在work完成后,再手动发回确认。
channel.basicConsume("hello",false/*关闭自动消息确认*/,consumer);
// ...work完成后
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
持久化
1. Server端的Queue持久化
注意的是,如果已经声明了同名非持久化的Queue,则再次声明无效。
发送方和接收方都需要指定该参数。
boolean durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);
2. Message持久化
channel.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
负载分配
为了解决各个接收端工作量相差太大的问题(有的一直busy,有的空闲比较多),突破Round-robin。
int prefetchCount = 1;
channel.basicQos(prefetchCount);
意思为,最多为当前接收方发送一条消息。如果接收方还未处理完毕消息,还没有回发确认,就不要再给他分配消息了,应该把当前消息分配给其它空闲接收方。
固定关键词路由:Routing
使用类型为direct的exchange,发送特定关键词(RoutingKey)的消息给订阅该关键词的Queue。
场景示例:消息发送方发送了类型为[error][info]的两种消息,写磁盘的消息接受者只接受error类型的消息,Console打印的接收两者。
![](http://images.cnitblog.com/i/1408/201403/141048547005411.png)
(上图采用了不同颜色来作为routingKey)
发送方
ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "direct"/*exchange类型为direct*/); channel.basicPublish(EXCHANGE_NAME, "info"/*关键词=info*/, null, message.getBytes()); channel.close(); connection.close();
接收方
ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "direct"/*exchange类型为direct*/); // 创建匿名Queue String queueName = channel.queueDeclare().getQueue(); // 订阅某个关键词,绑定到匿名Queue中 channel.queueBind(quueName,EXCHANGE_NAME,"error"); channel.queueBind(quueName,EXCHANGE_NAME,"info"); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true, consumer); QueueingConsumer.Delivery delivery = consumer.nextDelivery(); // Blocking... String message = new String(delivery.getBody()); String routingKey = delivery.getEnvelope().getRoutingKey(); // 可获取路由关键词
关键词模式路由:Topics
这种模式可以看做对Routing的扩展。Routing只能使用固定关键词,而Topics模式可以订阅模糊关键词。
关键词必须是一组word,由点号分割。例如"xxx.yyy.zzz",限定255bytes。
* 表示一个word;
# 表示0个或者多个word;
![](http://images.cnitblog.com/i/1408/201403/141049214484306.png)
发送方
ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic"/*exchange类型*/); channel.basicPublish(EXCHANGE_NAME, "xxx.yyy"/*关键词routingKey*/, null, message.getBytes()); channel.close(); connection.close();
接收方
ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic"/*exchange类型*/); // 创建匿名Queue String queueName = channel.queueDeclare().getQueue(); // 订阅某个关键词,绑定到匿名Queue中 channel.queueBind(quueName,EXCHANGE_NAME,"*.yyy"); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true, consumer); QueueingConsumer.Delivery delivery = consumer.nextDelivery(); // Blocking... String message = new String(delivery.getBody()); String routingKey = delivery.getEnvelope().getRoutingKey(); // 可获取路由关键词
相关文章推荐
- scala 14 trait
- Schedule
- Ubuntu 14.04搭建Android5.1开发环境和编译
- 【JS/读书随笔】JavaScript编程精解/Eloquent JavaScript:Chapter 11 浏览器事件
- 【英语】Bingo口语笔记(60) - 口语中的浊化发音
- scala 13 抽象类 字段方法
- QMap 的增删改查
- GYM 100345E New Mayors(二分图染色)
- hdu5424 Rikka with Graph II(n个点n条边的图判哈密顿通路)
- hdoj 3987 Harry Potter and the Forbidden Forest 【求所有最小割里面 最少的边数】
- POJ 1006 Biorhythms
- windows C++ 多任务并发执行设计
- PCIE协议解析 synopsys IP Power Management Capability 读书笔记(10)
- 自定义控件出属性设置
- xmu 1018 零零漆的作
- 借贷宝有多少人看得懂?借贷宝系统崩溃分析
- BZOJ 1009: [HNOI2008]GT考试( dp + 矩阵快速幂 + kmp )
- Gym 100345H Settling the Universe Up
- scala 第12讲 继承 构造
- [leetcode] 71.Simplify Path