node异步编程流程控制库
2017-09-11 23:52
399 查看
(一)尾触发和next()
尾触目前应用最多的地方就是connect中间件connect中间件api暴露方式
var app = connect(); // Middleware app.use(connect.staticCache()); app.use(connect.static(__dirname + '/public')); app.use(connect.cookieParser()); app.use(connect.session()); app.use(connect.query()); app.use(connect.bodyParser()); app.use(connect.csrf()); app.listen(3001);
在通过use()注册好一系列中间件后,监听端口上的请求;中间件利用了尾触的机制,最简单的中间件如下:
function (req, res, next) { // 中间件 }
每一个中间件携带请求对象req,响应对象res ,尾触函数 通过队列形成一个处理流。
中间件在处理网络请求时候,可以像面向切面编程一样进行过滤,验证,日志处理 而不与具体业务逻辑产生关联,以此实现解耦。
connect核心实现
function createServer() { function app(req, res){ app.handle(req, res); } utils.merge(app, proto); utils.merge(app, EventEmitter.prototype); app.route = '/'; app.stack = []; for (var i = 0; i < arguments.length; ++i) { app.use(arguments[i]); } return app; }; 这段代码通过如下代码创建HTTP服务器的request事件处理函数 function app(req, res){ app.handle(req, res); } 但是真正的核心代码是app.stack=[]; stack属性是这个服务器内部维护的中间件队列,通过use()方法我们可以将中间件放进队列,下面是use()方法重要的部分: app.use = function(route, fn){ // some code this.stack.push({ route: route, handle: fn }); return this; }; 此时就建好处理模型了,接下来 结合node原生http模块实现监听即可,监听函数的实现如下: app.listen=function(){ var server=http.createServer(this); return server.listen.apply(server,arguments); } 简化后的next方法如下(next的方法实现比较复杂) function next(err) { // some code // next callback layer = stack[index++]; layer.handle(req, res, next); } 值得提醒的是:尽管中间件这种尾触机制并不要求每个中间件方法都是异步的,但是如果每个步骤都采用异步来完成,实际上只是串行化处理,没办法通过并行的异步调用来提升业务的处理效率;流式处理可以将一些串行的逻辑扁平化,但是并行逻辑处理还需要搭配事件或者promise来完成,这样业务在纵向和横向都能各自清晰
(二)async模块
async模块提供了多达20种方法来处理异步流程:series() 方法支持没有依赖的异步串行
async.series([ function (callback) { fs.readFile('file1.txt', 'utf-8', callback); }, function (callback) { fs.readFile('file2.txt', 'utf-8', callback); } ], function (err, results) { // results => [file1.txt, file2.txt] });
异常处理:一旦发生异常立即结束所有调用。
2. parallel() 异步并行
async.parallel([ function (callback) { fs.readFile('file1.txt', 'utf-8', callback); }, function (callback) { fs.readFile('file2.txt', 'utf-8', callback); } ], function (err, results) { // results => [file1.txt, file2.txt] });
一旦发生异常,就会将异常作为第一个参数传递给最终的回调函数,只有数组中所有函数执行完毕后才会输出最终的result 顺序按照数组中添加函数的顺序。
回顾下EventProxy的方案:
var EventProxy = require('eventproxy'); var proxy = new EventProxy(); proxy.all('content', 'data', function (content, data) { callback(null, [content, data]); }) proxy.fail(callback); fs.readFile('file1.txt', 'utf-8', proxy.done('content')); fs.readFile('file2.txt', 'utf-8', proxy.done('data'));
这两种实现方式都是通过高阶函数来实现,
waterfall() 处理有依赖的异步串行
async.waterfall([ function (callback) { fs.readFile('file1.txt', 'utf-8', function (err, content) { callback(err, content); }); }, function (arg1, callback) { // arg1 => file2.txt fs.readFile(arg1, 'utf-8', function (err, content) { callback(err, content); }); }, function(arg1, callback){ // arg1 => file3.txt fs.readFile(arg1, 'utf-8', function (err, content) { callback(err, content); }); } ], function (err, result) { // result => file4.txt });
4 auto() 自动以最佳的方式处理有依赖的异步
业务场景
1 从磁盘读取配置文件
2 根据配置文件链接MongoDB
3 根据配置文件连接Redis
4 编译静态文件
5 上传静态文件到CDN
6 启动服务器
业务逻辑: { readConfig: function () {}, connectMongoDB: function () {}, connectRedis: function () {}, complieAsserts: function () {}, uploadAsserts: function () {}, startup: function () {} } var deps = { readConfig: function (callback) { // read config file callback(); }, connectMongoDB: ['readConfig', function (callback) { // connect to mongodb callback(); }], connectRedis: ['readConfig', function (callback) { // connect to redis callback(); }], complieAsserts: function (callback) { // complie asserts callback(); }, uploadAsserts: ['complieAsserts', function (callback) { // upload to assert callback(); }], startup: ['connectMongoDB', 'connectRedis', 'uploadAsserts', function (callback) { // startup }] }; async.auto(deps); 如果用EventProx来实现则需要更细腻的事件分配
另外async还提供了数组中的方法:map(),forEach(),async模块也支持异步并发控制,详见parallelLimit()方法
async.parallelLimit([ function (callback) { fs.readFile('file1.txt', 'utf-8', callback); }, function (callback) { fs.readFile('file2.txt', 'utf-8', callback); } ], 1, function (err, results) { // TODO });
parallelLimit方法的缺陷是无法动态的增加要执行的方法,只能事先指定。
为此async提供了queue()方法来实现动态改变执行队列:这对于遍历文件系统时很有用
var q = async.queue(function (file, callback) { fs.readFile(file, 'utf-8', callback); }, 2); q.drain = function () { // 完成队列中的所有任务时触发 }; fs.readdirSync('.').forEach(function (file) { q.push(file, function (err, data) { // TODO }); });
但是queue方法接受的参数是固定的,又失去了parallelLimit的灵活性
bagpipe更好的控制异步并发
解决思路:1 通过一个队里来控制并发量
2 如果当前活跃(调用发起但未执行回调)的异步调用量小于限定值,从队列中取出执行
3 每个异步调用结束时,从队列中取出新的异步调用来执行
var Bagpipe = require('bagpipe'); // 设置并发数为10 var bagpipe = new Bagpipe(10); for (var i = 0; i < 100; i++) { bagpipe.push(async, function () { // 异步回调执行 }); } bagpipe.on('full', function (length) { console.warn('系统队列不能完全处理,当前阻塞长度为' + length); });
push()方法的核心实现如下:
Bagpipe.prototype.push = function (method) { var args = [].slice.call(arguments, 1); var callback = args[args.length - 1]; if (typeof callback !== 'function') { args.push(function () {}); } if (this.options.disabled || this.limit < 1) { method.apply(null, args); return this; } // 队列长度也超过限制时 if (this.queue.length < this.queueLength || !this.options.refuse) { this.queue.push({ method: method, args: args }); } else { var err = new Error('Too much async call in queue'); err.name = 'TooMuchAsyncCallError'; callback(err); } if (this.queue.length > 1) { this.emit('full', this.queue.length); } this.next(); return this; };
将调用推入队列后,调用一次next()方法尝试触发,其方法定义如下
Bagpipe.prototype.next = function () { var that = this; if (that.active < that.limit && that.queue.length) { var req = that.queue.shift(); that.run(req.method, req.args); } };
执行队列中的方法:
Bagpipe.prototype.run = function (method, args) { var that = this; that.active++; var callback = args[args.length - 1]; var timer = null; var called = false; // inject logic args[args.length - 1] = function (err) { // anyway, clear the timer if (timer) { clearTimeout(timer); timer = null; } // if timeout, don't execute if (!called) { that._next(); callback.apply(null, arguments); } else { // pass the outdated error if (err) { that.emit('outdated', err); } } }; var timeout = that.options.timeout; if (timeout) { timer = setTimeout(function () { // set called as true called = true; that._next(); // pass the exception var err = new Error(timeout + 'ms timeout'); err.name = 'BagpipeTimeoutError'; err.data = { name: method.name, method: method.toString(), args: args.slice(0, -1) }; callback(err); }, timeout); } method.apply(null, args); };
用户传入的回调函数被真正执行前,被替换封装过,这个封装的回调函数内部的逻辑活跃值得计数器减1后,主动调用next执行后续等待的异步调用。
bagpipe还可以实现超时控制,和拒绝模式
var limit_parallel_limit=10; var config={ timeout:3000. refuse:true }; var bagpipe=new Bagpipe(limit_parallel_limit,config);
在拒绝模式下,如果等待的队列也满了后,新来的调用将被队列太忙的拒绝异常,这种调用一般都对实时性要求很高,如果等待后执行后及时得到了数据,也可能不是实时的而浪费时间。
相关文章推荐
- 第六讲 流程控制语句
- Java笔试题解-流程控制(2)
- SQL流程控制语句
- MySql存储过程及MySql常用流程控制语法
- JAVA学习(四):Java流程控制语句(顺序结构、if条件语句、switch条件语句、循环语句与跳转语句)
- Java笔试题解-流程控制(1)
- Linux shell awk 流程控制语句(if,for,while,do)详细介绍
- 流程控制——顺序、分支、循环语句
- thinking in java 阅读笔记 第三章 程序控制流程
- 第四章 Shell流程控制
- Java基础-流程控制
- 基于微信硬件公众平台的智能控制开发流程
- thinking in java 阅读笔记 第三章 程序控制流程
- 黑马程序员-流程控制
- SQL之流程控制
- Go入门(四)-流程控制与struct
- Java基础--流程控制
- 黑马程序员——C语言——流程控制
- 第七讲 流程控制--if...else 和 switch...case
- shell脚本编程——流程控制