您的位置:首页 > 编程语言 > Java开发

快速理解RxJava源码的设计理念

2016-05-04 10:04 399 查看

前言

我在看过几篇关于RxJava源码分析的博客后,不知是我的水平有限还是源码过于博大精深,导致花了很长的时间才搞清楚其运行原理。我个人觉得应该有更好的办法来快速剖析理解,于是决定写下本文。

本文适合已经看过一些RxJava源码分析资料的同学,不过没看过也没关系。在看本文时可参考这篇博客:RxJava基本流程和lift源码分析,它说得比较全,在此感谢博主大头鬼Bruce。

一、初探RxJava

【以下摘录了RxJava基本流程和lift源码分析

我们先来看一段最基本的代码,分析这段代码在RxJava中是如何实现的。

Observable.OnSubscribe<String> onSubscriber1 = new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("1");
subscriber.onCompleted();
}
};
Subscriber<String> subscriber1 = new Subscriber<String>() {
@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(String s) {

}
};

Observable.create(onSubscriber1)
.subscribe(subscriber1);


首先我们来看一下Observable.create的代码:

public final static <T> Observable<T> create(OnSubscribe<T> f) {
return new Observable<T>(hook.onCreate(f));
}

protected Observable(OnSubscribe<T> f) {
this.onSubscribe = f;
}


直接就是调用了Observable的构造函数来创建一个新的Observable对象,这个对象我们暂时标记为observable1,以便后面追溯。

同时,会将我们传入的OnSubscribe对象onSubscribe1保存在observable1的onSubscribe属性中,这个属性在后面的上下文中很重要,大家留心一下。

接下来我们来看看subscribe方法。

public final Subscription subscribe(Subscriber<? super T> subscriber) {
return Observable.subscribe(subscriber, this);
}

private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
...
subscriber.onStart();
hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
return hook.onSubscribeReturn(subscriber);
}


可以看到,subscribe之后,就直接调用了observable1.onSubscribe.call方法,也就是我们代码中的onSubscribe1对象的call方法:

hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);


传入的参数就是我们代码中定义的subscriber1对象。call方法中所做的事情就是调用传入的subscriber1对象的onNext和onComplete方法。

这样就实现了观察者和被观察者之间的通讯。

【摘录结束】

好了,总结一下:

订阅类Subscriber的职责是处理数据

入门结束,下面看看带操作符的情况。

二、带操作符的RxJava应用

【以下摘录了RxJava基本流程和lift源码分析

先展示Demo:

Observable.OnSubscribe<String> onSubscriber1 = new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("1");
subscriber.onCompleted();
}
};
Subscriber<Integer> subscriber1 = new Subscriber<Integer>() {
@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(Integer i) {

}
};
Func1<String, Integer> transformer1 = new Func1<String, Integer>() {
@Override
public Integer call(String s) {
return Integer.parseInt(s);
}
};

Observable.create(onSubscriber1)
.map(transformer1)
.subscribe(subscriber1);


和刚才不同的是我们在create之后调用了map方法,然后才调用subscribe方法。

【摘录结束】

再贴上RxJava的执行流程图:



我先根据这张图给出RxJava能这样运行的3条准则:

从create函数能向下执行到subscribe函数,靠的是返回Observable类的对象;

从subscribe函数能再向上执行到onSubscriber1,靠的是下面的Observable对象能够调用上面Observable对象的onSubscribe属性;

从上面能再次往下依次对数据进行处理,靠的是Subscriber责任链。

为什么我不先分析源码而先给准则呢?我发现如果在不知道准则(或者称作设计思路)情况下看源码就会“见树不见林”,而知道了准则就把握了框架脉络,剩下的只是到源码中找答案而已。既然本文标榜了“快速理解RxJava源码的设计理念”,那当然不能徒有虚名。下面我们到源码中找答案。

首先我们来证明准则1:

map方法的代码如下:

public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
return lift(new OperatorMap<T, R>(func));
}


map返回的是Observable,证明完毕。快吧,就这么快。在此将map返回的Observable对象标记为observable2。

再证明准则2:

对照demo中的代码,我们调用map之后,就调用了subscribe方法,也就是调用了这里的observable2的subscribe方法。

根据上面的介绍,调用subscribe之后,就会调用observable2.onSubscribe.call方法【代码1】:

public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
return new Observable<R>(new OnSubscribe<R>() {
@Override
public void call(Subscriber<? super R> o) {
Subscriber<? super T> st = hook.onLift(operator).call(o);
st.onStart();
onSubscribe.call(st);
}
});
}


其中,call函数的第一句与准则3的证明有关,暂时不看。第二句也不看(不好意思,我没看懂这句干什么的,不过不影响继续分析),我们看第三句:

onSubscribe.call(st);


这句很关键,准则2的证明靠他了。我们要搞明白onSubscribe是谁的属性。我将demo换个方式写出来:

...
Observable.create(onSubscriber1);
observable1.map(transformer1);
observable2.subscribe(subscriber1);


map函数是observable1的,那么lift函数也是observable1的,按Java语法规定内部类可以调用外部类的属性,所以onSubscribe是observable1的。这样就实现了observable2跨入到observable1的目的。证毕。为更好地理解,我把observable2.onSubscribe.call方法【代码1】再用括号注释下:

public final <R> Observable<R> (Observable1的)lift(final Operator<? extends R, ? super T> operator) {
return new (返回了Observable2)Observable<R>(new OnSubscribe<R>() {
@Override
public void call(Subscriber<? super R> o) {
Subscriber<? super T> st = hook.onLift(operator).call(o);
st.onStart();
(Observable1的)onSubscribe.call(st);
}
});
}


通俗地说,因为observable2是在observable1中(lift函数中)生成的,所以可以利用Java的内部类特性调用外部对象(observable1)的onSubscribe。

最后证明准则3:

说实话,写到这里我感觉还是有部分同学没明白怎么回事。不过没关系,我将会继续围绕讲述。

我们再来看看observable2.onSubscribe.call方法:

public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
return new Observable<R>(new OnSubscribe<R>() {
@Override
public void call(Subscriber<? super R> o) {
Subscriber<? super T> st = hook.onLift(operator).call(o);
st.onStart();
onSubscribe.call(st);
}
});
}


前面说了,call函数的第一句与准则3的证明有关。我们先不看它做了什么,返回值给了st,st是个Subscriber对象。我们在第一节总结说了,处理数据是订阅类Subscriber的职责,因此st有处理数据的职责。再看第三句,将st给了observable1。问题来了,准则3想要运行,就必须有Subscriber责任链。“链”在哪里?不用猜了,我直接告诉你答案:往下“链”的代码在:

Subscriber<? super T> st = hook.onLift(operator).call(o);


我们看看OperatorMap的call方法:

@Override
public Subscriber<? super T> call(final Subscriber<? super R> o) {
return new Subscriber<T>(o) {
@Override
public void onNext(T t) {
o.onNext(transformer.call(t));
}
};
}


入口参数o在Demo里就是subscriber1对象。我们看它的这句:

o.onNext(transformer.call(t));


按照语法规定,是先执行括号里的。我把它写得再直白一点:

临时数据=map(处理前的数据);//本步骤处理
subscriber1.onNext(临时数据);//提交给下一步处理


往上“链”的代码在:

onSubscribe.call(st);


总结一下:

在observable2这种操作符返回的被观察者中,OnSubscribe要做两件事:

在本级中生成订阅类Subscriber,在Subscriber中将本级的操作(map)与下一级操作连接起来(向下链接)

将本级中生成订阅类Subscriber交给上一级Observable,方便上一级向下链接

因此,在此的订阅类Subscriber的职责实际上有两个:

处理数据

连接下一个步骤的Subscriber

再上一张图帮助消化:



后记:RxJava为什么这么搞

相信你对lift流程图挺有印象:



我看到后第一个感觉就是:有必要搞得这么复杂吗?幸好还记得设计模式中的装饰者模式和责任链模式,还能够大体上理解。不过老问题依然存在:它这么搞有必要吗?学习一个陌生的事物最好的办法就是拿一个熟悉的事物进行对比,回想一下软件在运行前会进行什么工作?答案是配置。在代码

Observable.create(onSubscriber1)
.map(transformer1)
.subscribe(subscriber1);


create也好,map也罢,都是配置。再问个问题,软件配置好了怎么运行?要么是点击软件,要么是命令启动。subscribe就是启动。好了,我们可以进行抽象了。无论是RxJava还是软件,其核心就是“配置+启动”。说了RxJava与软件的相同之处,它俩有什么不同吗?你看软件是先配置再启动对吧,没见过还要像”lift流程图”一样上串下跳吧。这就是RxJava的设计理念不同了。我当时在想,如果我来设计,我会先create一个Observable对象,然后将map这类的操作符放到Observable对象里的一个”操作队列”里面,这样配置就完成了。等需要启动时,调用subscribe启动”操作队列”处理数据,你看代码执行流程不是很简单吗。这种实现办法是通过数据结构达成的,优点就是源码执行流程清楚简单,缺点就是这个对象职责过多。而RxJava是通过责任链模式达成的,优点是职责分散,缺点就是源码执行起来上串下跳。不过RxJava在设计时已经为用户考虑到这点,实际上如果你不看源码只是使用RxJava,对你而言代码的执行就是一条线下来也很清晰。好了,就讲到这吧,希望对理解RxJava能有所帮助。

参考文档

RxJava基本流程和lift源码分析

RxJava源码初探
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: