RxJava的Observable创建和发布流程原理简单分析
Rx框架从发布以来已经被越来越多的人所认可,尤其是其类型转换和线程自由切换可以大大减少代码的回调层数,使代码可读性大大提高
我们先看一个例子
[code] Observable.just("1") .map(new Func1<String, Integer>() { @Override public Integer call(String s) { System.out.println("first ->>> " + Thread.currentThread().getId()); return new Integer(s); } }) .flatMap(new Func1<Integer, Observable<Integer>>() { @Override public Observable<Integer> call(Integer integer) { System.out.println("second ->>> " + Thread.currentThread().getId()); return Observable.just(integer.intValue() * 2); } }) .observeOn(Schedulers.newThread()) .map(new Func1<Integer, String>() { @Override public String call(Integer integer) { System.out.println("third ->>> " + Thread.currentThread().getId()); return "No." + integer; } }) .observeOn(Schedulers.newThread()) .subscribe(new Action1<String>() { @Override public void call(String s) { System.out.println("forth ->>> " + Thread.currentThread().getId()); System.out.println("result ->>> " + s); } });
我们把字符串转int后乘以2再加一个前缀返回,同时打印了执行操作的相应线程id,输出结果是
[code]first ->>> 1 second ->>> 1 third ->>> 15 forth ->>> 14 result ->>> No.2
可以发现前两个是同一个发布线程,而后面两个我都设置了新的回调处理线程,所以后面两个分别在不同的新线程,是不是很方便,具体线程是怎么切换的我会在下一篇里分析
我们先看一下Observable的创建方法
[code]public static <T> Observable<T> create(OnSubscribe<T> f) { return new Observable<T>(hook.onCreate(f)); }
Observable会创建一个OnSubscribe实例,而这个实例会被存放在当前Observable的成员变量中,注意存放的这个变量是final类型;一般我们并不会去自定义一个这个,用Rx提供的api就足够了
接下来我们用常用方法分析一下
[code] Observable.just(1,2,3).subscribe(new Subscriber<Integer>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(Integer integer) { System.out.println("->>>> " + integer); } });
这就是一个简单的案列,会依次发布1-3的数据,我们看一下just都做了啥
[code] public static <T> Observable<T> just(T t1, T t2, T t3) { return from((T[])new Object[] { t1, t2, t3 }); } public static <T> Observable<T> from(T[] array) { int n = array.length; if (n == 0) { return empty(); } else if (n == 1) { return just(array[0]); } return create(new OnSubscribeFromArray<T>(array)); }
我们可以看出,just会把数据包装成一个数组再调用from方法,这两个常用方法其实可以看成一个
其中OnSubscribeFromArray实现了OnSubscribe接口
调用just会把要发布的数据组装成功一个数据,然后把这个数据交付给OnSubscribeFromArray,然后把这个实例化的对象作为OnSubscribe从而创建一个新的Observable对象,所以这里只会返回一个Observable对象,那什么时候才会发布数据呢?
然后是subscribe方法的调用
[code] public final Subscription subscribe(Subscriber<? super T> subscriber) { return Observable.subscribe(subscriber, this); } private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) { //忽略部分代码,正常流程的大致意思 subscriber.onStart(); if (!(subscriber instanceof SafeSubscriber)) { // assign to `observer` so we return the protected version subscriber = new SafeSubscriber<T>(subscriber); } observable.onSubscribe.call(subscriber); return subscriber; }
这里会先执行subscriber的onStart方法,然后调用上面保存的OnSubscribe对象的call方法,把subscriber作为参数传进去,最后把subscribe作为Subscription返回,其中subscribe本身实现了Subscription接口
这里有个一SafeSubscriber,这个是SubScriber的子类,非这个类的都会被强制包装成这个类型,至于为什么
[code]private final SubscriptionList subscriptions; private final Subscriber<?> subscriber; protected Subscriber() { this(null, false); } protected Subscriber(Subscriber<?> subscriber, boolean shareSubscriptions) { this.subscriber = subscriber; this.subscriptions = shareSubscriptions && subscriber != null ? subscriber.subscriptions : new SubscriptionList(); } public class SafeSubscriber<T> extends Subscriber<T> { private final Subscriber<? super T> actual; public SafeSubscriber(Subscriber<? super T> actual) { super(actual); this.actual = actual; } }
Subscribe中有两个成员终型成员变量,我们默认传空参数是不会给这两个赋值的,而这两个变量对后续发布会有作用
Subscription只提供两个方法,主要是解除注册和相应的判断,后续发布判断都会判断这个依据,如果被解除注册则后续的onNext, onError和onComplete都不会再通知
[code]public interface Subscription { void unsubscribe(); boolean isUnsubscribed(); }
那重点就回到OnSubscribe的call方法上
[code]@Override public void call(Subscriber<? super T> child) { child.setProducer(new FromArrayProducer<T>(child, array)); } public void setProducer(Producer p) { long toRequest; boolean passToSubscriber = false; synchronized (this) { toRequest = requested; producer = p; if (subscriber != null) { // middle operator ... we pass thru unless a request has been made if (toRequest == NOT_SET) { // we pass-thru to the next producer as nothing has been requested passToSubscriber = true; } } } // do after releasing lock if (passToSubscriber) { subscriber.setProducer(producer); } else { // we execute the request with whatever has been requested (or Long.MAX_VALUE) if (toRequest == NOT_SET) { producer.request(Long.MAX_VALUE); } else { producer.request(toRequest); } } }
call会调用上面创建的subscribe的setProducer方法,这个subscribe就是上面SafeSubscriber包装的内部对象
而SafeSubscriber这个subscribe不为空,其父类Subscriber的subscribe是空 的,requested则是默认初始值NOT_SET,所以最后这个会连调两次最终到父类的方法,走到producer.request(Long.MAX_VALUE)
就是上面的new FromArrayProducer(...).request(Long.MAX_VALUE)
再看下这个类
[code] static final class FromArrayProducer<T> extends AtomicLong implements Producer { final Subscriber<? super T> child; final T[] array; public FromArrayProducer(Subscriber<? super T> child, T[] array) { this.child = child; this.array = array; } ... }
参数一个是当前的subscribe,一个是初始化传入的泛型数组
[code]@Override public void request(long n) { if (n < 0) { throw new IllegalArgumentException("n >= 0 required but it was " + n); } if (n == Long.MAX_VALUE) { if (BackpressureUtils.getAndAddRequest(this, n) == 0) { fastPath(); } } else if (n != 0) { if (BackpressureUtils.getAndAddRequest(this, n) == 0) { slowPath(n); } } }
最终会执行到这里的fastPath
[code]void fastPath() { final Subscriber<? super T> child = this.child; for (T t : array) { if (child.isUnsubscribed()) { return; } child.onNext(t); } if (child.isUnsubscribed()) { return; } child.onCompleted(); }
这里会依次调用child也就是我们注册的Subscriber的onNext(),数组值取完后会调用onCompleted();而且onNext和onComplete前都会检测Subscriber的注册状态,如果已经被解除则后续的都不会给回调回来
注:这里的child是被封装后的SafeSubscriber了,具体方法是
[code]@Override public void onCompleted() { if (!done) { done = true; try { actual.onCompleted(); } catch (Throwable e) { ... } } } @Override public void onNext(T args) { try { if (!done) { actual.onNext(args); } } catch (Throwable e) { ... } }
也就是最终会回到我们最初注册的那个subscriber内
总结:
1. Observable初始化添加数组会创建一个OnSubscriber实例存储数组,不做具体操作
2. subscribe方法需要传入我们自定义的Subscriber实现类我们叫做mySubscriber,就是接收到数据的处理方法;调用后会马上调用mySubscriber的onStart()方法,然后把这个用SafeSubscriber包装后,调用最开始创建的OnSubscriber的call方法,传入这个包后的对象,并把这个对象当做Subscription操作类返回,这个操作类可以控制事件的解注册判断
3.OnSubscriber对象的call方法最终会遍历数组内的泛型数据,并通过把这些数据原封不动的依次分发到我们注册的方法内
- 点赞
- 收藏
- 分享
- 文章举报
- 简单Rxjava订阅的源码流程分析
- AutoCompleteTextView悬浮提示列表创建原理简单分析
- rxJava的使用--Observable的创建及源码分析(一)
- RxJava2.0-Observable原理分析之Create操作符
- rxJava的使用--Observable的创建及源码分析(三)
- rxJava的使用--Observable的创建及源码分析(二)
- laravel 框架执行流程与原理简单分析
- android中wifi原理及流程分析(很经典)
- Redis 发布/订阅机制原理分析
- 关于未将对象引用设置到对象实例简单原理分析,与解决方法
- tomcat原理分析与简单实现
- 网口扫盲二:Mac与Phy组成原理的简单分析
- iOS中无限循环滚动简单处理实现原理分析
- rsync算法原理和工作流程分析
- SQL Server 2005 创建简单的存储过程--总结分析
- 启用IIS服务,并利用visual studio创建简单网页同过IIS发布
- 员工评估系统的流程简单分析
- 超简单的springboot自动配置原理分析
- Android在新进程中启动 Service 的流程原理分析
- ajax小demo---CORS的原理分析及简单使用