RxJava学习(一)
2016-04-04 16:17
609 查看
注意:文字和图片转载自抛物线博客
参考:http://gank.io/post/560e15be2dca930e00da1083RxJava 到底是什么
一个词:异步。RxJava 在 GitHub 主页上的自我介绍是 "a library for composing asynchronous and event-based programs using observable sequences for the Java VM"(一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库)。这就是 RxJava ,概括得非常精准。
然而,对于初学者来说,这太难看懂了。因为它是一个『总结』,而初学者更需要一个『引言』。
其实, RxJava 的本质可以压缩为异步这一个词。说到根上,它就是一个实现异步操作的库,而别的定语都是基于这之上的。
RxJava 好在哪
换句话说,『同样是做异步,为什么人们用它,而不用现成的 AsyncTask / Handler / XXX / ... ?』一个词:简洁。
异步操作很关键的一点是程序的简洁性,因为在调度过程比较复杂的情况下,异步代码经常会既难写也难被读懂。 Android 创造的
AsyncTask和
Handler,其实都是为了让异步代码更加简洁。RxJava 的优势也是简洁,但它的简洁的与众不同之处在于,随着程序逻辑变得越来越复杂,它依然能够保持简洁。
RxJava 的观察者模式
RxJava 有四个基本概念:
Observable(可观察者,即被观察者)、
Observer(观察者)、
subscribe(订阅)、事件。
Observable和
Observer通过
subscribe()方法实现订阅关系,从而
Observable可以在需要的时候发出事件来通知
Observer。
与传统观察者模式不同, RxJava 的事件回调方法除了普通事件
onNext()(相当于
onClick()/
onEvent())之外,还定义了两个特殊的事件:
onCompleted()和
onError()。
onCompleted(): 事件队列完结。RxJava 不仅把每个事件单独处理,还会把它们看做一个队列。RxJava 规定,当不会再有新的
onNext()发出时,需要触发
onCompleted()方法作为标志。
onError(): 事件队列异常。在事件处理过程中出异常时,
onError()会被触发,同时队列自动终止,不允许再有事件发出。
在一个正确运行的事件序列中,
onCompleted()和
onError()有且只有一个,并且是事件序列中的最后一个。需要注意的是,
onCompleted()和
onError()二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。
RxJava 的观察者模式大致如下图:
2. 基本实现
基于以上的概念, RxJava 的基本实现主要有三点:1) 创建 Observer
Observer 即观察者,它决定事件触发的时候将有怎样的行为。
除了
Observer接口之外,RxJava 还内置了一个实现了
Observer的抽象类:
Subscriber。
Subscriber对
Observer接口进行了一些扩展,但他们的基本使用方式是完全一样的:
不仅基本使用方式一样,实质上,在 RxJava 的 subscribe 过程中,
Observer也总是会先被转换成一个
Subscriber再使用。所以如果你只想使用基本功能,选择
Observer和
Subscriber是完全一样的。它们的区别对于使用者来说主要有两点:
onStart(): 这是
Subscriber增加的方法。它会在 subscribe 刚开始,而事件还未发送之前被调用,可以用于做一些准备工作,例如数据的清零或重置。这是一个可选方法,默认情况下它的实现为空。需要注意的是,如果对准备工作的线程有要求(例如弹出一个显示进度的对话框,这必须在主线程执行),
onStart()就不适用了,因为它总是在 subscribe 所发生的线程被调用,而不能指定线程。要在指定的线程来做准备工作,可以使用
doOnSubscribe()方法,具体可以在后面的文中看到。
unsubscribe(): 这是
Subscriber所实现的另一个接口
Subscription的方法,用于取消订阅。在这个方法被调用后,
Subscriber将不再接收事件。一般在这个方法调用前,可以使用
isUnsubscribed()先判断一下状态。
unsubscribe()这个方法很重要,因为在
subscribe()之后,
Observable会持有
Subscriber的引用,这个引用如果不能及时被释放,将有内存泄露的风险。所以最好保持一个原则:要在不再使用的时候尽快在合适的地方(例如
onPause()
onStop()等方法中)调用
unsubscribe()来解除引用关系,以避免内存泄露的发生。
2) 创建 Observable
Observable 即被观察者,它决定什么时候触发事件以及触发怎样的事件。 RxJava 使用
create()方法来创建一个 Observable ,并为它定义事件触发规则。可以看到,参数这里传入了一个
OnSubscribe对象作为参数。
OnSubscribe会被存储在返回的
Observable对象中,它的作用相当于一个计划表,当
Observable被订阅的时候,
OnSubscribe的
call()方法会自动被调用,事件序列就会依照设定依次触发(对于上面的代码,就是观察者
Subscriber将会被调用三次
onNext()和一次
onCompleted())。这样,由被观察者调用了观察者的回调方法,就实现了由被观察者向观察者的事件传递,即观察者模式。
create()方法是 RxJava 最基本的创造事件序列的方法。基于这个方法, RxJava 还提供了一些方法用来快捷创建事件队列,例如:
just(T...): 将传入的参数依次发送出来。
from(T[])/
from(Iterable<? extends T>): 将传入的数组或
Iterable拆分成具体对象后,依次发送出来。
下面展示了创建Observable的各种方法。
just( )— 将一个或多个对象转换成发射这个或这些对象的一个Observable
from( )— 将一个Iterable, 一个Future, 或者一个数组转换成一个Observable
repeat( )— 创建一个重复发射指定数据或数据序列的Observable
repeatWhen( )— 创建一个重复发射指定数据或数据序列的Observable,它依赖于另一个Observable发射的数据
create( )— 使用一个函数从头创建一个Observable
defer( )— 只有当订阅者订阅才创建Observable;为每个订阅创建一个新的Observable
range( )— 创建一个发射指定范围的整数序列的Observable
interval( )— 创建一个按照给定的时间间隔发射整数序列的Observable
timer( )— 创建一个在给定的延时之后发射单个数据的Observable
empty( )— 创建一个什么都不做直接通知完成的Observable
error( )— 创建一个什么都不做直接通知错误的Observable
never( )— 创建一个不发射任何数据的Observable
3) Subscribe (订阅)
创建了
Observable和
Observer之后,再用
subscribe()方法将它们联结起来,整条链子就可以工作了。
有人可能会注意到,
subscribe()这个方法有点怪:它看起来是『
observalbe订阅了
observer/
subscriber』而不是『
observer/
subscriber订阅了
observalbe』,这看起来就像『杂志订阅了读者』一样颠倒了对象关系。这让人读起来有点别扭,不过如果把 API 设计成
observer.subscribe(observable)/
subscriber.subscribe(observable),虽然更加符合思维逻辑,但对流式 API 的设计就造成影响了,比较起来明显是得不偿失的。
Observable.subscribe(Subscriber)的内部实现是这样的(仅核心代码):
// 注意:这不是 subscribe() 的源码,而是将源码中与性能、兼容性、扩展性有关的代码剔除后的核心代码。 // 如果需要看源码,可以去 RxJava 的 GitHub 仓库下载。 public Subscription subscribe(Subscriber subscriber) { subscriber.onStart(); onSubscribe.call(subscriber); return subscriber; }
可以看到,[code]subscriber()做了3件事:[/code]
调用
Subscriber.onStart()。这个方法在前面已经介绍过,是一个可选的准备方法。
调用
Observable中的
OnSubscribe.call(Subscriber)。在这里,事件发送的逻辑开始运行。从这也可以看出,在 RxJava 中,
Observable并不是在创建的时候就立即开始发送事件,而是在它被订阅的时候,即当
subscribe()方法执行的时候。
将传入的
Subscriber作为
Subscription返回。这是为了方便
unsubscribe().
整个过程中对象间的关系如下图:
或者可以看动图:
完整3步的代码:
public static void learnRxJava(){ //生成Observer Observer<String> observer = new Observer<String>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(String s) { } }; //生成Subscriber //多了onStart和unsubscribe两个非常有用的方法 Subscriber<String> subscriber = new Subscriber<String>() { @Override public void onStart() { super.onStart(); } @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(String o) { } }; if(subscriber.isUnsubscribed()){ subscriber.unsubscribe(); } //生成Observable的三种方法 //1.create Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext("1"); subscriber.onNext("2"); subscriber.onNext("3"); subscriber.onCompleted(); } }); //2.just // 将会依次调用: // onNext(1); // onNext(2); // onNext(3); // onNext(4); // onCompleted(); Observable<Integer> observable1 = Observable.just(1, 2, 3, 4); //3.from 参数传入一个集合 // 将会依次调用: // onNext(1); // onNext(2); // onNext(3); // onNext(4); // onCompleted(); Integer[] numbers = new Integer[]{1,2,3,4}; Observable<Integer> observable2 = Observable.from(numbers);
observable.subscribe(observer);
}
除了
subscribe(Observer)和
subscribe(Subscriber),
subscribe()还支持不完整定义的回调,RxJava 会自动根据定义创建出
Subscriber。形式如下:
Action1<String> onNextAction = new Action1<String>() { // onNext() @Override public void call(String s) { Log.d(tag, s); } }; Action1<Throwable> onErrorAction = new Action1<Throwable>() { // onError() @Override public void call(Throwable throwable) { // Error handling } }; Action0 onCompletedAction = new Action0() { // onCompleted() @Override public void call() { Log.d(tag, "completed"); } }; // 自动创建 Subscriber ,并使用 onNextAction 来定义 onNext() observable.subscribe(onNextAction); // 自动创建 Subscriber ,并使用 onNextAction 和 onErrorAction 来定义 onNext() 和 onError() observable.subscribe(onNextAction, onErrorAction); // 自动创建 Subscriber ,并使用 onNextAction、 onErrorAction 和 onCompletedAction 来定义 onNext()、 onError() 和 onCompleted() observable.subscribe(onNextAction, onErrorAction, onCompletedAction);
简单解释一下这段代码中出现的
Action1和
Action0。
Action0是 RxJava 的一个接口,它只有一个方法
call(),这个方法是无参无返回值的;由于
onCompleted()方法也是无参无返回值的,因此
Action0可以被当成一个包装对象,将
onCompleted()的内容打包起来将自己作为一个参数传入
subscribe()以实现不完整定义的回调。这样其实也可以看做将
onCompleted()方法作为参数传进了
subscribe(),相当于其他某些语言中的『闭包』。
Action1也是一个接口,它同样只有一个方法
call(T param),这个方法也无返回值,但有一个参数;与
Action0同理,由于
onNext(T obj)和
onError(Throwable error)也是单参数无返回值的,因此
Action1可以将
onNext(obj)和
onError(error)打包起来传入
subscribe()以实现不完整定义的回调。事实上,虽然
Action0和
Action1在 API 中使用最广泛,但 RxJava 是提供了多个
ActionX形式的接口 (例如
Action2,
Action3) 的,它们可以被用以包装不同的无返回值的方法。
注:正如前面所提到的,
Observer和
Subscriber具有相同的角色,而且
Observer在
subscribe()过程中最终会被转换成
Subscriber对象,因此,从这里开始,后面的描述我将用
Subscriber来代替
Observer,这样更加严谨。
相关文章推荐
- Java的同名属性、同名普通函数、同名静态函数,是否被覆盖
- ubuntu16.04 jdk环境变量配置和安装android studio
- ubuntu16.04 jdk环境变量配置和安装android studio
- eclipse不自动弹出提示
- Android开发:Eclipse中Ctrl+O快捷键显示该java文件中所有变量及方法
- Java集合框架之Set--HashSet源码分析
- java值传递
- Java Lock-同步的另一种实现
- 如何使用java中的对象
- 深入Spring IOC源码之ResourceLoader
- javassist学习:动态创建二进制Java类二进制字节码并通过反射调用的示例
- java中的null
- Java并发编程系列之十四:阻塞队列
- [Java]main方法理解
- Javassist学习总结
- 如何定义java中的类
- 【转载】Java中Runnable和Thread的区别
- Java爬虫(一)利用GET和POST发送请求,获取服务器返回信息
- JAVA动态规划(三)--最长回文字符串(可删除中间字符)【腾讯2016实习生笔试题】
- 静态属性和方法理解