您的位置:首页 > 编程语言 > Java开发

RXJava——Subscribe (订阅)

2016-06-27 18:27 399 查看
原文链接:http://gank.io/post/560e15be2dca930e00da1083

创建了
Observable
Observer
之后,再用
subscribe()
方法将它们联结起来,整条链子就可以工作了。代码形式很简单:

observable.subscribe(observer);
// 或者:
observable.subscribe(subscriber);


有人可能会注意到,
subscribe()
这个方法有点怪:它看起来是『
observalbe
订阅了
observer
/
subscriber
』而不是『
observer
/
subscriber
订阅了
observalbe
』,这看起来就像『杂志订阅了读者』一样颠倒了对象关系。这让人读起来有点别扭,不过如果把 API 设计成
observer.subscribe(observable)
/
subscriber.subscribe(observable)
,虽然更加符合思维逻辑,但对流式 API 的设计就造成影响了,比较起来明显是得不偿失的。

Observable.subscribe(Subscriber)
的内部实现是这样的(仅核心代码):

// 注意:这不是 subscribe() 的源码,而是将源码中与性能、兼容性、扩展性有关的代码剔除后的核心代码。
// 如果需要看源码,可以去 RxJava 的 GitHub 仓库下载。
public Subscription subscribe(Subscriber subscriber) {
subscriber.onStart();
onSubscribe.call(subscriber);
return subscriber;
}

可以看到,
subscriber()
做了3件事:

调用
Subscriber.onStart()
。这个方法在前面已经介绍过,是一个可选的准备方法。
调用
Observable
中的
OnSubscribe.call(Subscriber)
。在这里,事件发送的逻辑开始运行。从这也可以看出,在 RxJava 中,
Observable
并不是在创建的时候就立即开始发送事件,而是在它被订阅的时候,即当
subscribe()
方法执行的时候。
将传入的
Subscriber
作为
Subscription
返回。这是为了方便
unsubscribe()
.
整个过程中对象间的关系如下图:



或者可以看动图:



除了
subscribe(Observer)
subscribe(Subscriber)
subscribe()
还支持不完整定义的回调,RxJava 会自动根据定义创建出
Subscriber
。形式如下:

Action1<String> onNextAction = new Action1<String>() {
// onNext()
@Override
public void call(String s) {
Log.d(tag, s);
}
};
Action1<Throwable> onErrorAction = new Action1<Throwable>() {
// onError()
@Override
public void call(Throwable throwable) {
// Error handling
}
};
Action0 onCompletedAction = new Action0() {
// onCompleted()
@Override
public void call() {
Log.d(tag, "completed");
}
};

// 自动创建 Subscriber ,并使用 onNextAction 来定义 onNext()
observable.subscribe(onNextAction);
// 自动创建 Subscriber ,并使用 onNextAction 和 onErrorAction 来定义 onNext() 和 onError()
observable.subscribe(onNextAction, onErrorAction);
// 自动创建 Subscriber ,并使用 onNextAction、 onErrorAction 和 onCompletedAction 来定义 onNext()、 onError() 和 onCompleted()
observable.subscribe(onNextAction, onErrorAction, onCompletedAction);

简单解释一下这段代码中出现的
Action1
Action0
Action0
是 RxJava 的一个接口,它只有一个方法
call()
,这个方法是无参无返回值的;由于
onCompleted()
方法也是无参无返回值的,因此
Action0
可以被当成一个包装对象,将
onCompleted()
的内容打包起来将自己作为一个参数传入
subscribe()
以实现不完整定义的回调。这样其实也可以看做将
onCompleted()
方法作为参数传进了
subscribe()
,相当于其他某些语言中的『闭包』。
Action1
也是一个接口,它同样只有一个方法
call(T param)
,这个方法也无返回值,但有一个参数;与
Action0
同理,由于
onNext(T obj)
onError(Throwable error)
也是单参数无返回值的,因此
Action1
可以将
onNext(obj)
onError(error)
打包起来传入
subscribe()
以实现不完整定义的回调。事实上,虽然
Action0
Action1
在 API 中使用最广泛,但 RxJava 是提供了多个
ActionX
形式的接口 (例如
Action2
,
Action3
) 的,它们可以被用以包装不同的无返回值的方法。

注:正如前面所提到的,
Observer
Subscriber
具有相同的角色,而且
Observer

subscribe()
过程中最终会被转换成
Subscriber
对象,因此,从这里开始,后面的描述我将用
Subscriber
来代替
Observer
,这样更加严谨。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: