RxJava分析之订阅过程
2016-05-23 20:14
316 查看
最近在用RxJava,虽然正常使用没有问题,但过程中产生了很多疑问,比如Observable和Subscriber是怎么联系到一起的?OnSubscribe又是什么时候起作用的,起什么作用?Subscriber和Subscription有什么关系?unsubscribe之后Observable还在运行吗?等等这些疑问,所以顺着这些问题找找答案,梳理一下源码,给自己以后做个记录就不会忘记啦~若有不当欢迎指正,一起进步~
Subscriber - 订阅者(接收可观测数据源并可以进行处理的观察者)
OnSubscribe - 订阅时(订阅者订阅可被观察者时发生的一系列动作)
Subscription - 订阅(名词性质,包含了订阅状态和解除订阅的动作)
Observe - 观测 (对订阅者的处理流程的规范包含三个动作(onError onNext onComplete))
SubscriptionList(维护一个订阅列表)
上边的代码意思就是创建了一个Observable ,这个数据源被mySubscriber这个订阅者订阅,当subscribe时mySubscriber被当作参数传递给onSubscribeTimerPeriodically的call方法,当然工作线程中产生数据,在主线程中接收数据
下边看看onSubscribeTimerPeriodically这个”订阅时“
通过上边的代码可以发现这就是订阅时的作用,同时大概能感觉到,Observable就是一个”架子“,确实是这样的,为了阅读的连续性这个后边展开说,接下来先看看之前我们有一个疑问:unsubscribe之后Observable还在运行(这么说其实是不对的,但是符合刚开始接触RxJava的直觉,先这么写,后边解释)吗?
这句告诉我们mySubscriber执行了add()方法添加到内部的订阅列表,实际上是调用了SubscriptionList的add()方法,说明Worker一定实现了Subscription接口,并且要找的答案就在
其中scheduler是抽象类Scheduler的一个子类的实例,通常有三种实现
参数
因为调用Subscriber.unsubscribe(),会迭代每一个SubscriptionList中的订阅调用其unsubscribe()方法,而上边也看到里面实际是继承自NewThreadWorker 的对象,其解除订阅方法如下:
executor是ThreadPoolExecutor类型,很明显疑问解决了并且也可以来说说为什么Obervable是一个“架子”了
Observable本身并不提供工作线程,也不管理调度
Observable依靠传入的OnSubscribe决定订阅时发生的动作
OnSubscribe把工作线程add给Subscriber维护的subscriptions
Observable提供了一系列的Operator对Observable进行各种变换过滤等操作
如果现有操作不能满足需要还可以通过lift自己实现特定操作
画了一张图,有一些不准确的地方,但是把订阅过程的逻辑都表达出来了:
最后说一点,以前老是觉得Observable.subscribe(Subscriber)这种方式怪怪的,被观察者订阅了订阅者,你说怪不怪?正好张孝祥老师的一个视频点醒了我,面向对象的本质就是谁拥有数据谁就有权进行操作,按照这个想法,订阅的方法当然得在被订阅者的身上,这么说来也可以理解了,而且这样更好封装,就像android中draw一个view,这个draw()方法不是外部一个什么对象去调用,而正是view自己调用类似view.draw()
根据我的理解说一下以上几个概念
Observable - 可被观察者(就是一个可观测的数据源)Subscriber - 订阅者(接收可观测数据源并可以进行处理的观察者)
OnSubscribe - 订阅时(订阅者订阅可被观察者时发生的一系列动作)
Subscription - 订阅(名词性质,包含了订阅状态和解除订阅的动作)
Observe - 观测 (对订阅者的处理流程的规范包含三个动作(onError onNext onComplete))
SubscriptionList(维护一个订阅列表)
有了上边的基本了解再来看看源码分析
通常在Android上一个简单的订阅是这样的/*其中onSubscribeTimerPeriodically是Observable.OnSubscribe类型,就是一个订阅时,这个订阅时做了那些事?简单一点就做一件事,创建一个Subscription类型的工作线程,拿到订阅者并把工作线程添加进订阅者内部维护的SubscriptionList中去,然后异步向订阅者发送数据(后边详细分析)*/ Observable.create(onSubscribeTimerPeriodically) .subscribe(mySubscriber) .subscribeOn(Schedulers.computation()) .observeOn(AndroidSchedulers.mainThread()); mySubscriber = new Subscriber<Long>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(Long val) { } };
上边的代码意思就是创建了一个Observable ,这个数据源被mySubscriber这个订阅者订阅,当subscribe时mySubscriber被当作参数传递给onSubscribeTimerPeriodically的call方法,当然工作线程中产生数据,在主线程中接收数据
下边看看onSubscribeTimerPeriodically这个”订阅时“
/*这是一个带有初始延时的周期的长整数发生器,关于scheduler和底层的线程调度机制后边会再专门写一篇分析文章,现在主要分析订阅的过程*/ public final class OnSubscribeTimerPeriodically implements OnSubscribe<Long> { final long initialDelay; final long period; final TimeUnit unit; final Scheduler scheduler; public OnSubscribeTimerPeriodically(long initialDelay, long period, TimeUnit unit, Scheduler scheduler) { this.initialDelay = initialDelay; this.period = period; this.unit = unit; this.scheduler = scheduler; } /*child参数就是上边的mySubscriber*/ @Override public void call(final Subscriber<? super Long> child) { /*还记得上边说过Subscriber内部维护了一个SubscriptionList,这个worker被add到其中,这样在外部Subscriber就可以控制订阅的状态,比如调用unsubscribe方法解除订阅*/ final Worker worker = scheduler.createWorker(); child.add(worker); worker.schedulePeriodically(new Action0() { long counter; @Override public void call() { try { child.onNext(counter++); } catch (Throwable e) { try { worker.unsubscribe(); } finally { Exceptions.throwOrReport(e, child); } } } }, initialDelay, period, unit); } }
通过上边的代码可以发现这就是订阅时的作用,同时大概能感觉到,Observable就是一个”架子“,确实是这样的,为了阅读的连续性这个后边展开说,接下来先看看之前我们有一个疑问:unsubscribe之后Observable还在运行(这么说其实是不对的,但是符合刚开始接触RxJava的直觉,先这么写,后边解释)吗?
child.add(worker);
这句告诉我们mySubscriber执行了add()方法添加到内部的订阅列表,实际上是调用了SubscriptionList的add()方法,说明Worker一定实现了Subscription接口,并且要找的答案就在
scheduler.createWorker();
其中scheduler是抽象类Scheduler的一个子类的实例,通常有三种实现
EventLoopsScheduler CachedThreadScheduler NewThreadScheduler,前两个是线程池实现,后一个是直接创建新的线程,就以EventLoopsScheduler分析其createWorker():
@Override public Worker createWorker() { return new EventLoopWorker(pool.get().getEventLoop()); }
参数
pool.get().getEventLoop()是
static final class PoolWorker extends NewThreadWorker { PoolWorker(ThreadFactory threadFactory) { super(threadFactory); } }
因为调用Subscriber.unsubscribe(),会迭代每一个SubscriptionList中的订阅调用其unsubscribe()方法,而上边也看到里面实际是继承自NewThreadWorker 的对象,其解除订阅方法如下:
@Override public void unsubscribe() { isUnsubscribed = true; executor.shutdownNow();//马上停掉线程池里所有的线程 deregisterExecutor(executor); }
executor是ThreadPoolExecutor类型,很明显疑问解决了并且也可以来说说为什么Obervable是一个“架子”了
Observable本身并不提供工作线程,也不管理调度
Observable依靠传入的OnSubscribe决定订阅时发生的动作
OnSubscribe把工作线程add给Subscriber维护的subscriptions
Observable提供了一系列的Operator对Observable进行各种变换过滤等操作
如果现有操作不能满足需要还可以通过lift自己实现特定操作
画了一张图,有一些不准确的地方,但是把订阅过程的逻辑都表达出来了:
最后说一点,以前老是觉得Observable.subscribe(Subscriber)这种方式怪怪的,被观察者订阅了订阅者,你说怪不怪?正好张孝祥老师的一个视频点醒了我,面向对象的本质就是谁拥有数据谁就有权进行操作,按照这个想法,订阅的方法当然得在被订阅者的身上,这么说来也可以理解了,而且这样更好封装,就像android中draw一个view,这个draw()方法不是外部一个什么对象去调用,而正是view自己调用类似view.draw()
相关文章推荐
- 使用C++实现JNI接口需要注意的事项
- Android IPC进程间通讯机制
- Android Manifest 用法
- [转载]Activity中ConfigChanges属性的用法
- Android之获取手机上的图片和视频缩略图thumbnails
- Android之使用Http协议实现文件上传功能
- Android学习笔记(二九):嵌入浏览器
- android string.xml文件中的整型和string型代替
- i-jetty环境搭配与编译
- android之定时器AlarmManager
- android wifi 无线调试
- Android Native 绘图方法
- Android java 与 javascript互访(相互调用)的方法例子
- android 代码实现控件之间的间距
- android FragmentPagerAdapter的“标准”配置
- Android"解决"onTouch和onClick的冲突问题
- android:installLocation简析
- android searchView的关闭事件
- SourceProvider.getJniDirectories