您的位置:首页 > Web前端 > Node.js

node异步编程流程控制库

2017-09-11 23:52 399 查看

(一)尾触发和next()

尾触目前应用最多的地方就是connect中间件

connect中间件api暴露方式

var app = connect();
// Middleware
app.use(connect.staticCache());
app.use(connect.static(__dirname + '/public'));
app.use(connect.cookieParser());
app.use(connect.session());
app.use(connect.query());
app.use(connect.bodyParser());
app.use(connect.csrf());
app.listen(3001);


在通过use()注册好一系列中间件后,监听端口上的请求;中间件利用了尾触的机制,最简单的中间件如下:

function (req, res, next) {
// 中间件
}


每一个中间件携带请求对象req,响应对象res ,尾触函数 通过队列形成一个处理流。



中间件在处理网络请求时候,可以像面向切面编程一样进行过滤,验证,日志处理 而不与具体业务逻辑产生关联,以此实现解耦。

connect核心实现

function createServer() {
function app(req, res){ app.handle(req, res); }
utils.merge(app, proto);
utils.merge(app, EventEmitter.prototype);
app.route = '/';
app.stack = [];
for (var i = 0; i < arguments.length; ++i) {
app.use(arguments[i]);
}
return app;
};
这段代码通过如下代码创建HTTP服务器的request事件处理函数
function app(req, res){ app.handle(req, res); }
但是真正的核心代码是app.stack=[]; stack属性是这个服务器内部维护的中间件队列,通过use()方法我们可以将中间件放进队列,下面是use()方法重要的部分:
app.use = function(route, fn){
// some code
this.stack.push({ route: route, handle: fn });
return this;
};

此时就建好处理模型了,接下来 结合node原生http模块实现监听即可,监听函数的实现如下:
app.listen=function(){
var server=http.createServer(this);
return server.listen.apply(server,arguments);
}
简化后的next方法如下(next的方法实现比较复杂)
function next(err) {
// some code
// next callback
layer = stack[index++];
layer.handle(req, res, next);
}
值得提醒的是:尽管中间件这种尾触机制并不要求每个中间件方法都是异步的,但是如果每个步骤都采用异步来完成,实际上只是串行化处理,没办法通过并行的异步调用来提升业务的处理效率;流式处理可以将一些串行的逻辑扁平化,但是并行逻辑处理还需要搭配事件或者promise来完成,这样业务在纵向和横向都能各自清晰


(二)async模块

async模块提供了多达20种方法来处理异步流程:

series() 方法支持没有依赖的异步串行

async.series([
function (callback) {
fs.readFile('file1.txt', 'utf-8', callback);
},
function (callback) {
fs.readFile('file2.txt', 'utf-8', callback);
}
], function (err, results) {
// results => [file1.txt, file2.txt]
});


异常处理:一旦发生异常立即结束所有调用。

2. parallel() 异步并行

async.parallel([
function (callback) {
fs.readFile('file1.txt', 'utf-8', callback);
},
function (callback) {
fs.readFile('file2.txt', 'utf-8', callback);
}
], function (err, results) {
// results => [file1.txt, file2.txt]
});


一旦发生异常,就会将异常作为第一个参数传递给最终的回调函数,只有数组中所有函数执行完毕后才会输出最终的result 顺序按照数组中添加函数的顺序。

回顾下EventProxy的方案:

var EventProxy = require('eventproxy');
var proxy = new EventProxy();
proxy.all('content', 'data', function (content, data) {
callback(null, [content, data]);
})
proxy.fail(callback);
fs.readFile('file1.txt', 'utf-8', proxy.done('content'));
fs.readFile('file2.txt', 'utf-8', proxy.done('data'));


这两种实现方式都是通过高阶函数来实现,

waterfall() 处理有依赖的异步串行

async.waterfall([
function (callback) {
fs.readFile('file1.txt', 'utf-8', function (err, content) {
callback(err, content);
});
},
function (arg1, callback) {
// arg1 => file2.txt
fs.readFile(arg1, 'utf-8', function (err, content) {
callback(err, content);
});
},
function(arg1, callback){
// arg1 => file3.txt
fs.readFile(arg1, 'utf-8', function (err, content) {
callback(err, content);
});
}
], function (err, result) {
// result => file4.txt
});


4 auto() 自动以最佳的方式处理有依赖的异步

业务场景

1 从磁盘读取配置文件

2 根据配置文件链接MongoDB

3 根据配置文件连接Redis

4 编译静态文件

5 上传静态文件到CDN

6 启动服务器

业务逻辑:
{
readConfig: function () {},
connectMongoDB: function () {},
connectRedis: function () {},
complieAsserts: function () {},
uploadAsserts: function () {},
startup: function () {}
}

var deps = {
readConfig: function (callback) {
// read config file
callback();
},
connectMongoDB: ['readConfig', function (callback) {
// connect to mongodb
callback();
}],
connectRedis: ['readConfig', function (callback) {
// connect to redis
callback();
}],
complieAsserts: function (callback) {
// complie asserts
callback();
},
uploadAsserts: ['complieAsserts', function (callback) {
// upload to assert
callback();
}],
startup: ['connectMongoDB', 'connectRedis', 'uploadAsserts', function (callback) {
// startup
}]
};
async.auto(deps);
如果用EventProx来实现则需要更细腻的事件分配


另外async还提供了数组中的方法:map(),forEach(),async模块也支持异步并发控制,详见parallelLimit()方法

async.parallelLimit([
function (callback) {
fs.readFile('file1.txt', 'utf-8', callback);
},
function (callback) {
fs.readFile('file2.txt', 'utf-8', callback);
}
], 1, function (err, results) {
// TODO
});


parallelLimit方法的缺陷是无法动态的增加要执行的方法,只能事先指定。

为此async提供了queue()方法来实现动态改变执行队列:这对于遍历文件系统时很有用

var q = async.queue(function (file, callback) {
fs.readFile(file, 'utf-8', callback);
}, 2);
q.drain = function () {
// 完成队列中的所有任务时触发
};
fs.readdirSync('.').forEach(function (file) {
q.push(file, function (err, data) {
// TODO
});
});


但是queue方法接受的参数是固定的,又失去了parallelLimit的灵活性

bagpipe更好的控制异步并发

解决思路:

1 通过一个队里来控制并发量

2 如果当前活跃(调用发起但未执行回调)的异步调用量小于限定值,从队列中取出执行

3 每个异步调用结束时,从队列中取出新的异步调用来执行

var Bagpipe = require('bagpipe');
// 设置并发数为10
var bagpipe = new Bagpipe(10);
for (var i = 0; i < 100; i++) {
bagpipe.push(async, function () {
// 异步回调执行
});
}
bagpipe.on('full', function (length) {
console.warn('系统队列不能完全处理,当前阻塞长度为' + length);
});


push()方法的核心实现如下:

Bagpipe.prototype.push = function (method) {
var args = [].slice.call(arguments, 1);
var callback = args[args.length - 1];
if (typeof callback !== 'function') {
args.push(function () {});
}
if (this.options.disabled || this.limit < 1) {
method.apply(null, args);
return this;
}
// 队列长度也超过限制时
if (this.queue.length < this.queueLength || !this.options.refuse) {
this.queue.push({
method: method,
args: args
});
} else {
var err = new Error('Too much async call in queue');
err.name = 'TooMuchAsyncCallError';
callback(err);
}
if (this.queue.length > 1) {
this.emit('full', this.queue.length);
}
this.next();
return this;
};


将调用推入队列后,调用一次next()方法尝试触发,其方法定义如下

Bagpipe.prototype.next = function () {
var that = this;
if (that.active < that.limit && that.queue.length) {
var req = that.queue.shift();
that.run(req.method, req.args);
}
};


执行队列中的方法:

Bagpipe.prototype.run = function (method, args) {
var that = this;
that.active++;
var callback = args[args.length - 1];
var timer = null;
var called = false;
// inject logic
args[args.length - 1] = function (err) {
// anyway, clear the timer
if (timer) {
clearTimeout(timer);
timer = null;
}
// if timeout, don't execute
if (!called) {
that._next();
callback.apply(null, arguments);
} else {
// pass the outdated error
if (err) {
that.emit('outdated', err);
}
}
};
var timeout = that.options.timeout;
if (timeout) {
timer = setTimeout(function () {
// set called as true
called = true;
that._next();
// pass the exception
var err = new Error(timeout + 'ms timeout');
err.name = 'BagpipeTimeoutError';
err.data = {
name: method.name,
method: method.toString(),
args: args.slice(0, -1)
};
callback(err);
}, timeout);
}
method.apply(null, args);
};


用户传入的回调函数被真正执行前,被替换封装过,这个封装的回调函数内部的逻辑活跃值得计数器减1后,主动调用next执行后续等待的异步调用。

bagpipe还可以实现超时控制,和拒绝模式

var limit_parallel_limit=10;
var config={
timeout:3000.
refuse:true
};
var bagpipe=new Bagpipe(limit_parallel_limit,config);


在拒绝模式下,如果等待的队列也满了后,新来的调用将被队列太忙的拒绝异常,这种调用一般都对实时性要求很高,如果等待后执行后及时得到了数据,也可能不是实时的而浪费时间。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  编程 异步 node-js