Rxjava的基础用法和源码解析(二)
这篇博客的用法主要有 : lift , map , merge , concat
lift
[code]public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator)
这个方法算是rxjava中比较核心的方法,后面的map,flatmap等都会用到这个模块,看一下基本源码
[code] public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) { return new Observable<R>(new OnSubscribe<R>() { @Override public void call(Subscriber<? super R> o) { try { //实际输出大概是这样 Subscriber<? super T> st = operator.call(o); try { st.onStart(); onSubscribe.call(st); } catch (Throwable e) { Exceptions.throwIfFatal(e); st.onError(e); } } catch (Throwable e) { Exceptions.throwIfFatal(e); o.onError(e); } } });
这个其实就是一个类型转化的入口
我们最初传入的泛型是T,然后我们想要转化成泛型R进行输出,所以我们最终subscriber的泛型输出也是R,就是传入这里的o ,但为什么operator.call(o) 要返回一个T的subscriber呢? 这是因为我们要和最初传入的泛型保持一致,不然无法接受到参数,operator.call方法会创建一个新的subscriber,然后内部转化后回调o的具体方法,看似不变其实内部已经消化掉了;我们看一个例子
[code] Observable.just(1).lift(new Observable.Operator<String, Integer>() { @Override public Subscriber<? super Integer> call(final Subscriber<? super String> subscriber) { return new Subscriber<Integer>() { @Override public void onStart() { Log.e(TAG,"onStart 1"); } @Override public void onCompleted() { Log.e(TAG,"onCompleted 1"); subscriber.onCompleted(); } @Override public void onError(Throwable e) { Log.e(TAG,"onError 1"); subscriber.onError(e); } @Override public void onNext(Integer integer) { Log.e(TAG,"onNext 1"); subscriber.onNext("No. "+integer); } }; } }).subscribe(new Subscriber<String>() { @Override public void onStart() { Log.e(TAG,"onStart 2"); } @Override public void onCompleted() { Log.e(TAG,"onCompleted 2"); } @Override public void onError(Throwable e) { Log.e(TAG,"onError 2"); } @Override public void onNext(String s) { Log.e(TAG,"onNext 2 ->>> result = "+s); } });
这里我们把1转化成String后最终输出,结果是
[code]01-29 13:47:11.551 E/Main: onStart 2 01-29 13:47:11.551 E/Main: onStart 1 01-29 13:47:11.551 E/Main: onNext 1 01-29 13:47:11.551 E/Main: onNext 2 ->>> result = No. 1 01-29 13:47:11.551 E/Main: onCompleted 1 01-29 13:47:11.551 E/Main: onCompleted 2
至于为什么后续的start比我们的转化start要靠前,因为subscribe的时候优先会执行注册的subscriber对象,转化的会在lift里面的OnSubscribe.call()方法里执行,所以会滞后;
[code] private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) { ... subscriber.onStart(); ... }
map
[code]public final <R> Observable<R> map(Func1<? super T, ? extends R> func) { return lift(new OperatorMap<T, R>(func)); }
map最终会执行lift的转化模块,OperatorMap是新建的一个转化器,用法和lift差不多,对于单向转化更加方便
[code]public final class OperatorMap<T, R> implements Operator<R, T> { final Func1<? super T, ? extends R> transformer; public OperatorMap(Func1<? super T, ? extends R> transformer) { this.transformer = transformer; } @Override public Subscriber<? super T> call(final Subscriber<? super R> o) { return new Subscriber<T>(o) { @Override public void onCompleted() { o.onCompleted(); } @Override public void onError(Throwable e) { o.onError(e); } @Override public void onNext(T t) { try { o.onNext(transformer.call(t)); } catch (Throwable e) { Exceptions.throwOrReport(e, this, t); } } }; }
用法很简单,具体结果就写了
[code] Observable.just(1).map(new Func1<Integer, String>() { @Override public String call(Integer integer) { return "No. " + integer; } }).subscribe(new Action1<String>() { @Override public void call(String s) { Log.e(TAG, "result = " + s); } });
merge
[code]public static <T> Observable<T> merge(Observable<? extends T>... t) public static <T> Observable<T> merge(Observable<? extends T>[] sequences) public static <T> Observable<T> merge(Observable<? extends T>[] sequences, int maxConcurrent) public static <T> Observable<T> merge(Iterable<? extends Observable<? extends T>> sequences) public static <T> Observable<T> merge(Iterable<? extends Observable<? extends T>> sequences, int maxConcurrent) public static <T> Observable<T> merge(Observable<? extends Observable<? extends T>> source) public static <T> Observable<T> merge(Observable<? extends Observable<? extends T>> source, int maxConcurrent)
我们分别看一下具体方法
[code]public static <T> Observable<T> merge(Observable<? extends T>... t)
最终会执行到
[code]public static <T> Observable<T> merge(Observable<? extends T>[] sequences) { return merge(from(sequences)); } public static <T> Observable<T> merge(Observable<? extends Observable<? extends T>> source) { if (source.getClass() == ScalarSynchronousObservable.class) { return ((ScalarSynchronousObservable<T>)source).scalarFlatMap((Func1)UtilityFunctions.identity()); } return source.lift(OperatorMerge.<T>instance(false)); }
from就是我们常用的遍历方法,最后就是还是会调用lift的转化方法,传入要被转化的操作详细,我们挑出里面的call方法
[code]public MergeSubscriber(Subscriber<? super T> child, boolean delayErrors, int maxConcurrent) { this.child = child; ... } @Override public Subscriber<Observable<? extends T>> call(final Subscriber<? super T> child) { MergeSubscriber<T> subscriber = new MergeSubscriber<T>(child, delayErrors, maxConcurrent); MergeProducer<T> producer = new MergeProducer<T>(subscriber); subscriber.producer = producer; child.add(subscriber); child.setProducer(producer); return subscriber; }
其中MergeSubscriber继承自 Subscriber<Observable<? extends T>>,并把我们最终需要发布的Subscriber<T>对象传入,也就是变成里面的成员变量child,那么我们只需要关心MergeSubscriber内部转化的onNext,onError和onComplete方法;我们挑onNext方法
[code]public void onNext(Observable<? extends T> t) { if (t == null) { return; } if (t instanceof ScalarSynchronousObservable) { tryEmit(((ScalarSynchronousObservable<? extends T>)t).get()); } else { InnerSubscriber<T> inner = new InnerSubscriber<T>(this, uniqueId++); addInner(inner); t.unsafeSubscribe(inner); emit(); } }
这里又做了一次封装,也是这个InnerSubscriber也是一个Subscriber,然后调用我们上面传入的MergeSubscriber的unsafeSubscribe注册这个inner对象;
因为分发到这里的是Observable,我们接收具体值需要对Oservable注册subscriber,所以InnerSubscriber应该是分发具体的值;
虽然这个onNext方法里面是顺序的,但对于每个注册的InnerSubscriber都是独立的,也就是说每个分发事件接收都是独立的,如果这里的Observable对象都指定不同线程的话,那接受事件就会有很大不确定性,也就是并不一定会按传入的顺序依次发出;
我们再看下这个InnerSubscribe的onNext方法
[code]public void onNext(T t) { parent.tryEmit(this, t); } void tryEmit(InnerSubscriber<T> subscriber, T value) { ... emitScalar(subscriber, value, r); ... } protected void emitScalar(InnerSubscriber<T> subscriber, T value, long r) { ... child.onNext(value); ... }
最后会调用我们定义的subscriber的onNext方法,传入拆解后具体的属性值
[code]Observable observable1 = Observable.just(1, 2).subscribeOn(Schedulers.computation()).map(new Func1<Integer, Integer>() { @Override public Integer call(Integer o) { try { Thread.sleep(1000); } catch (Exception e) { } return o; } }); Observable observable2 = Observable.just(3, 4).subscribeOn(Schedulers.io()).map(new Func1<Integer, Integer>() { @Override public Integer call(Integer o) { try { Thread.sleep(800); } catch (Exception e) { } return o; } }); Observable.merge(observable1, observable2).subscribe(new Action1() { @Override public void call(Object o) { Log.e(TAG, "result = " + o + " " + Thread.currentThread()); } });
我这里指定两个不同线程,并进行sleep不同时长,输出结果是
[code]01-29 19:07:35.823 E/Main: result = 3 Thread[RxCachedThreadScheduler-1,5,main] 01-29 19:07:36.022 E/Main: result = 1 Thread[RxComputationThreadPool-1,5,main] 01-29 19:07:36.626 E/Main: result = 4 Thread[RxCachedThreadScheduler-1,5,main] 01-29 19:07:37.022 E/Main: result = 2 Thread[RxComputationThreadPool-1,5,main]
也就是说指定不同线程输出有可能会错误,那我不指定线程呢,就是把上面两个subscribeOn去掉,输出结果是
[code]01-29 19:10:18.627 E/Main: result = 1 Thread[main,5,main] 01-29 19:10:19.627 E/Main: result = 2 Thread[main,5,main] 01-29 19:10:20.428 E/Main: result = 3 Thread[main,5,main] 01-29 19:10:21.231 E/Main: result = 4 Thread[main,5,main]
输出结果结果就变成正序的了
[code]public static <T> Observable<T> merge(Observable<? extends T>[] sequences) public static <T> Observable<T> merge(Observable<? extends T>[] sequences, int maxConcurrent)
这个数组上面提到了,merge传的对象最后会组装到一个数组里;maxConcurrent这个是可同时注册的最大subscriber数目,按照传入的Observable依次注册,如果超出这个个数,剩下的会在之前完成后再注册,可以类比成我们的线程池,超出的处于等待状f态,比如上面的例子可以改成
[code] Observable observable1 = Observable.just(1, 2).subscribeOn(Schedulers.io()).map(new Func1<Integer, Integer>() { @Override public Integer call(Integer o) { try { Thread.sleep(1000); } catch (Exception e) { } return o; } }); Observable observable2 = Observable.just(3, 4).subscribeOn(Schedulers.newThread()); Observable.merge(new Observable[]{observable1, observable2}, 1).subscribe(new Action1() { @Override public void call(Object o) { Log.e(TAG, "result = " + o + " " + Thread.currentThread()); } });
我们限定同时可注册的最大个数是1,并且给第一个设置了sleep,输出结果是
[code]01-29 20:23:58.562 E/Main: result = 1 Thread[RxCachedThreadScheduler-1,5,main] 01-29 20:23:59.565 E/Main: result = 2 Thread[RxCachedThreadScheduler-1,5,main] 01-29 20:23:59.568 E/Main: result = 3 Thread[RxNewThreadScheduler-1,5,main] 01-29 20:23:59.568 E/Main: result = 4 Thread[RxNewThreadScheduler-1,5,main]
我们发现虽然是不同线程,但结果按照我们传入顺序依次输出,这就是因为我们限制了同时可注册的最大个数
[code]public static <T> Observable<T> merge(Iterable<? extends Observable<? extends T>> sequences) public static <T> Observable<T> merge(Iterable<? extends Observable<? extends T>> sequences, int maxConcurrent)
这个方法和上面的类似,从数组变成了Iterable,就是变成了可以传我们常用的list,set等实现Collection的集合
[code]public static <T> Observable<T> merge(Observable<? extends Observable<? extends T>> source) public static <T> Observable<T> merge(Observable<? extends Observable<? extends T>> source, int maxConcurrent)
我们上面的操作都会走到这个统一的方法内执行,这个很少用到,比如
[code]public static <T> Observable<T> merge(Observable<? extends T>[] sequences) { return merge(from(sequences)); }
这个from方法就会把sequences的item转成Observable<? extends Observable<? extends T>>这种格式
concat
[code]public static <T> Observable<T> concat(Observable<? extends Observable<? extends T>> observables) public static <T> Observable<T> concat(Observable<? extends T>... t)
以传两个参数的为例
[code]public static <T> Observable<T> concat(Observable<? extends T> t1, Observable<? extends T> t2) { return concat(just(t1, t2)); } public static <T> Observable<T> concat(Observable<? extends Observable<? extends T>> observables) { return observables.lift(OperatorConcat.<T>instance()); } public final class OperatorConcat<T> implements Operator<T, Observable<? extends T>> { ... @Override public Subscriber<? super Observable<? extends T>> call(final Subscriber<? super T> child) { final SerializedSubscriber<T> s = new SerializedSubscriber<T>(child); final SerialSubscription current = new SerialSubscription(); child.add(current); ConcatSubscriber<T> cs = new ConcatSubscriber<T>(s, current); ConcatProducer<T> cp = new ConcatProducer<T>(cs); child.setProducer(cp); return cs; } ... }
最终转化的是cs,所以操作也在cs内,最终会通过cs内的OnNext等方法发送给我们定义的child的注册,我们看下这个类
[code]static final class ConcatSubscriber<T> extends Subscriber<Observable<? extends T>> { private final Subscriber<T> child; final ConcurrentLinkedQueue<Object> queue; final AtomicInteger wip = new AtomicInteger(); ... public void onNext(Observable<? extends T> t) { queue.add(t); if (wip.getAndIncrement() == 0) { subscribeNext(); } } void subscribeNext() { ... Object o = queue.poll(); if (o != null) { Observable<? extends T> obs = o; currentSubscriber = new ConcatInnerSubscriber<T>(this, child, arbiter); current.set(currentSubscriber); obs.unsafeSubscribe(currentSubscriber); ... } void completeInner() { currentSubscriber = null; if (wip.decrementAndGet() > 0) { subscribeNext(); } request(1); } ... } static class ConcatInnerSubscriber<T> extends Subscriber<T> { .... private final Subscriber<T> child; private final AtomicBoolean once = new AtomicBoolean(); @Override public void onNext(T t) { ... child.onNext(t); } @Override public void onCompleted() { if (once.compareAndSet(false, true)) { ConcatSubscriber<T> p = parent; p.produced(produced); p.completeInner(); } } }
我们从上往下看queue是一个线程安全的先进先出的队列,wip是一个线程安全的技术integer
每次OnNext回调都会把相应的Observable放到这个队列里,这个就是最开始传入的多个Observable对象;
然后会调用subscribeNext方法去注册一个分发的subscriber, 同时把wip的值+1;这样后续回调onNext虽然会往queue里添加新的Observable对象,但不会再次进入subscribeNext方法;
然后在subscribeNext方法里创建新的分发对象,并把数据发送给我们最终要回调的哪个subscribe,数据发送完成会会通知上级通过completeInner方法判断是否继续下一个Observable的分发;这样就保证了遍历按照顺序依次执行
[code]Observable observable1 = Observable.just(1, 2).subscribeOn(Schedulers.io()).map(new Func1<Integer, Integer>() { @Override public Integer call(Integer o) { try { Thread.sleep(1000); } catch (Exception e) { } return o; } }); Observable observable2 = Observable.just(3, 4).subscribeOn(Schedulers.newThread()); Observable.concat(observable1, observable2).subscribe(new Action1() { @Override public void call(Object o) { Log.e(TAG, "result = " + o + " " + Thread.currentThread()); } });
输出结果是
[code]01-29 22:03:34.776 E/Main: result = 1 Thread[RxCachedThreadScheduler-1,5,main] 01-29 22:03:35.779 E/Main: result = 2 Thread[RxCachedThreadScheduler-1,5,main] 01-29 22:03:35.782 E/Main: result = 3 Thread[RxNewThreadScheduler-1,5,main] 01-29 22:03:35.782 E/Main: result = 4 Thread[RxNewThreadScheduler-1,5,main]
可以看出虽然是多个线程,但是结果仍然是按照顺序执行
- 点赞
- 收藏
- 分享
- 文章举报
- Rxjava的基础用法和源码解析(三)
- TagView的用法及源码解析
- 1.2.Spring源码解析——容器的基础XmlBeanFactory
- Rxjava2 源码解析 (三)
- java基础解析系列(十)---ArrayList和LinkedList源码及使用分析
- RxJava2_map操作符源码解析
- 泛型基础用法解析
- 深入Java基础(三)--集合(2)ArrayList和其继承树源码解析以及其注意事项
- GeoServer源码解析和扩展 (一)基础篇
- RxJava2源码解析
- 大数据spark蘑菇云行动前传第5课:零基础彻底实战Scala函数式编程及Spark源码解析
- GeoServer源码解析和扩展 (一)基础篇
- 深入解析RxJava源码(一)Observable对象的构建
- 关于ListView (2)——SimpleAdapter源码解析与ViewBinders的用法
- 解析Java中PriorityQueue优先级队列结构的源码及用法
- Android源码基础解析之Dialog取消绘制流程
- java基础类型源码解析之HashMap
- Java基础之static关键字解析及用法学习总结
- SplashView的用法及源码解析
- GuidePageView的用法及源码解析