RabbitMQ入门与使用篇
2017-08-15 17:48
405 查看
介绍
RabbitMQ是一个由erlang开发的基于AMQP(Advanced Message Queue)协议的开源实现。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面都非常的优秀。是当前最主流的消息中间件之一。RabbitMQ的官方
概念:
Brocker:消息队列服务器实体。
Exchange:消息交换机,指定消息按什么规则,路由到哪个队列。
Queue:消息队列,每个消息都会被投入到一个或者多个队列里。
Binding:绑定,它的作用是把exchange和queue按照路由规则binding起来。
Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
Vhost:虚拟主机,一个broker里可以开设多个vhost,用作不用用户的权限分离。
Producer:消息生产者,就是投递消息的程序。
Consumer:消息消费者,就是接受消息的程序。
Channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。
消息队列的使用过程大概如下:
消息接收
客户端连接到消息队列服务器,打开一个channel。
客户端声明一个exchange,并设置相关属性。
客户端声明一个queue,并设置相关属性。
客户端使用routing key,在exchange和queue之间建立好绑定关系。
消息发布
客户端投递消息到exchange。
exchange接收到消息后,就根据消息的key和已经设置的binding,进行消息路由,将消息投递到一个或多个队列里。
AMQP 里主要要说两个组件:
Exchange 和 Queue
绿色的 X 就是 Exchange ,红色的是 Queue ,这两者都在 Server 端,又称作 Broker
这部分是 RabbitMQ 实现的,而蓝色的则是客户端,通常有 Producer 和 Consumer 两种类型。
Exchange通常分为四种:
fanout:该类型路由规则非常简单,会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中,相当于广播功能
direct:该类型路由规则会将消息路由到binding key与routing key完全匹配的Queue中
topic:与direct类型相似,只是规则没有那么严格,可以模糊匹配和多条件匹配
headers:该类型不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配
使用场景
官方介绍
下载与安装
下载rabbitmq
erlang
安装
先安装erlang
然后再安装rabbitmq
管理工具
参考官方文档操作起来很简单,只需要在DOS下面,进入安装目录(
安装路径\RabbitMQ Server\rabbitmq_server-3.2.2\sbin)执行如下命令就可以成功安装。
rabbitmq-plugins enable rabbitmq_management
可以通过访问:
http://localhost:15672进行测试,默认的登陆账号为:guest,密码为:guest。
其他配置
1. 安装完以后erlang需要手动设置ERLANG_HOME 的系统变量。set ERLANG_HOME=F:\Program Files\erl9.0 #环境变量`path`里加入:%ERLANG_HOME%\bin #环境变量`path`里加入: 安装路径\RabbitMQ Server\rabbitmq_server-3.6.10\sbin
2.激活Rabbit MQ’s Management Plugin
使用Rabbit MQ 管理插件,可以更好的可视化方式查看Rabbit MQ 服务器实例的状态,你可以在命令行中使用下面的命令激活。
rabbitmq-plugins.bat enable rabbitmq_management
3.创建管理用户
rabbitmqctl.bat add_user sa 123456
4. 设置管理员
rabbitmqctl.bat set_user_tags sa administrator
5.设置权限
rabbitmqctl.bat set_permissions -p / sa ".*" ".*" ".*"
6. 其他命令
#查询用户: rabbitmqctl.bat list_users #查询vhosts: rabbitmqctl.bat list_vhosts #启动RabbitMQ服务: net stop RabbitMQ && net start RabbitMQ
以上这些,账号、vhost、权限、作用域等基本就设置完了。
基于.net使用
RabbitMQ.Client 是RabbiMQ 官方提供的的客户端EasyNetQ 是基于RabbitMQ.Client 基础上封装的开源客户端,使用非常方便
以下操作RabbitMQ的代码例子,都是基于EasyNetQ的使用和再封装,在文章底部有demo例子的源码下载地址
创建 IBus
/// <summary> /// 消息服务器连接器 /// </summary> public class BusBuilder { public static IBus CreateMessageBus() { // 消息服务器连接字符串 // var connectionString = ConfigurationManager.ConnectionStrings["RabbitMQ"]; string connString = "host=127.0.0.1:5672;virtualHost=TestQueue;username=sa;password=123456"; if (connString == null || connString == string.Empty) throw new Exception("messageserver connection string is missing or empty"); return RabbitHutch.CreateBus(connString); } }
Fanout Exchange
所有发送到Fanout Exchange的消息都会被转发到与该Exchange 绑定(Binding)的所有Queue上。
Fanout Exchange 不需要处理RouteKey 。只需要简单的将队列绑定到exchange 上。这样发送到exchange的消息都会被转发到与该交换机绑定的所有队列上。类似子网广播,每台子网内的主机都获得了一份复制的消息。 所以,Fanout Exchange 转发消息是最快的。
/// <summary> /// 消息消耗(fanout) /// </summary> /// <typeparam name="T">消息类型</typeparam> /// <param name="handler">回调</param> /// <param name="exChangeName">交换器名</param> /// <param name="queueName">队列名</param> /// <param name="routingKey">路由名</param> public static void FanoutConsume<T>(Action<T> handler, string exChangeName = "fanout_mq", string queueName = "fanout_queue_default", string routingKey = "") where T : class { var bus = BusBuilder.CreateMessageBus(); var adbus = bus.Advanced; var exchange = adbus.ExchangeDeclare(exChangeName, ExchangeType.Fanout); var queue = CreateQueue(adbus, queueName); adbus.Bind(exchange, queue, routingKey); adbus.Consume(queue, registration => { registration.Add<T>((message, info) => { handler(message.Body); }); }); } /// <summary> /// 消息上报(fanout) /// </summary> /// <typeparam name="T">消息类型</typeparam> /// <param name="topic">主题名</param> /// <param name="t">消息命名</param> /// <param name="msg">错误信息</param> /// <returns></returns> public static bool FanoutPush<T>(T t, out string msg, string exChangeName = "fanout_mq", string routingKey = "") where T : class { msg = string.Empty; try { using (var bus = BusBuilder.CreateMessageBus()) { var adbus = bus.Advanced; var exchange = adbus.ExchangeDeclare(exChangeName, ExchangeType.Fanout); adbus.Publish(exchange, routingKey, false, new Message<T>(t)); return true; } } catch (Exception ex) { msg = ex.ToString(); return false; } }
所有发送到Direct Exchange的消息被转发到RouteKey中指定的Queue。
Direct模式,可以使用RabbitMQ自带的Exchange:default Exchange 。所以不需要将Exchange进行任何绑定(binding)操作 。消息传递时,RouteKey必须完全匹配,才会被队列接收,否则该消息会被抛弃。
/// <summary> /// 消息发送(direct) /// </summary> /// <typeparam name="T">消息类型</typeparam> /// <param name="queue">发送到的队列</param> /// <param name="message">发送内容</param> public static void DirectSend<T>(string queue, T message) where T : class { using (var bus = BusBuilder.CreateMessageBus()) { bus.Send(queue, message); } } /// <summary> /// 消息接收(direct) /// </summary> /// <typeparam name="T">消息类型</typeparam> /// <param name="queue">接收的队列</param> /// <param name="callback">回调操作</param> /// <param name="msg">错误信息</param> /// <returns></returns> public static bool DirectReceive<T>(string queue, Action<T> callback, out string msg) where T : class { msg = string.Empty; try { var bus = BusBuilder.CreateMessageBus(); bus.Receive<T>(queue, callback); } catch (Exception ex) { msg = ex.ToString(); return false; } return true; } /// <summary> /// 消息发送 /// <![CDATA[(direct EasyNetQ高级API)]]> /// </summary> /// <typeparam name="T"></typeparam> /// <param name="t"></param> /// <param name="msg"></param> /// <param name="exChangeName"></param> /// <param name="routingKey"></param> /// <returns></returns> public static bool DirectPush<T>(T t, out string msg, string exChangeName = "direct_mq", string routingKey = "direct_rout_default") where T : class { msg = string.Empty; try { using (var bus = BusBuilder.CreateMessageBus()) { var adbus = bus.Advanced; var exchange = adbus.ExchangeDeclare(exChangeName, ExchangeType.Direct); adbus.Publish(exchange, routingKey, false, new Message<T>(t)); return true; } } catch (Exception ex) { msg = ex.ToString(); return false; } } /// <summary> /// 消息接收 /// <![CDATA[(direct EasyNetQ高级API)]]> /// </summary> /// <typeparam name="T">消息类型</typeparam> /// <param name="handler">回调</param> /// <param name="exChangeName">交换器名</param> /// <param name="queueName">队列名</param> /// <param name="routingKey">路由名</param> public static bool DirectConsume<T>(Action<T> handler, out string msg, string exChangeName = "direct_mq", string queueName = "direct_queue_default", string routingKey = "direct_rout_default")
相关文章推荐
- RabbitMQ 使用入门。含完整代码
- .NET 环境中使用RabbitMQ RabbitMQ与Redis队列对比 RabbitMQ入门与使用篇
- 关于RabbitMQ-C入门使用需要注意的几个问题
- RabbitMq入门与基本使用
- RabbitMQ入门与使用篇
- RabbitMQ入门及常用的5种模式的简单使用(一)
- RabbitMQ入门教程——.NET客户端使用
- RabbitMQ 使用入门
- RabbitMQ入门及常用的5种模式的简单使用(二)
- RabbitMQ使用场景练习:入门实例(一)
- RabbitMQ入门与使用篇 推荐
- RabbitMq入门以及使用教程
- Pika使用入门(二)【连接到RabbitMQ与回调传递风格】
- python RabbitMQ队列使用(入门篇)
- PHP中使用XML-RPC构造Web Service简单入门
- IDEA的入门使用(一)——快捷键
- RabbitMQ 入门案例
- C++ STL入门教程(5)——map(关联数组)的使用(附完整程序代码)
- OpenGL入门学习之八——使用显示列表
- ruby使用RabbitMQ——乱码问题