您的位置:首页 > 编程语言 > Java开发

RxJava分析之订阅过程

2016-05-23 20:14 316 查看
最近在用RxJava,虽然正常使用没有问题,但过程中产生了很多疑问,比如Observable和Subscriber是怎么联系到一起的?OnSubscribe又是什么时候起作用的,起什么作用?Subscriber和Subscription有什么关系?unsubscribe之后Observable还在运行吗?等等这些疑问,所以顺着这些问题找找答案,梳理一下源码,给自己以后做个记录就不会忘记啦~若有不当欢迎指正,一起进步~

根据我的理解说一下以上几个概念

Observable - 可被观察者(就是一个可观测的数据源)

Subscriber - 订阅者(接收可观测数据源并可以进行处理的观察者)

OnSubscribe - 订阅时(订阅者订阅可被观察者时发生的一系列动作)

Subscription - 订阅(名词性质,包含了订阅状态和解除订阅的动作)

Observe - 观测 (对订阅者的处理流程的规范包含三个动作(onError onNext onComplete))

SubscriptionList(维护一个订阅列表)

有了上边的基本了解再来看看源码分析

通常在Android上一个简单的订阅是这样的

/*其中onSubscribeTimerPeriodically是Observable.OnSubscribe类型,就是一个订阅时,这个订阅时做了那些事?简单一点就做一件事,创建一个Subscription类型的工作线程,拿到订阅者并把工作线程添加进订阅者内部维护的SubscriptionList中去,然后异步向订阅者发送数据(后边详细分析)*/
Observable.create(onSubscribeTimerPeriodically)
.subscribe(mySubscriber)
.subscribeOn(Schedulers.computation())
.observeOn(AndroidSchedulers.mainThread());

mySubscriber = new Subscriber<Long>() {
@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(Long val) {

}
};


上边的代码意思就是创建了一个Observable ,这个数据源被mySubscriber这个订阅者订阅,当subscribe时mySubscriber被当作参数传递给onSubscribeTimerPeriodically的call方法,当然工作线程中产生数据,在主线程中接收数据

下边看看onSubscribeTimerPeriodically这个”订阅时“

/*这是一个带有初始延时的周期的长整数发生器,关于scheduler和底层的线程调度机制后边会再专门写一篇分析文章,现在主要分析订阅的过程*/

public final class OnSubscribeTimerPeriodically implements OnSubscribe<Long> {
final long initialDelay;
final long period;
final TimeUnit unit;
final Scheduler scheduler;

public OnSubscribeTimerPeriodically(long initialDelay, long period, TimeUnit unit, Scheduler scheduler) {
this.initialDelay = initialDelay;
this.period = period;
this.unit = unit;
this.scheduler = scheduler;
}

/*child参数就是上边的mySubscriber*/

@Override
public void call(final Subscriber<? super Long> child) {

/*还记得上边说过Subscriber内部维护了一个SubscriptionList,这个worker被add到其中,这样在外部Subscriber就可以控制订阅的状态,比如调用unsubscribe方法解除订阅*/

final Worker worker = scheduler.createWorker();
child.add(worker);
worker.schedulePeriodically(new Action0() {
long counter;
@Override
public void call() {
try {
child.onNext(counter++);
} catch (Throwable e) {
try {
worker.unsubscribe();
} finally {
Exceptions.throwOrReport(e, child);
}
}
}
}, initialDelay, period, unit);
}
}


通过上边的代码可以发现这就是订阅时的作用,同时大概能感觉到,Observable就是一个”架子“,确实是这样的,为了阅读的连续性这个后边展开说,接下来先看看之前我们有一个疑问:unsubscribe之后Observable还在运行(这么说其实是不对的,但是符合刚开始接触RxJava的直觉,先这么写,后边解释)吗?

child.add(worker);


这句告诉我们mySubscriber执行了add()方法添加到内部的订阅列表,实际上是调用了SubscriptionList的add()方法,说明Worker一定实现了Subscription接口,并且要找的答案就在
scheduler.createWorker();


其中scheduler是抽象类Scheduler的一个子类的实例,通常有三种实现
EventLoopsScheduler CachedThreadScheduler NewThreadScheduler
,前两个是线程池实现,后一个是直接创建新的线程,就以EventLoopsScheduler分析其createWorker():

@Override
public Worker createWorker() {
return new EventLoopWorker(pool.get().getEventLoop());
}


参数
pool.get().getEventLoop()


static final class PoolWorker extends NewThreadWorker {
PoolWorker(ThreadFactory threadFactory) {
super(threadFactory);
}
}


因为调用Subscriber.unsubscribe(),会迭代每一个SubscriptionList中的订阅调用其unsubscribe()方法,而上边也看到里面实际是继承自NewThreadWorker 的对象,其解除订阅方法如下:

@Override
public void unsubscribe() {
isUnsubscribed = true;
executor.shutdownNow();//马上停掉线程池里所有的线程
deregisterExecutor(executor);
}


executor是ThreadPoolExecutor类型,很明显疑问解决了并且也可以来说说为什么Obervable是一个“架子”了

Observable本身并不提供工作线程,也不管理调度

Observable依靠传入的OnSubscribe决定订阅时发生的动作

OnSubscribe把工作线程add给Subscriber维护的subscriptions

Observable提供了一系列的Operator对Observable进行各种变换过滤等操作

如果现有操作不能满足需要还可以通过lift自己实现特定操作

画了一张图,有一些不准确的地方,但是把订阅过程的逻辑都表达出来了:



最后说一点,以前老是觉得Observable.subscribe(Subscriber)这种方式怪怪的,被观察者订阅了订阅者,你说怪不怪?正好张孝祥老师的一个视频点醒了我,面向对象的本质就是谁拥有数据谁就有权进行操作,按照这个想法,订阅的方法当然得在被订阅者的身上,这么说来也可以理解了,而且这样更好封装,就像android中draw一个view,这个draw()方法不是外部一个什么对象去调用,而正是view自己调用类似view.draw()
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  android rxjava observable