您的位置:首页 > Web前端 > JavaScript

RxJS入门(9)----调度(Bending Time with Schedulers)

2016-05-10 17:32 447 查看
如题,感觉题目没有翻译好,见英文知其义。

我一知道RxJS,我们开始把它用到我的项目中了。在一段时间后,我想,我知道能如何有效的使用它了,但是这里有一个令人心烦的问题:我如何知道使用的操作符是异步的还是同步的?换句话说,什么时候利用操作符准确的发送通知?这看起来是正确使用RxJs的机器重要的部分,但是这让我感觉很模糊。

interval很明显是异步的,所以它必须在像setTimeout内部使用来发射值。但是如果使用range了?它也发射异步的?它是否阻止事件循环?from了?我曾经处处使用这些操作符,但我并不知道它们内部的并发模型。

之后我就学习了Schedulers。

在你的程序中Schedulers是一个强有力的机制来恰到好处地来处理并发性。通过允许你改变它们的并发模型,你可以很细腻的掌握一个Observable是如何发射通知的。在本章中,你将学习到怎样使用Schedulers和应用到普通的场景中。我们将会聚焦测试,这里Schedulers将会尤其有用,并且你将学会你自己的Schedulers。

Using Schedulers

使用Scheduler是一种发生再将来的时间表的动作。Rxjs中的每个操作符内部可选择性地使用一个Scheduler,在最有可能的场景下提供最佳表现。

看看如何在操作符里改变Scheduler,和这样做的结果。首先我们创建一个由1000个整数的数组:

var arr = [];
for (var i=0; i<1000; i++) {
arr.push(i);
}


之后,我们根据arr创建一个Observable,并且通过订阅强迫它发射所有的通知。在下面的代码中,我们测试发射所有通知所耗费的时间:

var timeStart = Date.now();
Rx.Observable.from(arr).subscribe(
function onNext() {},
function onError() {},
function onCompleted() {
console.log('Total time: ' + (Date.now() - timeStart) + 'ms');
});


》”Total time: 6ms”

6毫秒–不错!from内部使用 Rx.Scheduler.currentThread,它调度任务指向在当前任务执行完成之后。一旦它开始了,它同步的处理所有的通知。

现在让我们 Scheduler改为Rx.Scheduler.default:

var timeStart = Date.now();
Rx.Observable.from(arr, null, null, Rx.Scheduler.default).subscribe(
function onNext() {},
function onError() {},
function onCompleted() {
console.log('Total time: ' + (Date.now() - timeStart) + 'ms');
});


》”Total time: 5337ms”

Wow,我们的代码运行比currentThread Scheduler慢了几千倍。这是由于default Scheduler异步的运行每个通知。我们可以通过在订阅之后添加一个简单的log输出来证明。

使用currentThread Scheduler:

Rx.Observable.from(arr).subscribe( ... );
console.log('Hi there!’);




“Total time: 8ms”

“Hi there!”

使用 default Scheduler:

Rx.Observable.from(arr, null, null, Rx.Scheduler.timeout).subscribe( ... );
console.log('Hi there!’);




“Hi there!”

“Total time: 5423ms”

因为observer使用 default Scheduler异步地发射它,我们的console.log statement(它是同步的)在Observable发射任何通知前执行。使用currentThread Scheduler所有的通知都是同步的,所以console.log statement仅当Observable发射了所有的通知后才执行。

因此,Schedulers真正可以改变我们Observable工作。在这种情况下,性能受到了了异步处理的一个大的可用的数组的影响。但是我们可以使用Schedulers提高性能。例如,我们在做一些代价大的Observable的操作前,我们可以切换Scheduler来执行。

arr
.groupBy(function(value) {
return value % 2 === 0;
})
.map(function(value) {
➤ return value.observeOn(Rx.Scheduler.default);
})
.map(function(groupedObservable) {
return expensiveOperation(groupedObservable);
});


在上面的代码中,我们把数组中的值归为两大类:even和uneven的值。groupBy返回一个Observable,它发射每个group创建的Observable。同时,这里是最酷的那部分:在运行每个分组Observable的items代价大的操作前,我们使用observeOn把Scheduler变为default。这样,那些代价大的操作将会是异步地,不会阻塞事件的循环。

observeOn and subscribeOn

在先前的部分,我们在一些Observable中使用observeOn操作符来改变Scheduler,observeOn和subscribeOn是操作符实例,它返回Observable实例副本,但是使用我们传递的Scheduler作为一个参数。

observeOn需要一个Scheduler并返回一个新的Observable,它使用这个Scheduler。在新的Scheduler中它会导致每个onNext调用的执行。

subscribeOn强制一个Observable运作特定的Scheduler来订阅和取消订阅的任务(不是通知)。像observeOn一样,它接收一个Scheduler作为参数。subscribeOn很有用,例如当我们正在运行浏览器,并在subscribe调用中执行一些很重要的任务,但是我们不想终止UI线程。

Basic Rx Schedulers

让我额你更深入地看下我们刚刚使用的Schedulers。在RxJS中使用最多的就是immediate,default和currentThread。

Immediate Scheduler

immediate Scheduler 从Observable中同步的发送通知,因此,无论何时在 immediate Scheduler中一个action被调度,它将会立即执行,阻塞这个线程。Rx.Observable.range是内部使用 immediate Scheduler的操作符中的一个:

console.log('Before subscription');
Rx.Observable.range(1, 5)
.do(function(a) {
console.log('Processing value', a);
})
.map(function(value) { return value * value; })
.subscribe(function(value) { console.log('Emitted', value); });
console.log('After subscription');




Before subscription

Processing value 1

Emitted 1

Processing value 2

Emitted 4

Processing value 3

Emitted 9

Processing value 4

Emitted 16

Processing value 5

Emitted 25

After subscription

这个程序的输出如我们期望的一样。每个console.log statement运行在当前item的通知之前。

When to Use It

immediate Scheduler适用于那些那些Observable,它们的每个通知都是可预测的且操作地代价不大。同时,Observable最终必须调用onCompleted。

Default Scheduler

default Scheduler异步地执行actions。你可以粗鲁的认为是一个保持序列队形的零毫秒延迟的setTimeout,它运行在最有效的异步实现的平台上(例如,Node.js的 process.nextTick或者是浏览器里的setTimeout)。

让我们看看之前例子中的range,并让他使用default Scheduler跑起来,我们将使用observeOn操作符:

console.log('Before subscription');
Rx.Observable.range(1, 5)
.do(function(value) {
console.log('Processing value', value);
})
.observeOn(Rx.Scheduler.default)
.map(function(value) { return value * value; })
.subscribe(function(value) { console.log('Emitted', value); });
console.log('After subscription');




Before subscription

Processing value 1

Processing value 2

Processing value 3

Processing value 4

Processing value 5

After subscription

Emitted 1

Emitted 4

Emitted 9

Emitted 16

Emitted 25

在输出中有很大的不同。我们的同步console.log

statement 对每个值立即运行,但是使Observable运行在default Scheduler,它每个值的产生都是异步的。这意味着我们的log statements在do操作符里面是在平方之前处理完的。

When to Use It

default Scheduler从不阻塞事件的循环,因此,涉及到像异步的请求包含时间,这是很理想的处理方式。它可以被用在Observable用户不会完成,是因为在程序等待新通知(可能永远不会发生)的时间段内不会被阻塞。

Current Thread Scheduler

currentThread Scheduler和immediate Scheduler一样是同步的。但是,在我们的递归操作的情景中,它排队执行而不是立即执行。一个递归的操作符是一个它自己调度其他操作符的操作符。一个很好的例子就是repeat。repeat操作符,如果不给参数,一直重复之前的不定长的链中的Observable序列。

你将会遇到麻烦,如果你调用repeat在一个另外一个操作符(它使用immediate Scheduler,如return)。让我们使用这个重复10,之后使用take来取重复中的第一个值,如下,代码将会打印10一次并退出:

// Be careful: the code below will freeze your environment!
Rx.Observable.return(10).repeat().take(1)
.subscribe(function(value) {
console.log(value);
});


》Error: Too much recursion

砂锅面的代码导致了死循环。在订阅之上,return调用onNext(10)之后onCompleted,这导致repeat又重新订阅到return。由于return是运行immediate Scheduler,这个程序重复自己,引起了死循环并永远到不了take。

但是如果我们使用 currentThread Scheduler作为第二个参数到return上,如下:

var scheduler = Rx.Scheduler.currentThread;
Rx.Observable.return(10, scheduler).repeat().take(1)
.subscribe(function(value) {
console.log(value);
});


》10

现在,当重复子订阅到return,那个新的onNext调用将会排队,是因为之前的onCompleted还在发生。repeat就返回了一个一次性的对象给take,它调用onCompleted并通过销毁repeat取消重复。最后从subscribe的调用returns。

最为一个大概的规则,currentThread应该被使用在iterate大序列上,当使用例如repeat的递归操作符时。

When to Use It

currentThread Scheduler在像repeat这样涉及递归操作的操作符是很有用的,一般包含嵌套操作符的迭代。

Scheduling for Animations

对于快速的例如canvas或DOM动画的视觉更新,我们可以使用interval在一个很小的毫秒值或者是使用Scheduler,它使用一个类似setTimeout的内部函数来调度通知。

但是两种方法都是不理想的。这两种方法中,在浏览器上我们会丢掉某些没有足够快处理的更新值。这些的发生是由于浏览器正在尝试渲染某个框架,它可能收到了渲染下一个的指令,所以它丢掉了当前的框架来保持速度。这导致了起伏的动画。在web上有好多这种情况。

浏览器有一个本地化的方式来处理动画,他们提供了一个API,使用它动用requestAnimationFrame。requestAnimationFrame允许浏览器在最合适的动画时间队列优化性能,并帮助我们获取平滑的动画。

There’s a Scheduler for That

这个RxDOM library有些补充的Schedulers,其中的一个便是requestAnimationFrame。

对的,你猜到了。使用这个Scheduler我们可以改善我们的飞船游戏。在这个游戏里面,我们粗鲁的建立了一个40毫秒的刷新——每秒25次,通过interval Observable在这个速度上,之后使用combineLatest去更新所有的游戏场景在一个通过interval的给定速度上(因为它是一个更快的更新Observable)……但是谁知道使用这个技术浏览器丢弃的次数是多少!通过使用requestAnimationFrame我们可以获取更好的性能。

让我们创建一个Observable,它使用 Rx.Scheduler.requestAnimationFrame作为它的Scheduler。注意到它与interval操作符工作的方式很相似:

function animationLoop(scheduler) {
return Rx.Observable.generate(
0,
function() { return true; }, // Keep generating forever
function(x) { return x + 1; }, // Increment internal value
function(x) { return x; }, // Value to return on each notification
Rx.Scheduler.requestAnimationFrame); // Schedule to requestAnimationFrame
}


现在,无论哪儿我们使用interval去动画的图片没秒25次,我们可以仅使用我们的animationLoop函数。所以我们Observable开始画星,它之前是:

var StarStream = Rx.Observable.range(1, 250)
.map(function() {
return {
x: parseInt(Math.random() * canvas.width),
y: parseInt(Math.random() * canvas.height),
size: Math.random() * 3 + 1
};
})
.toArray()
.flatMap(function(arr) {
return Rx.Observable.interval(SPEED).map(function() {
return arr.map(function(star) {
if (star.y >= canvas.height) {
star.y = 0;
}
star.y += 3;
return star;
});
});
});


现在变成了:

var StarStream = Rx.Observable.range(1, 250)
.map(function() {
return {
x: parseInt(Math.random() * canvas.width),
y: parseInt(Math.random() * canvas.height),
size: Math.random() * 3 + 1
};
})
.toArray()
.flatMap(function(arr) {
➤ return animationLoop().map(function() {
return arr.map(function(star) {
if (star.y >= canvas.height) {
star.y = 0;
}
star.y += 3;
return star;
});
});
});


这给了我们更加平滑的动画,作为红利,这个代码就更加的清晰了!

Testing with Schedulers

测试有可能使我们使用Schedulers最多的场景。这本书到目前为止我们一直在编码,没有对结果过多地考虑。但是在真正的软件项目中,我们需要编写测试以确保我们的代码向我们期望的那样运行。

测试异步代码很困难。我们通常碰到一下问题:

模拟异步事件很复杂且容易出错。测试的所有点就是为了避免bugs和错误,但是如果你测试本身有错误,他们将起不到帮助。

如果我们想基于时间的准确的功能性测试,自动化测试变得真的很慢。例如,如果我们需要精确测试一个错误,它是获取远程文件4秒后引起的,每个测试都将花费如此多的时间来运行。如果我们一直运行测试,这将会影响我们的开发时间。

The TestScheduler

Rxjs提供了TestScheduler,一个为给测试提供帮助而设计的Scheduler。TestScheduler允许我们在合适模仿时间段并创建精确的测试,这里这些测试是100%可重复的。包含这些,它允许我们把相当数量的时间压缩到一瞬间来执行,与此同时维持测试的精确性。

TestScheduler是VirtualTimeScheduler的一个特例。VirtualTimeScheduler在虚拟的时间执行action而不真实的时间。Scheduled action在队列中并分配到虚拟时间的某个时刻。当它的时钟前进时,Scheduler按顺序执行action。由于是虚拟的时间,每件事都可以立即被执行,而不用等待指定的时间。看看如下代码:

var onNext = Rx.ReactiveTest.onNext;
QUnit.test("Test value order", function(assert) {
var scheduler = new Rx.TestScheduler();
var subject = scheduler.createColdObservable(
onNext(100, 'first'),
onNext(200, 'second'),
onNext(300, 'third')
);
var result = '';
subject.subscribe(function(value) { result = value });
scheduler.advanceBy(100);
assert.equal(result, 'first');
scheduler.advanceBy(100);
assert.equal(result, 'second');
scheduler.advanceBy(100);
assert.equal(result, 'third');
});


在上面的代码中,我们测试了cold Observable按照正确顺序到达的某些值。为此,我们使用了TestScheduler中的createColdObservable的辅助方法来创建Observable,它在放回我们作为onNext参数传递的通知。在每个通知里面我们指定了通知将要发射的时间值。在这之后,我们订阅这个Observable,在createColdObservable中手工的让虚拟时间前进,检查它确定发射了那个期望的值。如果这个例子在正常的时间里运行,它将需要300ms,但是由于我们使用了TestScheduler来运行Observable,它将会立即执行,并遵循顺序。

Writing a Real-World Test

-比起写一个真实世界的时间有关的任务的测试, 没有更好的办法来领悟如何使用虚拟时间来bend时间了。让我们回顾下地震例子中Buffering values的Observable:

quakes
.pluck('properties')
.map(makeRow)
.bufferWithTime(500)
.filter(function(rows) { return rows.length > 0; })
.map(function(rows) {
var fragment = document.createDocumentFragment();
rows.forEach(function(row) {
fragment.appendChild(row);
});
return fragment;
})
.subscribe(function(fragment) {
table.appendChild(fragment);
});


为了使这段代码更加可测试,让我们用需要一个Scheduler参数的函数来封装这个Observable,在其中将使用bufferWithTime操作符。这是一个很好的方法来确定将要测试的Observables的Schedulers的参数。

function quakeBatches(scheduler) {
return quakes.pluck('properties')
.bufferWithTime(500, null, scheduler || null)
.filter(function(rows) {
return rows.length > 0;
});
}


让我保留本质但是简单的按步分解下下面的代码。这些代码需要一个json对象的Observable,它包含一个properties属性,缓存并每500毫秒的批次释放,并过滤过来的空批次。

我们想验证代码的有效性,每次运行测试确保我们的缓存工作如期望的一样,但我们不想都等若干秒。这正是虚拟时间和TestScheduler将会帮助我们的地方:

❶ var onNext = Rx.ReactiveTest.onNext;
var onCompleted = Rx.ReactiveTest.onCompleted;
var subscribe = Rx.ReactiveTest.subscribe;
❷ var scheduler = new Rx.TestScheduler();
❸ var quakes = scheduler.createHotObservable(
onNext(100, { properties: 1 }),
onNext(300, { properties: 2 }),
onNext(550, { properties: 3 }),
onNext(750, { properties: 4 }),
onNext(1000, { properties: 5 }),
onCompleted(1100)
);
❹ QUnit.test("Test quake buffering", function(assert) {
❺ var results = scheduler.startScheduler(function() {
return quakeBatches(scheduler)
}, {
created: 0,
subscribed: 0,
disposed: 1200
});
❻ var messages = results.messages;
console.log(results.scheduler === scheduler);
❼ assert.equal(
messages[0].toString(),
onNext(501, [1, 2]).toString()
);
assert.equal(
messages[1].toString(),
onNext(1001, [3, 4, 5]).toString()
);
assert.equal(
messages[2].toString(),
onCompleted(1100).toString()
);
});


让我们按步骤分析下:

1:我们从ReactiveTest中获取一些帮助函数开始。这是一些在虚拟时间里的注册事件:onNext,onCompleted,subscribe。

2:我们创建了一个新的TestScheduler,它将驱动所有的测试。

3:使用TestScheduler的createHotObservable方法,我们创建了一个假的Observable,它将会在虚拟时间的特定时刻模拟通知。尤其是,它将会在第一个秒内发射5个通知,1100毫秒完成。每次它发射一个特定properties属性的对象。

4:我们使用任何的测试框架来测试。例如,我选择了QUnit。

5:我们使用startScheduler方法创建一个Observable,它使用了一个 test Scheduler。第一个参数是一个函数,它创建了运行了我们SchedulerObservable。这种情况下,我们简单返回我们的quakeBatches函数,我们传递TestScheduler给它。第二个参数是一个包含了不同虚拟时间的对象,这是我们想创建Observable,订阅到它,并销毁它。对我们的例子来说,我们订阅从虚拟0时间开始并且我们销毁这个Observable在虚拟的1200毫秒之后。

6:startScheduler返回一个scheduler和message属性的对象。在message中我们可以Observable在虚拟时间发射的所有通知。

7:501毫秒(仅仅在第一个buffer time限制后)后第一个明确测试Observable产生了值1和2

我们的第二个明确的测试在1001毫秒后,Observable蚕丝了持续的值3,4,5。最后,我们的第三个明确的测试检查了,当我们指定了热Observable quakes,1100毫秒后这个序列是完全完成了。

上面的代码是一个可靠的方法,它高效的测试了我们的异步Observable,没有必须跳转到模仿异步条件的大圈子。我们简单地只当了时间,这是我们希望代码在虚拟时间里的响应,同时,我们使用一个 test Scheduler来运行所有的操作。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: