RxJava2.0中map操作符用法和源码分析(四)
2017-09-04 09:07
489 查看
map基本使用
map是变换操作符,对原始Observable发射的每一项数据应用一个你选择的函数生成新的结果,然后返回一个发射这些结果Observable。但从字面上还是比较难以理解,我们可以用代码示例说明:
Observable.just(1,2,3).map(new Function<Integer, String>() { @Override public String apply(Integer integer) throws Exception { return "This is new result " + integer; } }).subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { println("accept : " + s +"\n"); } }); 输出结果: accept : This is new result 1 accept : This is new result 2 accept : This is new result 3
由上面代码可知,执行map操作时,首先接收原始Observable发射的数据,然后根据你的操作生成新的数据并将这些新的数据发射,这时观察者中接收的就是新生成的数据。
下面我们将从源码的角度来分析下:
这里我们首先使用just操作符创建一个Observable来发射指定的数据。关于just如何创建Observable对象,我们这里不做分析,前面文章中已经说明。这里just创建的具体对象为ObservableFromArray。我们直接分析map的源码,我们看到在调用map方法时,我们需要传入一个Function的对象:
/** * A functional interface that takes a value and returns another value, possibly with a * different type and allows throwing a checked exception. * * @param <T> the input value type * @param <R> the output value type */ public interface Function<T, R> { /** * Apply some calculation to the input value and return some other value. * @param t the input value * @return the output value * @throws Exception on error */ @NonNull R apply(@NonNull T t) throws Exception; }
如上所述,这个接口的功能主要是接收一个值(T),然后返回另一个值(R)。
我们在查看map的方法:
@CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) { ObjectHelper.requireNonNull(mapper, "mapper is null"); return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper)); }
与之前其它操作符一样的调用逻辑,将当前的Observable对象和生成的Function对象作为参数,生成一个ObservableMap的对象。
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> { final Function<? super T, ? extends U> function; public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) { super(source); this.function = function; } @Override public void subscribeActual(Observer<? super U> t) { source.subscribe(new MapObserver<T, U>(t, function)); } ...... }
完成了Observable对象初始化后,我们开始订阅观察者。这里我们选择使用的观察者为Consumer对象。订阅观察者时,执行subscribe方法:
Observable#subscribe
@CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public final Disposable subscribe(Consumer<? super T> onNext) { return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer()); } @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) { ObjectHelper.requireNonNull(onNext, "onNext is null"); ObjectHelper.requireNonNull(onError, "onError is null"); ObjectHelper.requireNonNull(onComplete, "onComplete is null"); ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null"); LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe); subscribe(ls); return ls; }
同样的,将在subscribe方法中执行subscribeActual(observer)方法:
@SchedulerSupport(SchedulerSupport.NONE) @Override public final void subscribe(Observer<? super T> observer) { ObjectHelper.requireNonNull(observer, "observer is null"); try { observer = RxJavaPlugins.onSubscribe(this, observer); ObjectHelper.requireNonNull(observer, "Plugin returned null Observer"); subscribeActual(observer); } catch (NullPointerException e) { // NOPMD throw e; } catch (Throwable e) { Exceptions.throwIfFatal(e); // can't call onError because no way to know if a Disposable has been set or not // can't call onSubscribe because the call might have set a Subscription already RxJavaPlugins.onError(e); NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS"); npe.initCause(e); throw npe; } }
这里其实执行的是ObservableMap中的subscribeActual方法。在subscribeActual方法中,首先会创建一个MapObserver对象,参数t对应的是LambdaObserver。
然后执行source.subscribe方法,source代表的是之前的Observable对象,也就是just创建的ObservableFromArray对象,所以再次调用Observable中subscribe方法,执行subscribeActual,而这次执行的对象是ObservableFromArray,而参数observer具体实现是MapObserver的对象:
public final class ObservableFromArray<T> extends Observable<T> { final T[] array; public ObservableFromArray(T[] array) { this.array = array; } @Override public void subscribeActual(Observer<? super T> s) { FromArrayDisposable<T> d = new FromArrayDisposable<T>(s, array); s.onSubscribe(d); if (d.fusionMode) { return; } d.run(); } ...... }
到这里执行的逻辑与之前分析fromArray操作符用法相同,不再做具体分析。在FromArrayDisposable中具体执行的run方法中:
FromArrayDisposable#run
void run() { T[] a = array; int n = a.length; for (int i = 0; i < n && !isDisposed(); i++) { T value = a[i]; if (value == null) { actual.onError(new NullPointerException("The " + i + "th element is null")); return; } actual.onNext(value); } if (!isDisposed()) { actual.onComplete(); } }
如上此时actual是由MapObserver实现的,我们看下MapObserver的onNext方法:
MapObserver#onNext:
@Override public void onNext(T t) { if (done) { return; } if (sourceMode != NONE) { actual.onNext(null); return; } U v; try { v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value."); } catch (Throwable ex) { fail(ex); return; } actual.onNext(v); }
当调用onNext方法时,通过Function接口回调apply方法获得转换后的数据,然后再通过 actual.onNext(v)方法发射出去。此时的actual中的onNext方法就可以接收新的参数了,而actual就是之前初始化的LambdaObserver对象。通过它可以让Consumer的accept方法中接收该数据。这里的执行逻辑之前已经分析过,这里不再详述。
相关文章推荐
- RxJava2.0中flatMap操作符用法和源码分析(五)
- RxJava2.0中create操作符用法和源码分析
- [RxJava学习]操作符flatMap源码分析
- RxJava create操作符的用法和源码分析
- RxJava(一) create操作符的用法和源码分析
- RxJava2.0中fromArray操作符用法和源码分析(三)
- Android RxJava(一) create操作符的用法和源码分析
- RxJava进阶之源码分析map() 操作符分析
- [RxJava学习]操作符Map源码分析
- RxJava2.0中just操作符用法和源码分析(二)
- jQuery插件之jQuery.Form.js用法实例分析(附demo示例源码)
- ArrayList用法详解与源码分析
- Java遍历时删除List、Set、Map中的元素(源码分析)
- Python原始字符串与Unicode字符串操作符用法实例分析
- Map子类重要源码分析对比&HashMap&HashTable&TreeMap&IdentityHashMap&WeakHashMap
- libevent源码分析:evmap_io_active_函数
- Java多线程 -- JUC包源码分析18 -- ConcurrentSkipListMap(Set)/TreeMap(Set)/无锁链表
- Okhttp使用和源码分析二(OkHttp3.x用法)
- Libevent源码分析-----event_signal_map
- Go语言map字典用法实例分析