您的位置:首页 > 编程语言 > Java开发

RxJava observeOn()与subscribeOn()的关系

2016-06-15 22:17 363 查看
observeOn和subscribeOn都是对observable的一种操作,区别就是subscribeOn改变了observable本身产生事件的schedule以及发出事件后相关处理事件的程序所在的schedule,而obseveron仅仅是改变了对发出事件后相关处理事件的程序所在的schedule。

或许你会问,这有多大的区别吗?的确是有的,比如说产生observable事件是一件费时可能会卡主线程的操作(比如说获取网络数据),那么subscribeOn就是你的选择,这样可以避免卡住主线程。

两者最主要的差别是影响的范围不同,observeOn is more limited,但是却是可以多次调用,多次改变不同的接受者所在的schedule,在调用这个函数之后的observable造成影响。而subscribeOn则是一次性的,无论在什么地方调用,总是从改变最原始的observable开始影响整个observable的处理。

subscribeOn()和observeOn()的区别

subscribeOn()主要改变的是订阅的线程,即call()执行的线程;

observeOn()主要改变的是发送的线程,即onNext()执行的线程。

subscribeOn

我们先看一个例子。

Observable
.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("a");
subscriber.onNext("b");

subscriber.onCompleted();
}
})
.subscribeOn(Schedulers.io())

.subscribe(new Observer<String>() {
@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(String integer) {
System.out.println(integer);
}
});


运行如下:

a
b


我们看一下subscribeOn()中,都干了什么

public final Observable<T> subscribeOn(Scheduler scheduler) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
return create(new OperatorSubscribeOn<T>(this, scheduler));
}


很明显,会走if之外的方法。

在这里我们可以看到,我们又创建了一个Observable对象,但创建时传入的参数为OperatorSubscribeOn(this,scheduler),我们看一下此对象以及其对应的构造方法

OperatorSubscribeOn代码:

public final class OperatorSubscribeOn<T> implements OnSubscribe<T> {

final Scheduler scheduler;
final Observable<T> source;

public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler) {
this.scheduler = scheduler;
this.source = source;
}

@Override
public void call(final Subscriber<? super T> subscriber) {
final Worker inner = scheduler.createWorker();
subscriber.add(inner);

inner.schedule(new Action0() {
@Override
public void call() {
final Thread t = Thread.currentThread();

Subscriber<T> s = new Subscriber<T>(subscriber) {
@Override
public void onNext(T t) {
subscriber.onNext(t);
}

@Override
public void onError(Throwable e) {
try {
subscriber.onError(e);
} finally {
inner.unsubscribe();
}
}

@Override
public void onCompleted() {
try {
subscriber.onCompleted();
} finally {
inner.unsubscribe();
}
}

@Override
public void setProducer(final Producer p) {
subscriber.setProducer(new Producer() {
@Override
public void request(final long n) {
if (t == Thread.currentThread()) {
p.request(n);
} else {
inner.schedule(new Action0() {
@Override
public void call() {
p.request(n);
}
});
}
}
});
}
};

source.unsafeSubscribe(s);
}
});
}
}


可以看到,OperatorSubscribeOn实现Onsubscribe,并且由其构造方法可知,他保存了上一个Observable对象,并保存了Scheduler对象。

这里做个总结。

把Observable.create()创建的称之为Observable_1,OnSubscribe_1。把subscribeOn()创建的称之为Observable_2,OnSubscribe_2(= OperatorSubscribeOn)。

那么,前两步就是创建了两个的observable,和OnSubscribe,并且OnSubscribe_2中保存了Observable_1的应用,即source。

调用Observable_2.subscribe()方法会调用OnSubscibe_2的call方法,即OperatorSubscribeOn的call()。

下面分析下call()方法。

inner.schedule()改变了线程,此时Action的call()在我们指定的线程中运行。

Subscriber被包装了一层。

source.unsafeSubscribe(s);,注意source是Observable_1对象。

unsafeSubscribe方法代码:

public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
try {
// new Subscriber so onStart it
subscriber.onStart();
// allow the hook to intercept and/or decorate
hook.onSubscribeStart(this, onSubscribe).call(subscriber);
return hook.onSubscribeReturn(subscriber);
} catch (Throwable e) {
// special handling for certain Throwable/Error/Exception types
Exceptions.throwIfFatal(e);
// if an unhandled error occurs executing the onSubscribe we will propagate it
try {
subscriber.onError(hook.onSubscribeError(e));
} catch (Throwable e2) {
Exceptions.throwIfFatal(e2);
// if this happens it means the onError itself failed (perhaps an invalid function implementation)
// so we are unable to propagate the error correctly and will just throw
RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
// TODO could the hook be the cause of the error in the on error handling.
hook.onSubscribeError(r);
// TODO why aren't we throwing the hook's return value.
throw r;
}
return Subscriptions.unsubscribed();
}
}


代码很多,关键代码:

hook.onSubscribeStart(this, onSubscribe).call(subscriber);

该方法即调用了OnSubscribe_1.call()方法。注意,此时的call()方法在我们指定的线程中运行。那么就起到了改变线程的作用。

对于以上线程,我们可以总结,其有如下流程:

Observable.create() : 创建了Observable_1和OnSubscribe_1;

subscribeOn(): 创建Observable_2和OperatorSubscribeOn(OnSubscribe_2),同时OperatorSubscribeOn保存了Observable_1的引用。

observable_2.subscribe(Observer):

调用OperatorSubscribeOn的call()。call()改变了线程的运行,并且调用了Observable_1.unsafeSubscribe(s);

Observable_1.unsafeSubscribe(s);,该方法的实现中调用了OnSubscribe_1的call()。

从这个可以了解,无论我们的subscribeOn()放在哪里,他改变的是subscribe()的过程,而不是onNext()的过程。

那么如果有多个subscribeOn(),那么线程会怎样执行呢。如果按照我们的逻辑,有以下程序

Observable.just("ss")
.subscribeOn(Schedulers.io())   // ----1---
.subscribeOn(Schedulers.newThread()) //----2----
.subscribe(new Action1<String>() {
@Override
public void call(String s) {

}
});


那么,我们根据之前的源码分析其执行逻辑。

Observable.just(“ss”),创建Observable_1,OnSubscribe_1

Observable_1.subscribeOn(Schedulers.io()):创建observable_2,OperatorSubscribeOn_2并保存Observable_1的引用。

observable_2.subscribeOn(Schedulers.newThread()):创建Observable_3,OperatorSubscribeOn_3并保存Observable_2的引用。

Observable_3.subscribe():

调用OperatorSubscribeOn_3.call(),改变线程为Schedulers.newThread()。

调用OperatorSubscribeOn_2.call(),改变线程为Schedulers.io()。

调用OnSubscribe_1.call(),此时call()运行在Schedulers.io()。

根据以上逻辑分析,会按照1的线程进行执行。

subscribeOn如何工作,关键代码其实就是一行代码:

source.unsafeSubscribe(s);


注意它所在的位置,是在worker的call里面,说白了,就是把source.subscribe这一行调用放在指定的线程里,那么总结起来的结论就是:

subscribeOn的调用,改变了调用前序列所运行的线程。

observeOn

看一下observeOn()源码:

public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, RxRingBuffer.SIZE);
}

public final Observable<T> observeOn(Scheduler scheduler, int bufferSize) {
return observeOn(scheduler, false, bufferSize);
}

public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize));
}


这里引出了新的操作符lift

public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
return new Observable<R>(new OnSubscribeLift<T, R>(onSubscribe, operator));
}


这里不再介绍了,详见:/article/11853094.html

在lift()中,有如下关键代码:

Subscriber<? super T> st = hook.onLift(operator).call(o);


OperatorObserveOn.call()核心代码:

@Override
public Subscriber<? super T> call(Subscriber<? super T> child) {
if (scheduler instanceof ImmediateScheduler) {
// avoid overhead, execute directly
return child;
} else if (scheduler instanceof TrampolineScheduler) {
// avoid overhead, execute directly
return child;
} else {
ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize);
parent.init();
return parent;
}
}


我们看到其返回了ObserveOnSubscriber< T>,注意:此时只调用了call()方法,但call()方法中并没有改变线程的操作,此时为subscribe()过程。

我们直奔重点,因为,我们了解到其改变的是onNext()过程,那么我们肯定要看一下ObserveOnSubscriber.onNext()找找在哪改变线程

@Override
public void onNext(final T t) {
if (isUnsubscribed() || finished) {
return;
}
if (!queue.offer(on.next(t))) {
onError(new MissingBackpressureException());
return;
}
schedule();
}


这里做了两件事,首先把结果缓存到一个队列里,然后调用schedule启动传入的worker

我们这里需要注意下:

在调用observeOn前的序列,把结果传入到onNext就是它的工作,它并不关心后续的流程,所以工作就到这里就结束了,剩下的交给ObserveOnSubscriber继续。

onNext方法最后调用了schedule(),从方法名可以看到,其肯定是改变线程用的,并且该方法经过一番循环之后,调用了该类的call()方法。

protected void schedule() {
if (counter.getAndIncrement() == 0) {
recursiveScheduler.schedule(this);
}
}


recursiveScheduler 就是之前我们传入的Scheduler,我们一般会在observeOn传入AndroidScheluders.mainThread()。

scheduler中调用的call()方法

// only execute this from schedule()
@Override
public void call() {
long missed = 1L;
long currentEmission = emitted;

// these are accessed in a tight loop around atomics so
// loading them into local variables avoids the mandatory re-reading
// of the constant fields
final Queue<Object> q = this.queue;
final Subscriber<? super T> localChild = this.child;
final NotificationLite<T> localOn = this.on;

// requested and counter are not included to avoid JIT issues with register spilling
// and their access is is amortized because they are part of the outer loop which runs
// less frequently (usually after each bufferSize elements)

for (;;) {
long requestAmount = requested.get();

while (requestAmount != currentEmission) {
boolean done = finished;
Object v = q.poll();
boolean empty = v == null;

if (checkTerminated(done, empty, localChild, q)) {
return;
}

if (empty) {
break;
}

localChild.onNext(localOn.getValue(v));

currentEmission++;
if (currentEmission == limit) {
requestAmount = BackpressureUtils.produced(requested, currentEmission);
request(currentEmission);
currentEmission = 0L;
}
}

if (requestAmount == currentEmission) {
if (checkTerminated(finished, q.isEmpty(), localChild, q)) {
return;
}
}

emitted = currentEmission;
missed = counter.addAndGet(-missed);
if (missed == 0L) {
break;
}
}
}


call()中有localChild.onNext(localOn.getValue(v));调用。

在Scheduler启动后, 我们在Observable.subscribe(a)传入的a就是这里的child, 我们看到,在call中终于调用了它的onNext方法,把真正的结果传了出去,但是在这里,我们是工作在observeOn的线程上的。

总结起来的结论就是:

observeOn 对调用之前的序列默不关心,也不会要求之前的序列运行在指定的线程上

observeOn 对之前的序列产生的结果先缓存起来,然后再在指定的线程上,推送给最终的subscriber

observeOn改变的是onNext()调用

subcribeOn和observeOn 对比分析

Observable
.map                    // 操作1
.flatMap                // 操作2
.subscribeOn(io)
.map                    //操作3
.flatMap                //操作4
.observeOn(main)
.map                    //操作5
.flatMap                //操作6
.subscribeOn(io)        //!!特别注意
.subscribe(handleData)


有如上逻辑,则我们对其运行进行分析。

首先,我们需要先明白其内部执行的逻辑。

在调用subscribe之后,逻辑开始运行。分别调用每一步OnSubscribe.call(),注意:自下往上。当运行到最上,即Observable.create()后,我们在其中调用了subscriber.onNext(),于是程序开始自上往下执行每一个对象的subscriber.onNext()方法。最终,直到subscribe()中的回调。

其次,从上面对subscribeOn()和observeOn()的分析中可以明白,subscribeOn()是在call()方法中起作用,而observeOn()实在onNext()中作用。

那么对于以上的逻辑,我们可以得出如下结论:

操作1,2,3,4在io线程中,因为在如果没有observeOn()影响,他们的回调操作默认在订阅的线程中。而我们的订阅线程在subscribeOn(io)发生了改变。注意他们执行的先后顺序。

操作5,6在main线程中运行。因为observeOn()改变了onNext().

特别注意那一个逻辑没起到作用

再简单点总结就是

subscribeOn的调用切换之前的线程。

observeOn的调用切换之后的线程。

observeOn之后,不可再调用subscribeOn 切换线程

复杂情况

我们经常多次使用subscribeOn切换线程,那么以后是否可以组合observeOn和subscribeOn达到自由切换的目的呢?

组合是可以的,但是他们的执行顺序是有条件的,如果仔细分析的话,可以知道observeOn调用之后,再调用subscribeOn是无效的,原因是什么?

因为subscribeOn改变的是subscribe这句调用所在的线程,大多数情况,产生内容和消费内容是在同一线程的,所以改变了产生内容所在的线程,就改变了消费内容所在的线程。

经过上面的阐述,我们知道,observeOn的工作原理是把消费结果先缓存,再切换到新线程上让原始消费者消费,它和生产者是没有一点关系的,就算subscribeOn调用了,也只是改变observeOn这个消费者所在的线程,和OperatorObserveOn中存储的原始消费者一点关系都没有,它还是由observeOn控制。

@扔物线 大神给的总结:

下面提到的“操作”包括产生事件、用操作符操作事件以及最终的通过 subscriber 消费事件;

只有第一subscribeOn() 起作用(所以多个 subscribeOn() 吴意义);

这个 subscribeOn() 控制从流程开始的第一个操作,直到遇到第一个 observeOn();

observeOn() 可以使用多次,每个 observeOn() 将导致一次线程切换(),这次切换开始于这次 observeOn() 的下一个操作;

不论是 subscribeOn() 还是 observeOn(),每次线程切换如果不受到下一个 observeOn() 的干预,线程将不再改变,不会自动切换到其他线程。

参考文章:

https://segmentfault.com/a/1190000004856071

/article/9433908.html

虽然文章大部分是参考的,但是我写这个博客花了3个小时多,看源码真的很头疼,希望以后能有所提高。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: