RxJava observeOn()与subscribeOn()的关系
2016-06-15 22:17
363 查看
observeOn和subscribeOn都是对observable的一种操作,区别就是subscribeOn改变了observable本身产生事件的schedule以及发出事件后相关处理事件的程序所在的schedule,而obseveron仅仅是改变了对发出事件后相关处理事件的程序所在的schedule。
或许你会问,这有多大的区别吗?的确是有的,比如说产生observable事件是一件费时可能会卡主线程的操作(比如说获取网络数据),那么subscribeOn就是你的选择,这样可以避免卡住主线程。
两者最主要的差别是影响的范围不同,observeOn is more limited,但是却是可以多次调用,多次改变不同的接受者所在的schedule,在调用这个函数之后的observable造成影响。而subscribeOn则是一次性的,无论在什么地方调用,总是从改变最原始的observable开始影响整个observable的处理。
observeOn()主要改变的是发送的线程,即onNext()执行的线程。
运行如下:
我们看一下subscribeOn()中,都干了什么
很明显,会走if之外的方法。
在这里我们可以看到,我们又创建了一个Observable对象,但创建时传入的参数为OperatorSubscribeOn(this,scheduler),我们看一下此对象以及其对应的构造方法
OperatorSubscribeOn代码:
可以看到,OperatorSubscribeOn实现Onsubscribe,并且由其构造方法可知,他保存了上一个Observable对象,并保存了Scheduler对象。
这里做个总结。
把Observable.create()创建的称之为Observable_1,OnSubscribe_1。把subscribeOn()创建的称之为Observable_2,OnSubscribe_2(= OperatorSubscribeOn)。
那么,前两步就是创建了两个的observable,和OnSubscribe,并且OnSubscribe_2中保存了Observable_1的应用,即source。
调用Observable_2.subscribe()方法会调用OnSubscibe_2的call方法,即OperatorSubscribeOn的call()。
下面分析下call()方法。
inner.schedule()改变了线程,此时Action的call()在我们指定的线程中运行。
Subscriber被包装了一层。
source.unsafeSubscribe(s);,注意source是Observable_1对象。
unsafeSubscribe方法代码:
代码很多,关键代码:
hook.onSubscribeStart(this, onSubscribe).call(subscriber);
该方法即调用了OnSubscribe_1.call()方法。注意,此时的call()方法在我们指定的线程中运行。那么就起到了改变线程的作用。
对于以上线程,我们可以总结,其有如下流程:
Observable.create() : 创建了Observable_1和OnSubscribe_1;
subscribeOn(): 创建Observable_2和OperatorSubscribeOn(OnSubscribe_2),同时OperatorSubscribeOn保存了Observable_1的引用。
observable_2.subscribe(Observer):
调用OperatorSubscribeOn的call()。call()改变了线程的运行,并且调用了Observable_1.unsafeSubscribe(s);
Observable_1.unsafeSubscribe(s);,该方法的实现中调用了OnSubscribe_1的call()。
从这个可以了解,无论我们的subscribeOn()放在哪里,他改变的是subscribe()的过程,而不是onNext()的过程。
那么如果有多个subscribeOn(),那么线程会怎样执行呢。如果按照我们的逻辑,有以下程序
那么,我们根据之前的源码分析其执行逻辑。
Observable.just(“ss”),创建Observable_1,OnSubscribe_1
Observable_1.subscribeOn(Schedulers.io()):创建observable_2,OperatorSubscribeOn_2并保存Observable_1的引用。
observable_2.subscribeOn(Schedulers.newThread()):创建Observable_3,OperatorSubscribeOn_3并保存Observable_2的引用。
Observable_3.subscribe():
调用OperatorSubscribeOn_3.call(),改变线程为Schedulers.newThread()。
调用OperatorSubscribeOn_2.call(),改变线程为Schedulers.io()。
调用OnSubscribe_1.call(),此时call()运行在Schedulers.io()。
根据以上逻辑分析,会按照1的线程进行执行。
subscribeOn如何工作,关键代码其实就是一行代码:
注意它所在的位置,是在worker的call里面,说白了,就是把source.subscribe这一行调用放在指定的线程里,那么总结起来的结论就是:
subscribeOn的调用,改变了调用前序列所运行的线程。
这里引出了新的操作符lift
这里不再介绍了,详见:/article/11853094.html
在lift()中,有如下关键代码:
OperatorObserveOn.call()核心代码:
我们看到其返回了ObserveOnSubscriber< T>,注意:此时只调用了call()方法,但call()方法中并没有改变线程的操作,此时为subscribe()过程。
我们直奔重点,因为,我们了解到其改变的是onNext()过程,那么我们肯定要看一下ObserveOnSubscriber.onNext()找找在哪改变线程
这里做了两件事,首先把结果缓存到一个队列里,然后调用schedule启动传入的worker
我们这里需要注意下:
在调用observeOn前的序列,把结果传入到onNext就是它的工作,它并不关心后续的流程,所以工作就到这里就结束了,剩下的交给ObserveOnSubscriber继续。
onNext方法最后调用了schedule(),从方法名可以看到,其肯定是改变线程用的,并且该方法经过一番循环之后,调用了该类的call()方法。
recursiveScheduler 就是之前我们传入的Scheduler,我们一般会在observeOn传入AndroidScheluders.mainThread()。
scheduler中调用的call()方法
call()中有localChild.onNext(localOn.getValue(v));调用。
在Scheduler启动后, 我们在Observable.subscribe(a)传入的a就是这里的child, 我们看到,在call中终于调用了它的onNext方法,把真正的结果传了出去,但是在这里,我们是工作在observeOn的线程上的。
总结起来的结论就是:
observeOn 对调用之前的序列默不关心,也不会要求之前的序列运行在指定的线程上
observeOn 对之前的序列产生的结果先缓存起来,然后再在指定的线程上,推送给最终的subscriber
observeOn改变的是onNext()调用。
subcribeOn和observeOn 对比分析
有如上逻辑,则我们对其运行进行分析。
首先,我们需要先明白其内部执行的逻辑。
在调用subscribe之后,逻辑开始运行。分别调用每一步OnSubscribe.call(),注意:自下往上。当运行到最上,即Observable.create()后,我们在其中调用了subscriber.onNext(),于是程序开始自上往下执行每一个对象的subscriber.onNext()方法。最终,直到subscribe()中的回调。
其次,从上面对subscribeOn()和observeOn()的分析中可以明白,subscribeOn()是在call()方法中起作用,而observeOn()实在onNext()中作用。
那么对于以上的逻辑,我们可以得出如下结论:
操作1,2,3,4在io线程中,因为在如果没有observeOn()影响,他们的回调操作默认在订阅的线程中。而我们的订阅线程在subscribeOn(io)发生了改变。注意他们执行的先后顺序。
操作5,6在main线程中运行。因为observeOn()改变了onNext().
特别注意那一个逻辑没起到作用
再简单点总结就是
subscribeOn的调用切换之前的线程。
observeOn的调用切换之后的线程。
observeOn之后,不可再调用subscribeOn 切换线程
组合是可以的,但是他们的执行顺序是有条件的,如果仔细分析的话,可以知道observeOn调用之后,再调用subscribeOn是无效的,原因是什么?
因为subscribeOn改变的是subscribe这句调用所在的线程,大多数情况,产生内容和消费内容是在同一线程的,所以改变了产生内容所在的线程,就改变了消费内容所在的线程。
经过上面的阐述,我们知道,observeOn的工作原理是把消费结果先缓存,再切换到新线程上让原始消费者消费,它和生产者是没有一点关系的,就算subscribeOn调用了,也只是改变observeOn这个消费者所在的线程,和OperatorObserveOn中存储的原始消费者一点关系都没有,它还是由observeOn控制。
@扔物线 大神给的总结:
下面提到的“操作”包括产生事件、用操作符操作事件以及最终的通过 subscriber 消费事件;
只有第一subscribeOn() 起作用(所以多个 subscribeOn() 吴意义);
这个 subscribeOn() 控制从流程开始的第一个操作,直到遇到第一个 observeOn();
observeOn() 可以使用多次,每个 observeOn() 将导致一次线程切换(),这次切换开始于这次 observeOn() 的下一个操作;
不论是 subscribeOn() 还是 observeOn(),每次线程切换如果不受到下一个 observeOn() 的干预,线程将不再改变,不会自动切换到其他线程。
参考文章:
https://segmentfault.com/a/1190000004856071
/article/9433908.html
虽然文章大部分是参考的,但是我写这个博客花了3个小时多,看源码真的很头疼,希望以后能有所提高。
或许你会问,这有多大的区别吗?的确是有的,比如说产生observable事件是一件费时可能会卡主线程的操作(比如说获取网络数据),那么subscribeOn就是你的选择,这样可以避免卡住主线程。
两者最主要的差别是影响的范围不同,observeOn is more limited,但是却是可以多次调用,多次改变不同的接受者所在的schedule,在调用这个函数之后的observable造成影响。而subscribeOn则是一次性的,无论在什么地方调用,总是从改变最原始的observable开始影响整个observable的处理。
subscribeOn()和observeOn()的区别
subscribeOn()主要改变的是订阅的线程,即call()执行的线程;observeOn()主要改变的是发送的线程,即onNext()执行的线程。
subscribeOn
我们先看一个例子。Observable .create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext("a"); subscriber.onNext("b"); subscriber.onCompleted(); } }) .subscribeOn(Schedulers.io()) .subscribe(new Observer<String>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(String integer) { System.out.println(integer); } });
运行如下:
a b
我们看一下subscribeOn()中,都干了什么
public final Observable<T> subscribeOn(Scheduler scheduler) { if (this instanceof ScalarSynchronousObservable) { return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler); } return create(new OperatorSubscribeOn<T>(this, scheduler)); }
很明显,会走if之外的方法。
在这里我们可以看到,我们又创建了一个Observable对象,但创建时传入的参数为OperatorSubscribeOn(this,scheduler),我们看一下此对象以及其对应的构造方法
OperatorSubscribeOn代码:
public final class OperatorSubscribeOn<T> implements OnSubscribe<T> { final Scheduler scheduler; final Observable<T> source; public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler) { this.scheduler = scheduler; this.source = source; } @Override public void call(final Subscriber<? super T> subscriber) { final Worker inner = scheduler.createWorker(); subscriber.add(inner); inner.schedule(new Action0() { @Override public void call() { final Thread t = Thread.currentThread(); Subscriber<T> s = new Subscriber<T>(subscriber) { @Override public void onNext(T t) { subscriber.onNext(t); } @Override public void onError(Throwable e) { try { subscriber.onError(e); } finally { inner.unsubscribe(); } } @Override public void onCompleted() { try { subscriber.onCompleted(); } finally { inner.unsubscribe(); } } @Override public void setProducer(final Producer p) { subscriber.setProducer(new Producer() { @Override public void request(final long n) { if (t == Thread.currentThread()) { p.request(n); } else { inner.schedule(new Action0() { @Override public void call() { p.request(n); } }); } } }); } }; source.unsafeSubscribe(s); } }); } }
可以看到,OperatorSubscribeOn实现Onsubscribe,并且由其构造方法可知,他保存了上一个Observable对象,并保存了Scheduler对象。
这里做个总结。
把Observable.create()创建的称之为Observable_1,OnSubscribe_1。把subscribeOn()创建的称之为Observable_2,OnSubscribe_2(= OperatorSubscribeOn)。
那么,前两步就是创建了两个的observable,和OnSubscribe,并且OnSubscribe_2中保存了Observable_1的应用,即source。
调用Observable_2.subscribe()方法会调用OnSubscibe_2的call方法,即OperatorSubscribeOn的call()。
下面分析下call()方法。
inner.schedule()改变了线程,此时Action的call()在我们指定的线程中运行。
Subscriber被包装了一层。
source.unsafeSubscribe(s);,注意source是Observable_1对象。
unsafeSubscribe方法代码:
public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) { try { // new Subscriber so onStart it subscriber.onStart(); // allow the hook to intercept and/or decorate hook.onSubscribeStart(this, onSubscribe).call(subscriber); return hook.onSubscribeReturn(subscriber); } catch (Throwable e) { // special handling for certain Throwable/Error/Exception types Exceptions.throwIfFatal(e); // if an unhandled error occurs executing the onSubscribe we will propagate it try { subscriber.onError(hook.onSubscribeError(e)); } catch (Throwable e2) { Exceptions.throwIfFatal(e2); // if this happens it means the onError itself failed (perhaps an invalid function implementation) // so we are unable to propagate the error correctly and will just throw RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2); // TODO could the hook be the cause of the error in the on error handling. hook.onSubscribeError(r); // TODO why aren't we throwing the hook's return value. throw r; } return Subscriptions.unsubscribed(); } }
代码很多,关键代码:
hook.onSubscribeStart(this, onSubscribe).call(subscriber);
该方法即调用了OnSubscribe_1.call()方法。注意,此时的call()方法在我们指定的线程中运行。那么就起到了改变线程的作用。
对于以上线程,我们可以总结,其有如下流程:
Observable.create() : 创建了Observable_1和OnSubscribe_1;
subscribeOn(): 创建Observable_2和OperatorSubscribeOn(OnSubscribe_2),同时OperatorSubscribeOn保存了Observable_1的引用。
observable_2.subscribe(Observer):
调用OperatorSubscribeOn的call()。call()改变了线程的运行,并且调用了Observable_1.unsafeSubscribe(s);
Observable_1.unsafeSubscribe(s);,该方法的实现中调用了OnSubscribe_1的call()。
从这个可以了解,无论我们的subscribeOn()放在哪里,他改变的是subscribe()的过程,而不是onNext()的过程。
那么如果有多个subscribeOn(),那么线程会怎样执行呢。如果按照我们的逻辑,有以下程序
Observable.just("ss") .subscribeOn(Schedulers.io()) // ----1--- .subscribeOn(Schedulers.newThread()) //----2---- .subscribe(new Action1<String>() { @Override public void call(String s) { } });
那么,我们根据之前的源码分析其执行逻辑。
Observable.just(“ss”),创建Observable_1,OnSubscribe_1
Observable_1.subscribeOn(Schedulers.io()):创建observable_2,OperatorSubscribeOn_2并保存Observable_1的引用。
observable_2.subscribeOn(Schedulers.newThread()):创建Observable_3,OperatorSubscribeOn_3并保存Observable_2的引用。
Observable_3.subscribe():
调用OperatorSubscribeOn_3.call(),改变线程为Schedulers.newThread()。
调用OperatorSubscribeOn_2.call(),改变线程为Schedulers.io()。
调用OnSubscribe_1.call(),此时call()运行在Schedulers.io()。
根据以上逻辑分析,会按照1的线程进行执行。
subscribeOn如何工作,关键代码其实就是一行代码:
source.unsafeSubscribe(s);
注意它所在的位置,是在worker的call里面,说白了,就是把source.subscribe这一行调用放在指定的线程里,那么总结起来的结论就是:
subscribeOn的调用,改变了调用前序列所运行的线程。
observeOn
看一下observeOn()源码:public final Observable<T> observeOn(Scheduler scheduler) { return observeOn(scheduler, RxRingBuffer.SIZE); } public final Observable<T> observeOn(Scheduler scheduler, int bufferSize) { return observeOn(scheduler, false, bufferSize); } public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) { if (this instanceof ScalarSynchronousObservable) { return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler); } return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize)); }
这里引出了新的操作符lift
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) { return new Observable<R>(new OnSubscribeLift<T, R>(onSubscribe, operator)); }
这里不再介绍了,详见:/article/11853094.html
在lift()中,有如下关键代码:
Subscriber<? super T> st = hook.onLift(operator).call(o);
OperatorObserveOn.call()核心代码:
@Override public Subscriber<? super T> call(Subscriber<? super T> child) { if (scheduler instanceof ImmediateScheduler) { // avoid overhead, execute directly return child; } else if (scheduler instanceof TrampolineScheduler) { // avoid overhead, execute directly return child; } else { ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize); parent.init(); return parent; } }
我们看到其返回了ObserveOnSubscriber< T>,注意:此时只调用了call()方法,但call()方法中并没有改变线程的操作,此时为subscribe()过程。
我们直奔重点,因为,我们了解到其改变的是onNext()过程,那么我们肯定要看一下ObserveOnSubscriber.onNext()找找在哪改变线程
@Override public void onNext(final T t) { if (isUnsubscribed() || finished) { return; } if (!queue.offer(on.next(t))) { onError(new MissingBackpressureException()); return; } schedule(); }
这里做了两件事,首先把结果缓存到一个队列里,然后调用schedule启动传入的worker
我们这里需要注意下:
在调用observeOn前的序列,把结果传入到onNext就是它的工作,它并不关心后续的流程,所以工作就到这里就结束了,剩下的交给ObserveOnSubscriber继续。
onNext方法最后调用了schedule(),从方法名可以看到,其肯定是改变线程用的,并且该方法经过一番循环之后,调用了该类的call()方法。
protected void schedule() { if (counter.getAndIncrement() == 0) { recursiveScheduler.schedule(this); } }
recursiveScheduler 就是之前我们传入的Scheduler,我们一般会在observeOn传入AndroidScheluders.mainThread()。
scheduler中调用的call()方法
// only execute this from schedule() @Override public void call() { long missed = 1L; long currentEmission = emitted; // these are accessed in a tight loop around atomics so // loading them into local variables avoids the mandatory re-reading // of the constant fields final Queue<Object> q = this.queue; final Subscriber<? super T> localChild = this.child; final NotificationLite<T> localOn = this.on; // requested and counter are not included to avoid JIT issues with register spilling // and their access is is amortized because they are part of the outer loop which runs // less frequently (usually after each bufferSize elements) for (;;) { long requestAmount = requested.get(); while (requestAmount != currentEmission) { boolean done = finished; Object v = q.poll(); boolean empty = v == null; if (checkTerminated(done, empty, localChild, q)) { return; } if (empty) { break; } localChild.onNext(localOn.getValue(v)); currentEmission++; if (currentEmission == limit) { requestAmount = BackpressureUtils.produced(requested, currentEmission); request(currentEmission); currentEmission = 0L; } } if (requestAmount == currentEmission) { if (checkTerminated(finished, q.isEmpty(), localChild, q)) { return; } } emitted = currentEmission; missed = counter.addAndGet(-missed); if (missed == 0L) { break; } } }
call()中有localChild.onNext(localOn.getValue(v));调用。
在Scheduler启动后, 我们在Observable.subscribe(a)传入的a就是这里的child, 我们看到,在call中终于调用了它的onNext方法,把真正的结果传了出去,但是在这里,我们是工作在observeOn的线程上的。
总结起来的结论就是:
observeOn 对调用之前的序列默不关心,也不会要求之前的序列运行在指定的线程上
observeOn 对之前的序列产生的结果先缓存起来,然后再在指定的线程上,推送给最终的subscriber
observeOn改变的是onNext()调用。
subcribeOn和observeOn 对比分析
Observable .map // 操作1 .flatMap // 操作2 .subscribeOn(io) .map //操作3 .flatMap //操作4 .observeOn(main) .map //操作5 .flatMap //操作6 .subscribeOn(io) //!!特别注意 .subscribe(handleData)
有如上逻辑,则我们对其运行进行分析。
首先,我们需要先明白其内部执行的逻辑。
在调用subscribe之后,逻辑开始运行。分别调用每一步OnSubscribe.call(),注意:自下往上。当运行到最上,即Observable.create()后,我们在其中调用了subscriber.onNext(),于是程序开始自上往下执行每一个对象的subscriber.onNext()方法。最终,直到subscribe()中的回调。
其次,从上面对subscribeOn()和observeOn()的分析中可以明白,subscribeOn()是在call()方法中起作用,而observeOn()实在onNext()中作用。
那么对于以上的逻辑,我们可以得出如下结论:
操作1,2,3,4在io线程中,因为在如果没有observeOn()影响,他们的回调操作默认在订阅的线程中。而我们的订阅线程在subscribeOn(io)发生了改变。注意他们执行的先后顺序。
操作5,6在main线程中运行。因为observeOn()改变了onNext().
特别注意那一个逻辑没起到作用
再简单点总结就是
subscribeOn的调用切换之前的线程。
observeOn的调用切换之后的线程。
observeOn之后,不可再调用subscribeOn 切换线程
复杂情况
我们经常多次使用subscribeOn切换线程,那么以后是否可以组合observeOn和subscribeOn达到自由切换的目的呢?组合是可以的,但是他们的执行顺序是有条件的,如果仔细分析的话,可以知道observeOn调用之后,再调用subscribeOn是无效的,原因是什么?
因为subscribeOn改变的是subscribe这句调用所在的线程,大多数情况,产生内容和消费内容是在同一线程的,所以改变了产生内容所在的线程,就改变了消费内容所在的线程。
经过上面的阐述,我们知道,observeOn的工作原理是把消费结果先缓存,再切换到新线程上让原始消费者消费,它和生产者是没有一点关系的,就算subscribeOn调用了,也只是改变observeOn这个消费者所在的线程,和OperatorObserveOn中存储的原始消费者一点关系都没有,它还是由observeOn控制。
@扔物线 大神给的总结:
下面提到的“操作”包括产生事件、用操作符操作事件以及最终的通过 subscriber 消费事件;
只有第一subscribeOn() 起作用(所以多个 subscribeOn() 吴意义);
这个 subscribeOn() 控制从流程开始的第一个操作,直到遇到第一个 observeOn();
observeOn() 可以使用多次,每个 observeOn() 将导致一次线程切换(),这次切换开始于这次 observeOn() 的下一个操作;
不论是 subscribeOn() 还是 observeOn(),每次线程切换如果不受到下一个 observeOn() 的干预,线程将不再改变,不会自动切换到其他线程。
参考文章:
https://segmentfault.com/a/1190000004856071
/article/9433908.html
虽然文章大部分是参考的,但是我写这个博客花了3个小时多,看源码真的很头疼,希望以后能有所提高。
相关文章推荐
- JAVA中的static -静态变量-静态方法-静态初始化块
- 自学Java第二天 解决java不能输出中文问题
- Spring 基础知识学习
- Eclipse设置xml用spring编辑器打开的方法
- Myeclipse 改变代码字体的大小
- 关于MyEclipse中导入工程过程中出现乱码纠结一个中之后的整理,真实有用!
- Java中的内部类、匿名类的使用
- Java虚拟机
- Struts1 MVC框架的工作原理
- Jimoshi_Spring 框架学习(二)--AOP(面向切面)、AOP管理事务
- java pdf转换jpg
- Java语言编写矩阵转置
- Java反射学习总结(3)——反射的基本操作
- JAVA设计模式:代理模式
- 初学者对Spring MVC的认识
- JavaSE随笔——原码反码补码
- JavaWeb学习之HibernateDaoSupport使用
- javac -d . hello.java中的 -d .是什么作用?
- Java反射学习总结(2)——动态加载类
- 46. Spring Boot中使用AOP统一处理Web请求日志