您的位置:首页 > 编程语言 > Java开发

RxJava源碼分析二:观察者模式的实现流程以及线程控制

2018-03-23 19:00 417 查看
在上一个篇的Rxjava源码分析中我们分析过Rxjava是扩展的观察者模式
好比是Button的点击


上面的按钮点击响应事件可以类比成如下的模式:



RxJava 的观察者模式·RxJava 有四个基本概念:Observable (可观察者,即被观察者)、 Observer (观察者)、 subscribe (订阅)、事件。Observable 和 Observer 通过 subscribe() 方法实现订阅关系,从而使得 Observable 可以在需要的时候发出事件来通知 Observer与传统观察者模式不同, RxJava 的事件回调方法除了普通事件 onNext() (相当于 onClick() / onEvent())之外,还定义了两个特殊的事件:onCompleted() 和 onError()(1)onCompleted(): 事件队列完结。RxJava 不仅把每个事件单独处理,还会把它们看做一个队列。RxJava 规定,当不会再有新的 onNext() 发出时,需要触发 onCompleted() 方法作为标志。(2)onError(): 事件队列异常。在事件处理过程中出异常时,onError() 会被触发,同时队列自动终止,不允许再有事件发出。(3)在一个正确运行的事件序列中, onCompleted() 和 onError() 有且只有一个,并且是事件序列中的最后一个。需要注意的是,onCompleted() 和 onError() 二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。 既然是观察者模式那么必然会有一个事件的传递过程,那么接下来让我们来看一下RxJava是如何实现事件传递的:
三个步骤:(Rx1.x版本)(类比点击事件)1) 创建 Observer(创建一个OnClickListener)Observer 即观察者,它决定事件触发的时候将有怎样的行为Subscriber<String> observer = new Subs
15a89
criber<String>() {
    @Override
    public void onCompleted() {}
    @Override
    public void onError(Throwable e) {}
    @Override
    public void onNext(String s) {
    }};
 2) 创建 Observable(定义Button,定义button内部在什么条件下触发onClick方法)Observable 即被观察者,它决定什么时候触发事件以及触发怎样的事件,RxJava 使用 create() 方法来创建一个 Observable3) Subscribe (订阅)创建了 Observable 和 Observer 之后,再用 subscribe() 方法将它们联结起来(相当于setOnClickListener方法将listener和button关联起来)observable.subscribe(subscriber); 我们看看RxJava的第二步创建Observable 是如何实现的:Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext("Hello");
        subscriber.onNext("Hi");
        subscriber.onCompleted();
    }}); 可以看到,这里传入了一个 OnSubscribe 对象作为参数。OnSubscribe 会被存储在创建的 Observable 对象中create() 方法是 RxJava 最基本的创造事件序列的方法。基于这个方法, RxJava 还提供了一些方法用来快捷创建事件队列,例如just()  from()第三步订阅方法的关键代码:public Subscription subscribe(Subscriber subscriber) {
    subscriber.onStart();
    onSubscribe.call(subscriber);
    return subscriber;}注意:这不是 subscribe() 的源码,而是将源码中与性能、兼容性、扩展性有关的代码剔除后的核心代码可以看到关键之处在于调用 了Observable 中的 OnSubscribe.Call (Subscriber) 。在这里,事件发送的逻辑开始运行我们从源码的角度大致分析一下事件的传播流程:   我们首先创建了一个Subscriber对象,这个对象就是观察者模式中的观察者,然后我们创建了一个Observable对象,这个对象就是观察者模式中的被观察者,最后我们调用subscribe方法,让观察者和被观察者之间产生联系   从我们使用者来说流程就是这么简单,但是通过上面的源码我们可以看到,在第二步创建Observable的时候,我们传递进去了一个OnSubscribe对象,此对象实现了action1接口的call方法,当我们调用subscribe方法的时候,我们把观察者传递了进去,然后让它开始工作,再然后我们调用了OnSubscribe对象的call方法,并将观察者对象作为参数传递进去,于是这里就调用到了我们创建Observable的时候实现的OnSubscribe的call方法  还记得我们在这个call方法里面,使用参数subscriber发送了两个数据,然后调用onComplete方法结束发送,有没有发现这个subscriber就是我们调用subscribe方法的时候传递进去的subscriber,换一句话说,RxJava是让观察者自己给自己发送数据  从这也可以看出,在 RxJava 中, Observable 并不是在创建的时候就立即开始发送事件,而是在它被订阅的时候,即当 subscribe() 方法执行的时候,如下图:

        然而在 RxJava 的默认规则中,事件的发出和消费都是在同一个线程的        也就是说,如果只用上面的方法,实现出来的只是一个同步的观察者模式。我们知道观察者模式本身的目的就是『后台处理,前台回调』的异步机制,因此异步对于 RxJava 是至关重要的。而要实现异步,就需要用到 RxJava 的另一个概念: Scheduler线程控制 —— Scheduler在RxJava 中,Scheduler ——调度器,相当于线程控制器,RxJava 通过它来指定每一段代码应该运行在什么样的线程。RxJava 已经内置了几个 Scheduler ,它们已经适合大多数的使用场景:Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。(2.x已弃用)
Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。
Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程
Schedulers.computation(): 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU
 另外, Android 还有一个专用的 AndroidSchedulers.mainThread(),它指定的操作将在 Android 主线程运行
有了这几个 Scheduler ,就可以使用 subscribeOn() 和 observeOn() 两个方法来对线程进行控制了
subscribeOn()(顾名思义:订阅)指定 subscribe() 所发生的线程,即 Observable.OnSubscribe 被激活时所处的线程。或者叫做订阅事件发生的线程observeOn()(观察)指定 Subscriber 所运行在的线程。或者叫做事件消费的线程,代码如下:Observable.just(1, 2, 3, 4)
    .subscribeOn(Schedulers.io()) //指定 subscribe() 发生在 IO 线程
    .observeOn(AndroidSchedulers.mainThread())指定事件接收线程
    .subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer number) {
            }}); 如上代码所示:    subscribeOn()无论定义在哪里其操作的都是subscribe方法执行时的线程,而在subscribe方法执行的时候会去调用just方法被创建的事件的内容 1、2、3、4 将会在 IO 线程发出,然后线程切换到了主线程,指定 Subscriber 的回调发生在主线程,因此call方法的执行就是在android主线程中执行的
在上一个篇的Rxjava源码分析中我们分析过Rxjava是扩展的观察者模式
好比是Button的点击


上面的按钮点击响应事件可以类比成如下的模式:



RxJava 的观察者模式·RxJava 有四个基本概念:Observable (可观察者,即被观察者)、 Observer (观察者)、 subscribe (订阅)、事件。Observable 和 Observer 通过 subscribe() 方法实现订阅关系,从而使得 Observable 可以在需要的时候发出事件来通知 Observer与传统观察者模式不同, RxJava 的事件回调方法除了普通事件 onNext() (相当于 onClick() / onEvent())之外,还定义了两个特殊的事件:onCompleted() 和 onError()(1)onCompleted(): 事件队列完结。RxJava 不仅把每个事件单独处理,还会把它们看做一个队列。RxJava 规定,当不会再有新的 onNext() 发出时,需要触发 onCompleted() 方法作为标志。(2)onError(): 事件队列异常。在事件处理过程中出异常时,onError() 会被触发,同时队列自动终止,不允许再有事件发出。(3)在一个正确运行的事件序列中, onCompleted() 和 onError() 有且只有一个,并且是事件序列中的最后一个。需要注意的是,onCompleted() 和 onError() 二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。 既然是观察者模式那么必然会有一个事件的传递过程,那么接下来让我们来看一下RxJava是如何实现事件传递的:
三个步骤:(Rx1.x版本)(类比点击事件)1) 创建 Observer(创建一个OnClickListener)Observer 即观察者,它决定事件触发的时候将有怎样的行为Subscriber<String> observer = new Subscriber<String>() {
    @Override
    public void onCompleted() {}
    @Override
    public void onError(Throwable e) {}
    @Override
    public void onNext(String s) {
    }};
 2) 创建 Observable(定义Button,定义button内部在什么条件下触发onClick方法)Observable 即被观察者,它决定什么时候触发事件以及触发怎样的事件,RxJava 使用 create() 方法来创建一个 Observable3) Subscribe (订阅)创建了 Observable 和 Observer 之后,再用 subscribe() 方法将它们联结起来(相当于setOnClickListener方法将listener和button关联起来)observable.subscribe(subscriber); 我们看看RxJava的第二步创建Observable 是如何实现的:Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext("Hello");
        subscriber.onNext("Hi");
        subscriber.onCompleted();
    }}); 可以看到,这里传入了一个 OnSubscribe 对象作为参数。OnSubscribe 会被存储在创建的 Observable 对象中create() 方法是 RxJava 最基本的创造事件序列的方法。基于这个方法, RxJava 还提供了一些方法用来快捷创建事件队列,例如just()  from()第三步订阅方法的关键代码:public Subscription subscribe(Subscriber subscriber) {
    subscriber.onStart();
    onSubscribe.call(subscriber);
    return subscriber;}注意:这不是 subscribe() 的源码,而是将源码中与性能、兼容性、扩展性有关的代码剔除后的核心代码可以看到关键之处在于调用 了Observable 中的 OnSubscribe.Call (Subscriber) 。在这里,事件发送的逻辑开始运行我们从源码的角度大致分析一下事件的传播流程:   我们首先创建了一个Subscriber对象,这个对象就是观察者模式中的观察者,然后我们创建了一个Observable对象,这个对象就是观察者模式中的被观察者,最后我们调用subscribe方法,让观察者和被观察者之间产生联系   从我们使用者来说流程就是这么简单,但是通过上面的源码我们可以看到,在第二步创建Observable的时候,我们传递进去了一个OnSubscribe对象,此对象实现了action1接口的call方法,当我们调用subscribe方法的时候,我们把观察者传递了进去,然后让它开始工作,再然后我们调用了OnSubscribe对象的call方法,并将观察者对象作为参数传递进去,于是这里就调用到了我们创建Observable的时候实现的OnSubscribe的call方法  还记得我们在这个call方法里面,使用参数subscriber发送了两个数据,然后调用onComplete方法结束发送,有没有发现这个subscriber就是我们调用subscribe方法的时候传递进去的subscriber,换一句话说,RxJava是让观察者自己给自己发送数据  从这也可以看出,在 RxJava 中, Observable 并不是在创建的时候就立即开始发送事件,而是在它被订阅的时候,即当 subscribe() 方法执行的时候,如下图:

        然而在 RxJava 的默认规则中,事件的发出和消费都是在同一个线程的        也就是说,如果只用上面的方法,实现出来的只是一个同步的观察者模式。我们知道观察者模式本身的目的就是『后台处理,前台回调』的异步机制,因此异步对于 RxJava 是至关重要的。而要实现异步,就需要用到 RxJava 的另一个概念: Scheduler线程控制 —— Scheduler在RxJava 中,Scheduler ——调度器,相当于线程控制器,RxJava 通过它来指定每一段代码应该运行在什么样的线程。RxJava 已经内置了几个 Scheduler ,它们已经适合大多数的使用场景:Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。(2.x已弃用)
Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。
Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程
Schedulers.computation(): 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU
 另外, Android 还有一个专用的 AndroidSchedulers.mainThread(),它指定的操作将在 Android 主线程运行
有了这几个 Scheduler ,就可以使用 subscribeOn() 和 observeOn() 两个方法来对线程进行控制了
subscribeOn()(顾名思义:订阅)指定 subscribe() 所发生的线程,即 Observable.OnSubscribe 被激活时所处的线程。或者叫做订阅事件发生的线程observeOn()(观察)指定 Subscriber 所运行在的线程。或者叫做事件消费的线程,代码如下:Observable.just(1, 2, 3, 4)
    .subscribeOn(Schedulers.io()) //指定 subscribe() 发生在 IO 线程
    .observeOn(AndroidSchedulers.mainThread())指定事件接收线程
    .subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer number) {
            }}); 如上代码所示:    subscribeOn()无论定义在哪里其操作的都是subscribe方法执行时的线程,而在subscribe方法执行的时候会去调用just方法被创建的事件的内容 1、2、3、4 将会在 IO 线程发出,然后线程切换到了主线程,指定 Subscriber 的回调发生在主线程,因此call方法的执行就是在android主线程中执行的
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: