您的位置:首页 > Web前端 > JavaScript

Angular2之RxJS源码解读

2018-02-26 15:08 330 查看

基本概念

Reactive Extensions for JavaScript 的缩写,起源于 Reactive Extensions

RxJS是一个库,它通过使用observable序列来编写异步和基于事件的程序。它提供了一个核心类型Observable,附属类型(Observer、Schedulers、Subjects)和受[Array#extras]启发的操作符(map、filter、reduce、every等),这些数组操作符可以把异步事件作为集合来处理。

在rxjs中用来解决异步事件管理的基本概念是:

Observable(可观察对象):表示一个概念,这个概念是一个可调用的未来值或事件的集合;

Oserver(观察者):一个回调函数的集合,它知道如何去监听有Observable提供的值;

Subscription(订阅):表示Observable的执行,主要用于取消Observable的执行;

Operators(操作符):采用函数式编程风格的春函数,使用像map、filter、concat、flatMap等这样的操作符来处理集合;

Subject(主体):相当于EventEmitter,并且是将值或事件多路推送给Observer的唯一方式;

Schedulers(调度器):用来控制并发并且是中央集权的调度员,允许我们在发生计算时进行协调,例如setTimeout或requestAnimationFrame或其他;

Observable可观察对象

Observable是多个值的推送集合。怎么理解呢,来看代码:

console.log(Observable);

ƒ Observable(subscribe) {
this._isScalar = false;
if (subscribe) {
this._subscribe = subscribe;
}
}


解释:Observable是一个构造函数

既然是一个构造函数,那么我们来实例化

//不带参数
const observable1=new Observable();
console.log(observable1)

//结果
Observable {_isScalar: false}
_isScalar:false
__proto__:Object

//带参数
const observer = {
next: x => console.log('Observer got a next value: ' + x),
error: err => console.error('Observer got an error: ' + err),
complete: () => console.log('Observer got a complete notification'),
};

const observable2 = new Observable(() => { ... });
console.log('observable2', observable2)

Observable {_isScalar: false, _subscribe: ƒ}
_isScalar:false
_subscribe:ƒ ()
__proto__:Object


observer观察者对象

观察者是一个拥有三个回调函数的对象

var observer = {
next: x => console.log('Observer got a next value: ' + x),
error: err => console.error('Observer got an error: ' + err),
complete: () => console.log('Observer got a complete notification'),
};


当然,只有next函数也是可以的。实例化Observable的时候传一个观察者对象进去,如果单单只是传入一个函数,而并没有将其附加到观察者对象上

例如

observable.subscribe(x => console.log('Observer got a next value: ' + x));


在 observable.subscribe 内部,它会创建一个观察者对象并使用第一个回调函数参数作为 next 的处理方法。

Subscription订阅

subscription是表示可清理的资源,通常是Observablle的执行,该对象有一个很重要的方法,即unsubscribe即取消订阅,不需要任何函数。

var observable = Rx.Observable.interval(1000);
var subscription = observable.subscribe(x => console.log(x));
// 稍后:
// 这会取消正在进行中的 Observable 执行
// Observable 执行是通过使用观察者调用 subscribe 方法启动的
subscription.unsubscribe();


同时取消多个订阅

var observable1 = Rx.Observable.interval(400);
var observable2 = Rx.Observable.interval(300);

var subscription = observable1.subscribe(x => console.log('first: ' + x));
var childSubscription = observable2.subscribe(x => console.log('second: ' + x));

subscription.add(childSubscription);

setTimeout(() => {
// subscription 和 childSubscription 都会取消订阅
subscription.unsubscribe();
}, 1000);


Subscriptions 还有一个 remove(otherSubscription) 方法,用来撤销一个已添加的子 Subscription 。

Subject(主体)

subject是一种特殊类型的observable,它允许将值传播给多个观察者,它允许将值传给多个观察者,所以subject是多播的,而普通的observable是单播的(每个已订阅的观察者都拥有observable的独立执行)

subject既是observable又是observer。

subject作为observable时:

const ob1 = {
next: (data) => {
console.log('obj1****', data);
}
};
const subject1 = new Subject();
subject1.subscribe(ob1);     //作为observable

subject1.next(1);//输出obj1***,1       这时候是作为观察者


原理

Subject.prototype.next = function (value) {
if (this.closed) {
throw new ObjectUnsubscribedError_1.ObjectUnsubscribedError();
}
if (!this.isStopped) {
var observers = this.observers;
var len = observers.length;
var copy = observers.slice();//将观察者列表赋给copy,并且,当copy发生改变    原始的观察者列表不变
for (var i = 0; i < len; i++) {
copy[i].next(value);
}
}
};

subject的next本质上调用的是观察者的next函数。


未完待续….
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: