RxJava源碼分析二:观察者模式的实现流程以及线程控制
2018-03-23 19:00
417 查看
在上一个篇的Rxjava源码分析中我们分析过Rxjava是扩展的观察者模式
好比是Button的点击
上面的按钮点击响应事件可以类比成如下的模式:
RxJava 的观察者模式·RxJava 有四个基本概念:Observable (可观察者,即被观察者)、 Observer (观察者)、 subscribe (订阅)、事件。Observable 和 Observer 通过 subscribe() 方法实现订阅关系,从而使得 Observable 可以在需要的时候发出事件来通知 Observer与传统观察者模式不同, RxJava 的事件回调方法除了普通事件 onNext() (相当于 onClick() / onEvent())之外,还定义了两个特殊的事件:onCompleted() 和 onError()(1)onCompleted(): 事件队列完结。RxJava 不仅把每个事件单独处理,还会把它们看做一个队列。RxJava 规定,当不会再有新的 onNext() 发出时,需要触发 onCompleted() 方法作为标志。(2)onError(): 事件队列异常。在事件处理过程中出异常时,onError() 会被触发,同时队列自动终止,不允许再有事件发出。(3)在一个正确运行的事件序列中, onCompleted() 和 onError() 有且只有一个,并且是事件序列中的最后一个。需要注意的是,onCompleted() 和 onError() 二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。 既然是观察者模式那么必然会有一个事件的传递过程,那么接下来让我们来看一下RxJava是如何实现事件传递的:
三个步骤:(Rx1.x版本)(类比点击事件)1) 创建 Observer(创建一个OnClickListener)Observer 即观察者,它决定事件触发的时候将有怎样的行为Subscriber<String> observer = new Subs
15a89
criber<String>() {
@Override
public void onCompleted() {}
@Override
public void onError(Throwable e) {}
@Override
public void onNext(String s) {
}}; 2) 创建 Observable(定义Button,定义button内部在什么条件下触发onClick方法)Observable 即被观察者,它决定什么时候触发事件以及触发怎样的事件,RxJava 使用 create() 方法来创建一个 Observable3) Subscribe (订阅)创建了 Observable 和 Observer 之后,再用 subscribe() 方法将它们联结起来(相当于setOnClickListener方法将listener和button关联起来)observable.subscribe(subscriber); 我们看看RxJava的第二步创建Observable 是如何实现的:Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("Hello");
subscriber.onNext("Hi");
subscriber.onCompleted();
}}); 可以看到,这里传入了一个 OnSubscribe 对象作为参数。OnSubscribe 会被存储在创建的 Observable 对象中create() 方法是 RxJava 最基本的创造事件序列的方法。基于这个方法, RxJava 还提供了一些方法用来快捷创建事件队列,例如just() from()第三步订阅方法的关键代码:public Subscription subscribe(Subscriber subscriber) {
subscriber.onStart();
onSubscribe.call(subscriber);
return subscriber;}注意:这不是 subscribe() 的源码,而是将源码中与性能、兼容性、扩展性有关的代码剔除后的核心代码可以看到关键之处在于调用 了Observable 中的 OnSubscribe.Call (Subscriber) 。在这里,事件发送的逻辑开始运行。我们从源码的角度大致分析一下事件的传播流程: 我们首先创建了一个Subscriber对象,这个对象就是观察者模式中的观察者,然后我们创建了一个Observable对象,这个对象就是观察者模式中的被观察者,最后我们调用subscribe方法,让观察者和被观察者之间产生联系 从我们使用者来说流程就是这么简单,但是通过上面的源码我们可以看到,在第二步创建Observable的时候,我们传递进去了一个OnSubscribe对象,此对象实现了action1接口的call方法,当我们调用subscribe方法的时候,我们把观察者传递了进去,然后让它开始工作,再然后我们调用了OnSubscribe对象的call方法,并将观察者对象作为参数传递进去,于是这里就调用到了我们创建Observable的时候实现的OnSubscribe的call方法 还记得我们在这个call方法里面,使用参数subscriber发送了两个数据,然后调用onComplete方法结束发送,有没有发现这个subscriber就是我们调用subscribe方法的时候传递进去的subscriber,换一句话说,RxJava是让观察者自己给自己发送数据 从这也可以看出,在 RxJava 中, Observable 并不是在创建的时候就立即开始发送事件,而是在它被订阅的时候,即当 subscribe() 方法执行的时候,如下图:
然而在 RxJava 的默认规则中,事件的发出和消费都是在同一个线程的 也就是说,如果只用上面的方法,实现出来的只是一个同步的观察者模式。我们知道观察者模式本身的目的就是『后台处理,前台回调』的异步机制,因此异步对于 RxJava 是至关重要的。而要实现异步,就需要用到 RxJava 的另一个概念: Scheduler线程控制 —— Scheduler在RxJava 中,Scheduler ——调度器,相当于线程控制器,RxJava 通过它来指定每一段代码应该运行在什么样的线程。RxJava 已经内置了几个 Scheduler ,它们已经适合大多数的使用场景:Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。(2.x已弃用)
Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。
Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程
Schedulers.computation(): 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU
另外, Android 还有一个专用的 AndroidSchedulers.mainThread(),它指定的操作将在 Android 主线程运行
有了这几个 Scheduler ,就可以使用 subscribeOn() 和 observeOn() 两个方法来对线程进行控制了
subscribeOn()(顾名思义:订阅)指定 subscribe() 所发生的线程,即 Observable.OnSubscribe 被激活时所处的线程。或者叫做订阅事件发生的线程observeOn()(观察)指定 Subscriber 所运行在的线程。或者叫做事件消费的线程,代码如下:Observable.just(1, 2, 3, 4)
.subscribeOn(Schedulers.io()) //指定 subscribe() 发生在 IO 线程
.observeOn(AndroidSchedulers.mainThread())指定事件接收线程
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer number) {
}}); 如上代码所示: subscribeOn()无论定义在哪里其操作的都是subscribe方法执行时的线程,而在subscribe方法执行的时候会去调用just方法被创建的事件的内容 1、2、3、4 将会在 IO 线程发出,然后线程切换到了主线程,指定 Subscriber 的回调发生在主线程,因此call方法的执行就是在android主线程中执行的
在上一个篇的Rxjava源码分析中我们分析过Rxjava是扩展的观察者模式
好比是Button的点击
上面的按钮点击响应事件可以类比成如下的模式:
RxJava 的观察者模式·RxJava 有四个基本概念:Observable (可观察者,即被观察者)、 Observer (观察者)、 subscribe (订阅)、事件。Observable 和 Observer 通过 subscribe() 方法实现订阅关系,从而使得 Observable 可以在需要的时候发出事件来通知 Observer与传统观察者模式不同, RxJava 的事件回调方法除了普通事件 onNext() (相当于 onClick() / onEvent())之外,还定义了两个特殊的事件:onCompleted() 和 onError()(1)onCompleted(): 事件队列完结。RxJava 不仅把每个事件单独处理,还会把它们看做一个队列。RxJava 规定,当不会再有新的 onNext() 发出时,需要触发 onCompleted() 方法作为标志。(2)onError(): 事件队列异常。在事件处理过程中出异常时,onError() 会被触发,同时队列自动终止,不允许再有事件发出。(3)在一个正确运行的事件序列中, onCompleted() 和 onError() 有且只有一个,并且是事件序列中的最后一个。需要注意的是,onCompleted() 和 onError() 二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。 既然是观察者模式那么必然会有一个事件的传递过程,那么接下来让我们来看一下RxJava是如何实现事件传递的:
三个步骤:(Rx1.x版本)(类比点击事件)1) 创建 Observer(创建一个OnClickListener)Observer 即观察者,它决定事件触发的时候将有怎样的行为Subscriber<String> observer = new Subscriber<String>() {
@Override
public void onCompleted() {}
@Override
public void onError(Throwable e) {}
@Override
public void onNext(String s) {
}}; 2) 创建 Observable(定义Button,定义button内部在什么条件下触发onClick方法)Observable 即被观察者,它决定什么时候触发事件以及触发怎样的事件,RxJava 使用 create() 方法来创建一个 Observable3) Subscribe (订阅)创建了 Observable 和 Observer 之后,再用 subscribe() 方法将它们联结起来(相当于setOnClickListener方法将listener和button关联起来)observable.subscribe(subscriber); 我们看看RxJava的第二步创建Observable 是如何实现的:Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("Hello");
subscriber.onNext("Hi");
subscriber.onCompleted();
}}); 可以看到,这里传入了一个 OnSubscribe 对象作为参数。OnSubscribe 会被存储在创建的 Observable 对象中create() 方法是 RxJava 最基本的创造事件序列的方法。基于这个方法, RxJava 还提供了一些方法用来快捷创建事件队列,例如just() from()第三步订阅方法的关键代码:public Subscription subscribe(Subscriber subscriber) {
subscriber.onStart();
onSubscribe.call(subscriber);
return subscriber;}注意:这不是 subscribe() 的源码,而是将源码中与性能、兼容性、扩展性有关的代码剔除后的核心代码可以看到关键之处在于调用 了Observable 中的 OnSubscribe.Call (Subscriber) 。在这里,事件发送的逻辑开始运行。我们从源码的角度大致分析一下事件的传播流程: 我们首先创建了一个Subscriber对象,这个对象就是观察者模式中的观察者,然后我们创建了一个Observable对象,这个对象就是观察者模式中的被观察者,最后我们调用subscribe方法,让观察者和被观察者之间产生联系 从我们使用者来说流程就是这么简单,但是通过上面的源码我们可以看到,在第二步创建Observable的时候,我们传递进去了一个OnSubscribe对象,此对象实现了action1接口的call方法,当我们调用subscribe方法的时候,我们把观察者传递了进去,然后让它开始工作,再然后我们调用了OnSubscribe对象的call方法,并将观察者对象作为参数传递进去,于是这里就调用到了我们创建Observable的时候实现的OnSubscribe的call方法 还记得我们在这个call方法里面,使用参数subscriber发送了两个数据,然后调用onComplete方法结束发送,有没有发现这个subscriber就是我们调用subscribe方法的时候传递进去的subscriber,换一句话说,RxJava是让观察者自己给自己发送数据 从这也可以看出,在 RxJava 中, Observable 并不是在创建的时候就立即开始发送事件,而是在它被订阅的时候,即当 subscribe() 方法执行的时候,如下图:
然而在 RxJava 的默认规则中,事件的发出和消费都是在同一个线程的 也就是说,如果只用上面的方法,实现出来的只是一个同步的观察者模式。我们知道观察者模式本身的目的就是『后台处理,前台回调』的异步机制,因此异步对于 RxJava 是至关重要的。而要实现异步,就需要用到 RxJava 的另一个概念: Scheduler线程控制 —— Scheduler在RxJava 中,Scheduler ——调度器,相当于线程控制器,RxJava 通过它来指定每一段代码应该运行在什么样的线程。RxJava 已经内置了几个 Scheduler ,它们已经适合大多数的使用场景:Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。(2.x已弃用)
Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。
Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程
Schedulers.computation(): 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU
另外, Android 还有一个专用的 AndroidSchedulers.mainThread(),它指定的操作将在 Android 主线程运行
有了这几个 Scheduler ,就可以使用 subscribeOn() 和 observeOn() 两个方法来对线程进行控制了
subscribeOn()(顾名思义:订阅)指定 subscribe() 所发生的线程,即 Observable.OnSubscribe 被激活时所处的线程。或者叫做订阅事件发生的线程observeOn()(观察)指定 Subscriber 所运行在的线程。或者叫做事件消费的线程,代码如下:Observable.just(1, 2, 3, 4)
.subscribeOn(Schedulers.io()) //指定 subscribe() 发生在 IO 线程
.observeOn(AndroidSchedulers.mainThread())指定事件接收线程
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer number) {
}}); 如上代码所示: subscribeOn()无论定义在哪里其操作的都是subscribe方法执行时的线程,而在subscribe方法执行的时候会去调用just方法被创建的事件的内容 1、2、3、4 将会在 IO 线程发出,然后线程切换到了主线程,指定 Subscriber 的回调发生在主线程,因此call方法的执行就是在android主线程中执行的
好比是Button的点击
上面的按钮点击响应事件可以类比成如下的模式:
RxJava 的观察者模式·RxJava 有四个基本概念:Observable (可观察者,即被观察者)、 Observer (观察者)、 subscribe (订阅)、事件。Observable 和 Observer 通过 subscribe() 方法实现订阅关系,从而使得 Observable 可以在需要的时候发出事件来通知 Observer与传统观察者模式不同, RxJava 的事件回调方法除了普通事件 onNext() (相当于 onClick() / onEvent())之外,还定义了两个特殊的事件:onCompleted() 和 onError()(1)onCompleted(): 事件队列完结。RxJava 不仅把每个事件单独处理,还会把它们看做一个队列。RxJava 规定,当不会再有新的 onNext() 发出时,需要触发 onCompleted() 方法作为标志。(2)onError(): 事件队列异常。在事件处理过程中出异常时,onError() 会被触发,同时队列自动终止,不允许再有事件发出。(3)在一个正确运行的事件序列中, onCompleted() 和 onError() 有且只有一个,并且是事件序列中的最后一个。需要注意的是,onCompleted() 和 onError() 二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。 既然是观察者模式那么必然会有一个事件的传递过程,那么接下来让我们来看一下RxJava是如何实现事件传递的:
三个步骤:(Rx1.x版本)(类比点击事件)1) 创建 Observer(创建一个OnClickListener)Observer 即观察者,它决定事件触发的时候将有怎样的行为Subscriber<String> observer = new Subs
15a89
criber<String>() {
@Override
public void onCompleted() {}
@Override
public void onError(Throwable e) {}
@Override
public void onNext(String s) {
}}; 2) 创建 Observable(定义Button,定义button内部在什么条件下触发onClick方法)Observable 即被观察者,它决定什么时候触发事件以及触发怎样的事件,RxJava 使用 create() 方法来创建一个 Observable3) Subscribe (订阅)创建了 Observable 和 Observer 之后,再用 subscribe() 方法将它们联结起来(相当于setOnClickListener方法将listener和button关联起来)observable.subscribe(subscriber); 我们看看RxJava的第二步创建Observable 是如何实现的:Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("Hello");
subscriber.onNext("Hi");
subscriber.onCompleted();
}}); 可以看到,这里传入了一个 OnSubscribe 对象作为参数。OnSubscribe 会被存储在创建的 Observable 对象中create() 方法是 RxJava 最基本的创造事件序列的方法。基于这个方法, RxJava 还提供了一些方法用来快捷创建事件队列,例如just() from()第三步订阅方法的关键代码:public Subscription subscribe(Subscriber subscriber) {
subscriber.onStart();
onSubscribe.call(subscriber);
return subscriber;}注意:这不是 subscribe() 的源码,而是将源码中与性能、兼容性、扩展性有关的代码剔除后的核心代码可以看到关键之处在于调用 了Observable 中的 OnSubscribe.Call (Subscriber) 。在这里,事件发送的逻辑开始运行。我们从源码的角度大致分析一下事件的传播流程: 我们首先创建了一个Subscriber对象,这个对象就是观察者模式中的观察者,然后我们创建了一个Observable对象,这个对象就是观察者模式中的被观察者,最后我们调用subscribe方法,让观察者和被观察者之间产生联系 从我们使用者来说流程就是这么简单,但是通过上面的源码我们可以看到,在第二步创建Observable的时候,我们传递进去了一个OnSubscribe对象,此对象实现了action1接口的call方法,当我们调用subscribe方法的时候,我们把观察者传递了进去,然后让它开始工作,再然后我们调用了OnSubscribe对象的call方法,并将观察者对象作为参数传递进去,于是这里就调用到了我们创建Observable的时候实现的OnSubscribe的call方法 还记得我们在这个call方法里面,使用参数subscriber发送了两个数据,然后调用onComplete方法结束发送,有没有发现这个subscriber就是我们调用subscribe方法的时候传递进去的subscriber,换一句话说,RxJava是让观察者自己给自己发送数据 从这也可以看出,在 RxJava 中, Observable 并不是在创建的时候就立即开始发送事件,而是在它被订阅的时候,即当 subscribe() 方法执行的时候,如下图:
然而在 RxJava 的默认规则中,事件的发出和消费都是在同一个线程的 也就是说,如果只用上面的方法,实现出来的只是一个同步的观察者模式。我们知道观察者模式本身的目的就是『后台处理,前台回调』的异步机制,因此异步对于 RxJava 是至关重要的。而要实现异步,就需要用到 RxJava 的另一个概念: Scheduler线程控制 —— Scheduler在RxJava 中,Scheduler ——调度器,相当于线程控制器,RxJava 通过它来指定每一段代码应该运行在什么样的线程。RxJava 已经内置了几个 Scheduler ,它们已经适合大多数的使用场景:Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。(2.x已弃用)
Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。
Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程
Schedulers.computation(): 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU
另外, Android 还有一个专用的 AndroidSchedulers.mainThread(),它指定的操作将在 Android 主线程运行
有了这几个 Scheduler ,就可以使用 subscribeOn() 和 observeOn() 两个方法来对线程进行控制了
subscribeOn()(顾名思义:订阅)指定 subscribe() 所发生的线程,即 Observable.OnSubscribe 被激活时所处的线程。或者叫做订阅事件发生的线程observeOn()(观察)指定 Subscriber 所运行在的线程。或者叫做事件消费的线程,代码如下:Observable.just(1, 2, 3, 4)
.subscribeOn(Schedulers.io()) //指定 subscribe() 发生在 IO 线程
.observeOn(AndroidSchedulers.mainThread())指定事件接收线程
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer number) {
}}); 如上代码所示: subscribeOn()无论定义在哪里其操作的都是subscribe方法执行时的线程,而在subscribe方法执行的时候会去调用just方法被创建的事件的内容 1、2、3、4 将会在 IO 线程发出,然后线程切换到了主线程,指定 Subscriber 的回调发生在主线程,因此call方法的执行就是在android主线程中执行的
在上一个篇的Rxjava源码分析中我们分析过Rxjava是扩展的观察者模式
好比是Button的点击
上面的按钮点击响应事件可以类比成如下的模式:
RxJava 的观察者模式·RxJava 有四个基本概念:Observable (可观察者,即被观察者)、 Observer (观察者)、 subscribe (订阅)、事件。Observable 和 Observer 通过 subscribe() 方法实现订阅关系,从而使得 Observable 可以在需要的时候发出事件来通知 Observer与传统观察者模式不同, RxJava 的事件回调方法除了普通事件 onNext() (相当于 onClick() / onEvent())之外,还定义了两个特殊的事件:onCompleted() 和 onError()(1)onCompleted(): 事件队列完结。RxJava 不仅把每个事件单独处理,还会把它们看做一个队列。RxJava 规定,当不会再有新的 onNext() 发出时,需要触发 onCompleted() 方法作为标志。(2)onError(): 事件队列异常。在事件处理过程中出异常时,onError() 会被触发,同时队列自动终止,不允许再有事件发出。(3)在一个正确运行的事件序列中, onCompleted() 和 onError() 有且只有一个,并且是事件序列中的最后一个。需要注意的是,onCompleted() 和 onError() 二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。 既然是观察者模式那么必然会有一个事件的传递过程,那么接下来让我们来看一下RxJava是如何实现事件传递的:
三个步骤:(Rx1.x版本)(类比点击事件)1) 创建 Observer(创建一个OnClickListener)Observer 即观察者,它决定事件触发的时候将有怎样的行为Subscriber<String> observer = new Subscriber<String>() {
@Override
public void onCompleted() {}
@Override
public void onError(Throwable e) {}
@Override
public void onNext(String s) {
}}; 2) 创建 Observable(定义Button,定义button内部在什么条件下触发onClick方法)Observable 即被观察者,它决定什么时候触发事件以及触发怎样的事件,RxJava 使用 create() 方法来创建一个 Observable3) Subscribe (订阅)创建了 Observable 和 Observer 之后,再用 subscribe() 方法将它们联结起来(相当于setOnClickListener方法将listener和button关联起来)observable.subscribe(subscriber); 我们看看RxJava的第二步创建Observable 是如何实现的:Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("Hello");
subscriber.onNext("Hi");
subscriber.onCompleted();
}}); 可以看到,这里传入了一个 OnSubscribe 对象作为参数。OnSubscribe 会被存储在创建的 Observable 对象中create() 方法是 RxJava 最基本的创造事件序列的方法。基于这个方法, RxJava 还提供了一些方法用来快捷创建事件队列,例如just() from()第三步订阅方法的关键代码:public Subscription subscribe(Subscriber subscriber) {
subscriber.onStart();
onSubscribe.call(subscriber);
return subscriber;}注意:这不是 subscribe() 的源码,而是将源码中与性能、兼容性、扩展性有关的代码剔除后的核心代码可以看到关键之处在于调用 了Observable 中的 OnSubscribe.Call (Subscriber) 。在这里,事件发送的逻辑开始运行。我们从源码的角度大致分析一下事件的传播流程: 我们首先创建了一个Subscriber对象,这个对象就是观察者模式中的观察者,然后我们创建了一个Observable对象,这个对象就是观察者模式中的被观察者,最后我们调用subscribe方法,让观察者和被观察者之间产生联系 从我们使用者来说流程就是这么简单,但是通过上面的源码我们可以看到,在第二步创建Observable的时候,我们传递进去了一个OnSubscribe对象,此对象实现了action1接口的call方法,当我们调用subscribe方法的时候,我们把观察者传递了进去,然后让它开始工作,再然后我们调用了OnSubscribe对象的call方法,并将观察者对象作为参数传递进去,于是这里就调用到了我们创建Observable的时候实现的OnSubscribe的call方法 还记得我们在这个call方法里面,使用参数subscriber发送了两个数据,然后调用onComplete方法结束发送,有没有发现这个subscriber就是我们调用subscribe方法的时候传递进去的subscriber,换一句话说,RxJava是让观察者自己给自己发送数据 从这也可以看出,在 RxJava 中, Observable 并不是在创建的时候就立即开始发送事件,而是在它被订阅的时候,即当 subscribe() 方法执行的时候,如下图:
然而在 RxJava 的默认规则中,事件的发出和消费都是在同一个线程的 也就是说,如果只用上面的方法,实现出来的只是一个同步的观察者模式。我们知道观察者模式本身的目的就是『后台处理,前台回调』的异步机制,因此异步对于 RxJava 是至关重要的。而要实现异步,就需要用到 RxJava 的另一个概念: Scheduler线程控制 —— Scheduler在RxJava 中,Scheduler ——调度器,相当于线程控制器,RxJava 通过它来指定每一段代码应该运行在什么样的线程。RxJava 已经内置了几个 Scheduler ,它们已经适合大多数的使用场景:Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。(2.x已弃用)
Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。
Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程
Schedulers.computation(): 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU
另外, Android 还有一个专用的 AndroidSchedulers.mainThread(),它指定的操作将在 Android 主线程运行
有了这几个 Scheduler ,就可以使用 subscribeOn() 和 observeOn() 两个方法来对线程进行控制了
subscribeOn()(顾名思义:订阅)指定 subscribe() 所发生的线程,即 Observable.OnSubscribe 被激活时所处的线程。或者叫做订阅事件发生的线程observeOn()(观察)指定 Subscriber 所运行在的线程。或者叫做事件消费的线程,代码如下:Observable.just(1, 2, 3, 4)
.subscribeOn(Schedulers.io()) //指定 subscribe() 发生在 IO 线程
.observeOn(AndroidSchedulers.mainThread())指定事件接收线程
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer number) {
}}); 如上代码所示: subscribeOn()无论定义在哪里其操作的都是subscribe方法执行时的线程,而在subscribe方法执行的时候会去调用just方法被创建的事件的内容 1、2、3、4 将会在 IO 线程发出,然后线程切换到了主线程,指定 Subscriber 的回调发生在主线程,因此call方法的执行就是在android主线程中执行的
相关文章推荐
- 观察者模式II--Java原生实现分析以及一种全新的观察者模式
- hbase region lookups流程以及rpc线程卡死问题分析
- 通过rk 代码,分析android 及kernel 中audio 的控制以及binder的流程
- 深入列表遍历问题,并分析spring和tomcat中观察者模式的实现
- 三种工厂模式的分析以及C++实现
- PHP基础学习之流程控制的实现分析
- 观察者模式理解,以及JDK实现
- 三种工厂模式的分析以及C++实现
- 利用接口实现自定义监听事件以及观察者模式
- 深入列表遍历问题,并分析spring和tomcat中观察者模式的实现
- Java8 观察者模式详解 jdk实现源码分析
- offboard模式的控制流程分析
- 三种工厂模式的分析以及C++实现
- 观察者模式学习以及jdk实现
- 设计模式之二:观察者模式(简单实现(气象站模拟流程))
- ECC算法分析--openssl的实现以及其调用流程
- 设计模式-观察者模式,以及如何使用观察者来为app实现即时通讯功能
- 三种工厂模式的分析以及C++实现
- 从源码的角度带你分析Glide整体加载流程以及设计模式
- 免费SVN版本控制库Google code的搭建流程,以及隐藏代码,实现伪安全