您的位置:首页 > 编程语言 > Java开发

RxJava学习(一)

2016-04-04 16:17 609 查看

注意:文字和图片转载自抛物线博客

参考:http://gank.io/post/560e15be2dca930e00da1083

RxJava 到底是什么

一个词:异步

RxJava 在 GitHub 主页上的自我介绍是 "a library for composing asynchronous and event-based programs using observable sequences for the Java VM"(一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库)。这就是 RxJava ,概括得非常精准。

然而,对于初学者来说,这太难看懂了。因为它是一个『总结』,而初学者更需要一个『引言』。

其实, RxJava 的本质可以压缩为异步这一个词。说到根上,它就是一个实现异步操作的库,而别的定语都是基于这之上的。

RxJava 好在哪

换句话说,『同样是做异步,为什么人们用它,而不用现成的 AsyncTask / Handler / XXX / ... ?』

一个词:简洁

异步操作很关键的一点是程序的简洁性,因为在调度过程比较复杂的情况下,异步代码经常会既难写也难被读懂。 Android 创造的
AsyncTask
Handler
,其实都是为了让异步代码更加简洁。RxJava 的优势也是简洁,但它的简洁的与众不同之处在于,随着程序逻辑变得越来越复杂,它依然能够保持简洁。

RxJava 的观察者模式
RxJava 有四个基本概念:
Observable
(可观察者,即被观察者)、
Observer
(观察者)、
subscribe
(订阅)、事件。
Observable
Observer
通过
subscribe()
方法实现订阅关系,从而
Observable
可以在需要的时候发出事件来通知
Observer


与传统观察者模式不同, RxJava 的事件回调方法除了普通事件
onNext()
(相当于
onClick()
/
onEvent()
)之外,还定义了两个特殊的事件:
onCompleted()
onError()


onCompleted()
: 事件队列完结。RxJava 不仅把每个事件单独处理,还会把它们看做一个队列。RxJava 规定,当不会再有新的
onNext()
发出时,需要触发
onCompleted()
方法作为标志。

onError()
: 事件队列异常。在事件处理过程中出异常时,
onError()
会被触发,同时队列自动终止,不允许再有事件发出。

在一个正确运行的事件序列中,
onCompleted()
onError()
有且只有一个,并且是事件序列中的最后一个。需要注意的是,
onCompleted()
onError()
二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。

RxJava 的观察者模式大致如下图:



2. 基本实现

基于以上的概念, RxJava 的基本实现主要有三点:

1) 创建 Observer
Observer 即观察者,它决定事件触发的时候将有怎样的行为。

除了
Observer
接口之外,RxJava 还内置了一个实现了
Observer
的抽象类:
Subscriber
Subscriber
Observer
接口进行了一些扩展,但他们的基本使用方式是完全一样的:

不仅基本使用方式一样,实质上,在 RxJava 的 subscribe 过程中,
Observer
也总是会先被转换成一个
Subscriber
再使用。所以如果你只想使用基本功能,选择
Observer
Subscriber
是完全一样的。它们的区别对于使用者来说主要有两点:

onStart()
: 这是
Subscriber
增加的方法。它会在 subscribe 刚开始,而事件还未发送之前被调用,可以用于做一些准备工作,例如数据的清零或重置。这是一个可选方法,默认情况下它的实现为空。需要注意的是,如果对准备工作的线程有要求(例如弹出一个显示进度的对话框,这必须在主线程执行),
onStart()
就不适用了,因为它总是在 subscribe 所发生的线程被调用,而不能指定线程。要在指定的线程来做准备工作,可以使用
doOnSubscribe()
方法,具体可以在后面的文中看到。

unsubscribe()
: 这是
Subscriber
所实现的另一个接口
Subscription
的方法,用于取消订阅。在这个方法被调用后,
Subscriber
将不再接收事件。一般在这个方法调用前,可以使用
isUnsubscribed()
先判断一下状态。
unsubscribe()
这个方法很重要,因为在
subscribe()
之后,
Observable
会持有
Subscriber
的引用,这个引用如果不能及时被释放,将有内存泄露的风险。所以最好保持一个原则:要在不再使用的时候尽快在合适的地方(例如
onPause()
onStop()
等方法中)调用
unsubscribe()
来解除引用关系,以避免内存泄露的发生。

2) 创建 Observable
Observable 即被观察者,它决定什么时候触发事件以及触发怎样的事件。 RxJava 使用
create()
方法来创建一个 Observable ,并为它定义事件触发规则。可以看到,参数这里传入了一个
OnSubscribe
对象作为参数。
OnSubscribe
会被存储在返回的
Observable
对象中,它的作用相当于一个计划表,当
Observable
被订阅的时候,
OnSubscribe
call()
方法会自动被调用,事件序列就会依照设定依次触发(对于上面的代码,就是观察者
Subscriber
将会被调用三次
onNext()
和一次
onCompleted()
)。这样,由被观察者调用了观察者的回调方法,就实现了由被观察者向观察者的事件传递,即观察者模式。

create()
方法是 RxJava 最基本的创造事件序列的方法。基于这个方法, RxJava 还提供了一些方法用来快捷创建事件队列,例如:

just(T...)
: 将传入的参数依次发送出来。

from(T[])
/
from(Iterable<? extends T>)
: 将传入的数组或
Iterable
拆分成具体对象后,依次发送出来。

下面展示了创建Observable的各种方法。

just( )
— 将一个或多个对象转换成发射这个或这些对象的一个Observable

from( )
— 将一个Iterable, 一个Future, 或者一个数组转换成一个Observable

repeat( )
— 创建一个重复发射指定数据或数据序列的Observable

repeatWhen( )
— 创建一个重复发射指定数据或数据序列的Observable,它依赖于另一个Observable发射的数据

create( )
— 使用一个函数从头创建一个Observable

defer( )
— 只有当订阅者订阅才创建Observable;为每个订阅创建一个新的Observable

range( )
— 创建一个发射指定范围的整数序列的Observable

interval( )
— 创建一个按照给定的时间间隔发射整数序列的Observable

timer( )
— 创建一个在给定的延时之后发射单个数据的Observable

empty( )
— 创建一个什么都不做直接通知完成的Observable

error( )
— 创建一个什么都不做直接通知错误的Observable

never( )
— 创建一个不发射任何数据的Observable

3) Subscribe (订阅)
创建了
Observable
Observer
之后,再用
subscribe()
方法将它们联结起来,整条链子就可以工作了。


有人可能会注意到,
subscribe()
这个方法有点怪:它看起来是『
observalbe
订阅了
observer
/
subscriber
』而不是『
observer
/
subscriber
订阅了
observalbe
』,这看起来就像『杂志订阅了读者』一样颠倒了对象关系。这让人读起来有点别扭,不过如果把 API 设计成
observer.subscribe(observable)
/
subscriber.subscribe(observable)
,虽然更加符合思维逻辑,但对流式 API 的设计就造成影响了,比较起来明显是得不偿失的。


Observable.subscribe(Subscriber)
的内部实现是这样的(仅核心代码):

// 注意:这不是 subscribe() 的源码,而是将源码中与性能、兼容性、扩展性有关的代码剔除后的核心代码。
// 如果需要看源码,可以去 RxJava 的 GitHub 仓库下载。
public Subscription subscribe(Subscriber subscriber) {
subscriber.onStart();
onSubscribe.call(subscriber);
return subscriber;
}


可以看到,[code]subscriber()
做了3件事:[/code]

调用
Subscriber.onStart()
。这个方法在前面已经介绍过,是一个可选的准备方法。

调用
Observable
中的
OnSubscribe.call(Subscriber)
。在这里,事件发送的逻辑开始运行。从这也可以看出,在 RxJava 中,
Observable
并不是在创建的时候就立即开始发送事件,而是在它被订阅的时候,即当
subscribe()
方法执行的时候。

将传入的
Subscriber
作为
Subscription
返回。这是为了方便
unsubscribe()
.

整个过程中对象间的关系如下图:



或者可以看动图:



完整3步的代码:

public static void learnRxJava(){
//生成Observer
Observer<String> observer = new Observer<String>() {
@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(String s) {

}
};

//生成Subscriber
//多了onStart和unsubscribe两个非常有用的方法
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onStart() {
super.onStart();
}

@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(String o) {

}
};

if(subscriber.isUnsubscribed()){
subscriber.unsubscribe();
}

//生成Observable的三种方法

//1.create
Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("1");
subscriber.onNext("2");
subscriber.onNext("3");
subscriber.onCompleted();
}
});

//2.just
// 将会依次调用:
// onNext(1);
// onNext(2);
// onNext(3);
// onNext(4);
// onCompleted();
Observable<Integer> observable1 = Observable.just(1, 2, 3, 4);

//3.from 参数传入一个集合
// 将会依次调用:
// onNext(1);
// onNext(2);
// onNext(3);
// onNext(4);
// onCompleted();
Integer[] numbers = new Integer[]{1,2,3,4};
Observable<Integer> observable2 = Observable.from(numbers);

observable.subscribe(observer);

}


除了
subscribe(Observer)
subscribe(Subscriber)
subscribe()
还支持不完整定义的回调,RxJava 会自动根据定义创建出
Subscriber
。形式如下:

Action1<String> onNextAction = new Action1<String>() {
// onNext()
@Override
public void call(String s) {
Log.d(tag, s);
}
};
Action1<Throwable> onErrorAction = new Action1<Throwable>() {
// onError()
@Override
public void call(Throwable throwable) {
// Error handling
}
};
Action0 onCompletedAction = new Action0() {
// onCompleted()
@Override
public void call() {
Log.d(tag, "completed");
}
};

// 自动创建 Subscriber ,并使用 onNextAction 来定义 onNext()
observable.subscribe(onNextAction);
// 自动创建 Subscriber ,并使用 onNextAction 和 onErrorAction 来定义 onNext() 和 onError()
observable.subscribe(onNextAction, onErrorAction);
// 自动创建 Subscriber ,并使用 onNextAction、 onErrorAction 和 onCompletedAction 来定义 onNext()、 onError() 和 onCompleted()
observable.subscribe(onNextAction, onErrorAction, onCompletedAction);


简单解释一下这段代码中出现的
Action1
Action0
Action0
是 RxJava 的一个接口,它只有一个方法
call()
,这个方法是无参无返回值的;由于
onCompleted()
方法也是无参无返回值的,因此
Action0
可以被当成一个包装对象,将
onCompleted()
的内容打包起来将自己作为一个参数传入
subscribe()
以实现不完整定义的回调。这样其实也可以看做将
onCompleted()
方法作为参数传进了
subscribe()
,相当于其他某些语言中的『闭包』。
Action1
也是一个接口,它同样只有一个方法
call(T param)
,这个方法也无返回值,但有一个参数;与
Action0
同理,由于
onNext(T obj)
onError(Throwable error)
也是单参数无返回值的,因此
Action1
可以将
onNext(obj)
onError(error)
打包起来传入
subscribe()
以实现不完整定义的回调。事实上,虽然
Action0
Action1
在 API 中使用最广泛,但 RxJava 是提供了多个
ActionX
形式的接口 (例如
Action2
,
Action3
) 的,它们可以被用以包装不同的无返回值的方法。


注:正如前面所提到的,
Observer
Subscriber
具有相同的角色,而且
Observer
subscribe()
过程中最终会被转换成
Subscriber
对象,因此,从这里开始,后面的描述我将用
Subscriber
来代替
Observer
,这样更加严谨。

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: