您的位置:首页 > Web前端 > Node.js

Node实战(第2季) 3 基于RabbitMQ搭建消息队列

2018-01-25 17:42 253 查看
  处理大并发而带来的CPU或I/O密集型问题最好的控制方法就是使用消息队列。对于服务器间跨语言通信,以前我们一般使用XMLRPC,现在比较流行HTTP协议的RESTful方式,而使用RabbitMQ也能够很灵活地处理这些事情。

3.1 什么是消息队列,消息队列的优势

   理解了队列,要解释“消息”,是在两台计算机间传送的数据单位。可以非常简单,例如只包含文本字符串;也可以更复杂,可能包含嵌入对象(如XML或JSON)。

   消息队列具有如下几种优势

应用解耦:在项目启动之初,预测项目将会碰到什么需求是极其困难的。消息队列在处理过程的中间插入了一个隐含的基于数据的接口层,两边的处理过程都要实现这一接口。这允许你独立地扩展或修改两边的处理过程,只要确保它们遵守同样的接口约定。
冗余存储:处理数据有时会失败,除非数据被持久化,否则将永远丢失。消息队列把数据进行持久化直到它们被完全处理,通过这一方式避免了数据丢失的风险。在被许多消息队列所采用的“插入-获取-删除”范式中,把一个消息从队列中删除之前,需要你的处理过程明确指出该消息已经被处理完毕。
可扩展性:因为消息队列解耦了处理过程,所以增大消息入队和处理的频率是很容易的,不需要改变代码,不需要调节参数,扩展就像调整风扇按钮一样简单。
灵活性和峰值处理能力:当你的应用访问流量突然迅速攀升时,当然,应用仍然需要继续提供服务,但是这样的突发流量并不常见,如果以能处理这类峰值访问为标准来投入资源并为其随时待命,无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发增长的访问压力,而不因超出负荷的请求而完全崩溃。
可恢复性:体系的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。而这种允许重试或者延后处理的能力通常是造就用户略感不便的体验的关键要素。
送达保证:消息队列提供的冗余机制保证了消息能被实际地处理。获取一个消息只是“预定”了这个消息,暂时把它移出了队列。除非客户端明确表示已经处理完了这个消息,否则这个消息会被放回队列中,经过一段时间后再次被处理。
排序保证:在某些情况下,数据处理的顺序都很重要。消息队列本来就是排序的。并且能保证数据会按照特定的顺序来处理。RabbitMQ保证了消息通过FIFO的顺序来处理。
缓冲:在任何重要的系统中,都会有不同处理时间的场景。例如,加载一张图片比应用过滤器要花费更少的时间。消息队列通过一个缓冲层来帮助任务最高效率地执行,写入队列的处理尽可能快速,它不受队列另一端消费者的处理速度的约束,这样有助于提升整体系统的性能,不至于由于消费速度慢而导致流程不顺畅。
理解数据流:在分布式系统里,用户操作的时间长短,会关乎用户体验。消息队列通过消息被处理的频率和时长,来方便地定位那些处理时间长的服务。
异步通信:大多数时候,你不想也不需要立即处理消息。消息队列提供了异步处理机制,允许你把一个消息放入队列,但不立即处理它,然后在你乐意时再去处理它们。

     相信上述10个原因,使得消息队列成为在进程或应用之间进行通信的较好形式,队列是创建强大的分布式应用的关键。

3.2 安装和启动RabbitMQ

    由Erlang语言开发的。

    $ sudo docker pull rabbitmq

    # 启动服务

    $ sudo docker run -d -e RABBITMQ_NODENAME=my-rabbit --name some-rabbit -p 5672:5672 rabbitmq

    用了Docker我们就可以更加专注地开发业务代码了,不用因安装环境而浪费时间。

3.3 Hello World

    要连接RabbitMQ,需要安装连接包。依次执行命令,创建环境并安装依赖,这里使用官方推荐的npm包amqplib

    $ sudo mkdir /var/node/rabbit_hello -p

rabbit_hello/server.js

var amqp = require('amqplib');
amqp.connect('amqp://127.0.0.1').then(function(conn) {
process.once('SIGINT', function() { conn.close(); });
return conn.createChannel().then(function(ch) {
var ok = ch.assertQueue('hello', {durable: false});
ok = ok.then(function(_qok) {
return ch.consume('hello', function(msg) {
console.log(' [x] Received '%s' ', msg.content.toString());
}, {noAck: true});
});
return ok.then(function(_consumeOk) {
console.log(' [*] Waiting for messages. To exit press CTRL+C');
});
});
}).then(null, console.warn);


3.4 RabbitMQ的工作队列

3.5 RabbitMQ的PUB/SUB队列

3.6 RabbitMQ的队列路由

3.7 RabbitMQ的RPC远程过程调用

3.8 基于RabbitMQ的Node.js和Python通信实例

3.9 RabbitMQ方案和HTTP方案的对比

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: