您的位置:首页 > Web前端 > Node.js

nodejs之异步流程控制ASYNC

2018-01-03 23:03 816 查看


参考http://blog.csdn.net/ctbinzi/article/details/39895401async主要实现了三个部分的流程控制功能:
集合: Collections
流程控制: Control Flow
工具类: Utils
1). 集合: Collectionseach: 如果想对同一个集合中的所有元素都执行同一个异步操作。
map: 对集合中的每一个元素,执行某个异步操作,得到结果。所有的结果将汇总到最终的callback里。与each的区别是,each只关心操作不管最后的值,而map关心的最后产生的值。
filter: 使用异步操作对集合中的元素进行筛选, 需要注意的是,iterator的callback只有一个参数,只能接收true或false。
reject: reject跟filter正好相反,当测试为true时则抛弃
reduce: 可以让我们给定一个初始值,用它与集合中的每一个元素做运算,最后得到一个值。reduce从左向右来遍历元素,如果想从右向左,可使用reduceRight。
detect: 用于取得集合中满足条件的第一个元素。
sortBy: 对集合内的元素进行排序,依据每个元素进行某异步操作后产生的值,从小到大排序。
some: 当集合中是否有至少一个元素满足条件时,最终callback得到的值为true,否则为false.
every: 如果集合里每一个元素都满足条件,则传给最终回调的result为true,否则为false
concat: 将多个异步操作的结果合并为一个数组。
2). 流程控制: Control Flowseries: 串行执行,一个函数数组中的每个函数,每一个函数执行完成之后才能执行下一个函数。
parallel: 并行执行多个函数,每个函数都是立即执行,不需要等待其它函数先执行。传给最终callback的数组中的数据按照tasks中声明的顺序,而不是执行完成的顺序。
whilst: 相当于while,但其中的异步调用将在完成后才会进行下一次循环。
doWhilst: 相当于do…while, doWhilst交换了fn,test的参数位置,先执行一次循环,再做test判断。
until: until与whilst正好相反,当test为false时循环,与true时跳出。其它特性一致。
doUntil: doUntil与doWhilst正好相反,当test为false时循环,与true时跳出。其它特性一致。
forever: 无论条件循环执行,如果不出错,callback永远不被执行。
waterfall: 按顺序依次执行一组函数。每个函数产生的值,都将传给下一个。
compose: 创建一个包括一组异步函数的函数集合,每个函数会消费上一次函数的返回值。把f(),g(),h()异步函数,组合成f(g(h()))的形式,通过callback得到返回值。
applyEach: 实现给一数组中每个函数传相同参数,通过callback返回。如果只传第一个参数,将返回一个函数对象,我可以传参调用。
queue: 是一个串行的消息队列,通过限制了worker数量,不再一次性全部执行。当worker数量不够用时,新加入的任务将会排队等候,直到有新的worker可用。
cargo: 一个串行的消息队列,类似于queue,通过限制了worker数量,不再一次性全部执行。不同之处在于,cargo每次会加载满额的任务做为任务单元,只有任务单元中全部执行完成后,才会加载新的任务单元。
auto: 用来处理有依赖关系的多个任务的执行。
iterator: 将一组函数包装成为一个iterator,初次调用此iterator时,会执行定义中的第一个函数并返回第二个函数以供调用。
apply: 可以让我们给一个函数预绑定多个参数并生成一个可直接调用的新函数,简化代码。
nextTick: 与nodejs的nextTick一样,再最后调用函数。
times: 异步运行,times可以指定调用几次,并把结果合并到数组中返回
timesSeries: 与time类似,唯一不同的是同步执行
3). 工具类: Utilsmemoize: 让某一个函数在内存中缓存它的计算结果。对于相同的参数,只计算一次,下次就直接拿到之前算好的结果。
unmemoize: 让已经被缓存的函数,返回不缓存的函数引用。
log: 执行某异步函数,并记录它的返回值,日志输出。
dir: 与log类似,不同之处在于,会调用浏览器的console.dir()函数,显示为DOM视图。
noConflict: 如果之前已经在全局域中定义了async变量,当导入本async.js时,会先把之前的async变量保存起来,然后覆盖它。仅仅用于浏览器端,在nodejs中没用

========异步流程控制链接mysql:参考---http://www.cnblogs.com/fangsmile/p/6257450.html
安装async------

 npm install async -g

=======================串行无关联
 function exec() {
async.series(
{
one:function(done){
i=0;
setInterval(function() {//每隔一秒执行一次
i++;
console.log('twotwo');
if(i==3){
clearInterval(this);
done(null,'one');//如果第一个不是null,表示出错,就不会执行下面的程序了,直接执行最后的function(err,rs){};如果这个done不执行,则two函数就不会执行
}

},1000);
},
two:function(done){
i=0;
setInterval(function() {//每隔一秒执行一次
i++;
console.log('twotwo');
if(i==3){
clearInterval(this);
done(null,'two');//如果第一个不是null,表示出错,就不会执行two了,直接执行最后的function(err,rs){};如果这个done不执行,则two函数就不会执行
}

},1000);
}
},function(err,rs){console.log(err);
console.log(rs);
})
}
exec();

======================并行无关联

function exec() {
async.parallel(
{
one:function(done){
i=0;
setInterval(function() {//每隔一秒执行一次
i++;
console.log('twotwo');
if(i==3){
clearInterval(this);
done(null,'one');//如果第一个不是null,表示出错,就不会执行two中的回调(done方法)了,但是two方法还是会执行,直接执行最后的function(err,rs){};如果这个done不执行,则two函数就不会执行
}

},1000);
},
two:function(done){
i=0;
setInterval(function() {//每隔一秒执行一次
i++;
console.log('twotwo');
if(i==3){
clearInterval(this);
done(null,'two');
}

},1000);
}
},function(err,rs){console.log(err);
console.log(rs);
})
}
exec();

======================串行有关联(瀑布流)

function exec() {
async.waterfall(
[
function(done){
i=0;
setInterval(function() {//每隔一秒执行一次
i++;
console.log('twotwo');
if(i==3){
clearInterval(this);
done(null,'one');//如果第一个不是null,表示出错,就不会执行two中的回调(done方法)了,但是two方法还是会执行,直接执行最后的function(err,rs){};如果这个done不执行,则two函数就不会执行
}

},1000);
},
function(preValue,done){//preVlaue是上一个方法执行完的回调的结果:one
i=0;
setInterval(function() {//每隔一秒执行一次
i++;
console.log('twotwo');
if(i==3){
clearInterval(this);
done(null,preValue+'two');
}

},1000);
}
],function(err,rs){console.log(err);
console.log(rs);
})
}
exec();
===

1. series(tasks, [callback]) (多个函数依次执行,之间没有数据交换)
有多个异步函数需要依次调用,一个完成之后才能执行下一个。各函数之间没有数据的交换,仅仅需要保证其执行顺序。这时可使用series。
var async = require(‘async’)
async.series([
function(cb) { step1(function(err,v1) {
// do something with v1
cb(err, v1);
}),
function(cb) { step2(...) },
function(cb) { step3(...) }
], function(err, values) {
// do somethig with the err or values v1/v2/v3
});
该函数的详细解释为:

依次执行一个函数数组中的每个函数,每一个函数执行完成之后才能执行下一个函数。
如果任何一个函数向它的回调函数中传了一个error,则后面的函数都不会被执行,并且将会立刻会将该error以及已经执行了的函数的结果,传给series中最后那个callback。
当所有的函数执行完后(没有出错),则会把每个函数传给其回调函数的结果合并为一个数组,传给series最后的那个callback。
还可以json的形式来提供tasks。每一个属性都会被当作函数来执行,并且结果也会以json形式传给series最后的那个callback。这种方式可读性更高一些。
需要注意的是:多个series调用之间是不分先后的,因为series本身也是异步调用。
2. parallel(tasks, [callback]) (多个函数并行执行)
并行执行多个函数,每个函数都是立即执行,不需要等待其它函数先执行。传给最终callback的数组中的数据按照tasks中声明的顺序,而不是执行完成的顺序。

如果某个函数出错,则立刻将err和已经执行完的函数的结果值传给parallel最终的callback。其它未执行完的函数的值不会传到最终数据,但要占个位置。

同时支持json形式的tasks,其最终callback的结果也为json形式。

示例代码:

async.parallel([
function(cb) { t.fire('a400', cb, 400) },
function(cb) { t.fire('a200', cb, 200) },
function(cb) { t.fire('a300', cb, 300) }
], function (err, results) {
log(’1.1 err: ‘, err); // -> undefined
log(’1.1 results: ‘, results); // ->[ 'a400', 'a200', 'a300' ]
});

中途出错的示例:

async.parallel([
function(cb) { log('1.2.1: ', 'start'); t.fire('a400', cb, 400) }, // 该函数的值不会传给最终callback,但要占个位置
function(cb) { log('1.2.2: ', 'start'); t.err('e200', cb, 200) },
function(cb) { log('1.2.3: ', 'start'); t.fire('a100', cb, 100) }
], function(err, results) {
log(’1.2 err: ‘, err); // -> e200
log(’1.2 results: ‘, results); // -> [ , undefined, 'a100' ]
});

以json形式传入tasks

async.parallel({
a: function(cb) { t.fire(‘a400′, cb, 400) },
b: function(cb) { t.fire(‘c300′, cb, 300) }
}, function(err, results) {
log(’1.3 err: ‘, err); // -> undefined
log(’1.3 results: ‘, results); // -> { b: ‘c300′, a: ‘a400′ }
});
3. waterfall(tasks, [callback]) (多个函数依次执行,且前一个的输出为后一个的输入)
与seires相似,按顺序依次执行多个函数。不同之处,每一个函数产生的值,都将传给下一个函数。如果中途出错,后面的函数将不会被执行。错误信息以及之前产生的结果,将传给waterfall最终的callback。

async.waterfall([
function(cb) { log('1.1.1: ', 'start'); cb(null, 3); },
function(n, cb) { log('1.1.2: ',n); t.inc(n, cb); },
function(n, cb) { log('1.1.3: ',n); t.fire(n*n, cb); }
], function (err, result) {
log(’1.1 err: ‘, err); // -> null
log(’1.1 result: ‘, result); // -> 16
});
4. auto(tasks, [callback]) (多个函数有依赖关系,有的并行执行,有的依次执行)
用来处理有依赖关系的多个任务的执行。比如某些任务之间彼此独立,可以并行执行;但某些任务依赖于其它某些任务,只能等那些任务完成后才能执行。

虽然我们可以使用async.parallel和async.series结合起来实现该功能,但如果任务之间关系复杂,则代码会相当复杂,以后如果想添加一个新任务,也会很麻烦。这时使用async.auto,则会事半功倍。

如果有任务中途出错,则会把该错误传给最终callback,所有任务(包括已经执行完的)产生的数据将被忽略。

这里假设我要写一个程序,它要完成以下几件事:

从某处取得数据
在硬盘上建立一个新的目录
将数据写入到目录下某文件
发送邮件,将文件以附件形式发送给其它人。
分析该任务,可以知道1与2可以并行执行,3需要等1和2完成,4要等3完成。

async.auto({
getData: function (callback) {
setTimeout(function(){
console.log(’1.1: got data’);
callback();
}, 300);
},
makeFolder: function (callback) {
setTimeout(function(){
console.log(’1.1: made folder’);
callback();
}, 200);
},
writeFile: ['getData', 'makeFolder', function(callback) {
setTimeout(function(){
console.log('1.1: wrote file');
callback(null, 'myfile');
}, 300);
}],
emailFiles: ['writeFile', function(callback, results) {
log('1.1: emailed file: ', results.writeFile); // -> myfile
callback(null, results.writeFile);
}]
}, function(err, results) {
log(’1.1: err: ‘, err); // -> null
log(’1.1: results: ‘, results); // -> { makeFolder: undefined,
// getData: undefined,
// writeFile: ‘myfile’,
// emailFiles: ‘myfile’ }
});
5. whilst(test, fn, callback)(用可于异步调用的while)
相当于while,但其中的异步调用将在完成后才会进行下一次循环。举例如下:

var count1 = 0;
async.whilst(
function() { return count1 < 3 },
function(cb) {
log(’1.1 count: ‘, count1);
count1++;
setTimeout(cb, 1000);
},
function(err) {
// 3s have passed
log(’1.1 err: ‘, err); // -> undefined
}
);
它相当于:

try {
whilst(test) {
fn();
}
callback();
} catch (err) {
callback(err);
}
该函数的功能比较简单,条件变量通常定义在外面,可供每个函数访问。在循环中,异步调用时产生的值实际上被丢弃了,因为最后那个callback只能传入错误信息。

另外,第二个函数fn需要能接受一个函数cb,这个cb最终必须被执行,用于表示出错或正常结束。
6. until(test, fn, callback) (与while相似,但判断条件相反)
var count4 = 0;
async.until(
function() { return count4>3 },
function(cb) {
log(’1.4 count: ‘, count4);
count4++;
setTimeout(cb, 200);
},
function(err) {
// 4s have passed
log(’1.4 err: ‘,err); // -> undefined
}
);
当第一个函数条件为false时,继续执行第二个函数,否则跳出。
7. queue (可设定worker数量的队列)
queue相当于一个加强版的parallel,主要是限制了worker数量,不再一次性全部执行。当worker数量不够用时,新加入的任务将会排队等候,直到有新的worker可用。

该函数有多个点可供回调,如worker用完时、无等候任务时、全部执行完时等。

定义一个queue,其worker数量为2,并在任务执行时,记录一下日志:

var q = async.queue(function(task, callback) {
log(‘worker is processing task: ‘, task.name);
task.run(callback);
}, 2);
worker数量将用完时,会调用saturated函数:

q.saturated = function() {
log(‘all workers to be used’);
}
当最后一个任务交给worker执行时,会调用empty函数

q.empty = function() {
log(‘no more tasks wating’);
}
当所有任务都执行完时,会调用drain函数

q.drain = function() {
console.log(‘all tasks have been processed’);
}
放入多个任务,可一次放一个,或一次放多个

q.push({name:’t1′, run: function(cb){
log(‘t1 is running, waiting tasks: ‘, q.length());
t.fire(‘t1′, cb, 400); // 400ms后执行
}}, function(err) {
log(‘t1 executed’);
});

q.push([{name:'t3', run: function(cb){
log('t3 is running, waiting tasks: ', q.length());
t.fire('t3', cb, 300); // 300ms后执行
}},{name:'t4', run: function(cb){
log('t4 is running, waiting tasks: ', q.length());
t.fire('t4', cb, 500); // 500ms后执行
}}], function(err) {
log(‘t3/4 executed’);
});

8. iterator(tasks) (将几个函数包装为iterator)
将一组函数包装成为一个iterator,可通过next()得到以下一个函数为起点的新的iterator。该函数通常由async在内部使用,但如果需要时,也可在我们的代码中使用它。

var iter = async.iterator([
function() { console.log('111') },
function() { console.log('222') },
function() { console.log('333') }
]);
console.log(iter());
console.log(iter.next());
直接调用(),会执行当前函数,并返回一个由下个函数为起点的新的iterator。调用next(),不会执行当前函数,直接返回由下个函数为起点的新iterator。

对于同一个iterator,多次调用next(),不会影响自己。如果只剩下一个元素,调用next()会返回null。
9. apply(function, arguments..) (给函数预绑定参数)
apply是一个非常好用的函数,可以让我们给一个函数预绑定多个参数并生成一个可直接调用的新函数,简化代码。

对于函数:

function(callback) { t.inc(3, callback); }
可以用apply改写为:

async.apply(t.inc, 3);
还可以给某些函数预设值,得到一个新函数:

var log = async.apply(console.log, ">");
log(‘hello’);
// > hello

10. nextTick(callback) (在nodejs与浏览器两边行为一致)
nextTick的作用与nodejs的nextTick一样,都是把某个函数调用放在队列的尾部。但在浏览器端,只能使用setTimeout(callback,0),但这个方法有时候会让其它高优先级的任务插到前面去。

所以提供了这个nextTick,让同样的代码在服务器端和浏览器端表现一致。

var calls = [];
async.nextTick(function() {
calls.push(‘two’);
});
calls.push(‘one’);
async.nextTick(function() {
console.log(calls); // -> [ 'one', 'two' ]
});
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: