RxJava Lift
2016-06-15 23:27
295 查看
RxJava 不建议开发者自定义 Operator 来直接使用 lift(),而是建议尽量使用已有的 lift() 包装方法(如 map() flatMap() 等)进行组合来实现需求,因为直接使用 lift() 非常容易发生一些难以发现的错误。
例子
上面用了map操作符,map操作符是最常见和常用的,这里看一下他的源码:
map(Func1< ? super T, ? extends R> func)中泛型参数不是很懂,但是了解下Func接口。
RxJava中有一系列Action+数字,Func+数字的接口,这些接口中都只有一个call方法,其中Action接口的call方法都没有返回值,
Func接口的call方法都有返回值,后面的那个数字表示call方法接受几个泛型类型的参数。
其实主要是因为Java中函数不是一等公民,所以只能用接口这么啰嗦的格式,还好我们可以使用lambda简化我们的代码。
这里map方法接收的参数类型为Func1< ? super T, ? extends R> func,表示func的call方法接收一个T类型的参数,返回一个R类型的返回值。
OperatorMap是什么作用呢?
OperatorMap继承自Operator, 而Operator又继承自Func1接口,也就是说Operator接口的call方法会接收一个Subscriber类型的参数,并且返回另外一个Subscriber类型的对象。Operator.call方法返回一个Subscriber对象,其实我们可以这么理解,每一个operator也是一个订阅者,它返回的Subscriber对象正好用来订阅Observable发出来的消息。
有一点需要注意的是OperatorMap和Operator的泛型参数顺序刚好是相反的,为什么要这么做呢?其实很简单,因为Operator本身是对Observable发出的数据进行转换的,所以经常会出现operator转换之后返回的数据类型变了,而OperatorMap这里刚好颠倒了一下顺序,就可以保证call方法返回的Subscriber类型可以订阅Observable发出的数据。
OperatorMap的代码先了解到这里,回到lift方法。
从上面代码可以看到调用了lift函数,然后把我们的转换器传入进去,看下它做了什么事。
简化一下
返回了一个新的Observable对象,这才是重点! 这种链式调用看起来特别熟悉?有没有像javascript中的Promise/A,在then中返回一个Promise对象进行链式调用?
OK,那么我们要看下它是如何工作的啦。
来个流程图:
在map()调用之后,我们操作的就是新的Observable对象,我们可以把它取名为Observable2,我们这里调用subscribe,完整的就是Observable2, 我们这里调用subscribe,完整的就是Observable2.subscribe,继续看到subscribe里,重要的几个调用:
注意注意 ! 这里的observable是Observable$2!!也就是说,这里的onSubscribe是,lift中定义的!!
OK,我们追踪下去,回到lift的定义中。
一定一定要注意这段函数执行的上下文!,这段函数中的onSubscribe对象指向的是外部类,也就是第一个Observable的onSubScribe!而不是Observable$2中的onSubscribe,OK,谨记这一点之后,看看
这行代码,就是定义operator,生成一个经过operator操作过的Subscriber,看下OperatorMap这个类中的call方法
没错,对传入的Subscriber做了一个代理,把转换后的值传入。
这样就生成了一个代理的Subscriber,
最后我们最外层的OnSubscribe对象对我们代理的Subscriber进行了调用。。
也就是
然后这个subscriber传入到内部,链式的通知,最后通知到我们在subscribe函数中定义的对象。
参考:
/article/7583103.html
https://segmentfault.com/a/1190000004049841
http://blog.csdn.net/lzyzsd/article/details/50110355
路漫漫其修远兮,吾将上下而求索!
例子
Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext("hello"); } }) .map(new Func1<String, String>() { @Override public String call(String s) { return s + "word"; } }) .subscribe(new Subscriber<String>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(String s) { Log.d("rx", s); } });
上面用了map操作符,map操作符是最常见和常用的,这里看一下他的源码:
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) { return lift(new OperatorMap<T, R>(func)); }
map(Func1< ? super T, ? extends R> func)中泛型参数不是很懂,但是了解下Func接口。
public interface Func1<T, R> extends Function { R call(T t); }
RxJava中有一系列Action+数字,Func+数字的接口,这些接口中都只有一个call方法,其中Action接口的call方法都没有返回值,
Func接口的call方法都有返回值,后面的那个数字表示call方法接受几个泛型类型的参数。
其实主要是因为Java中函数不是一等公民,所以只能用接口这么啰嗦的格式,还好我们可以使用lambda简化我们的代码。
这里map方法接收的参数类型为Func1< ? super T, ? extends R> func,表示func的call方法接收一个T类型的参数,返回一个R类型的返回值。
OperatorMap是什么作用呢?
public final class OperatorMap<T, R> implements Operator<R, T> public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<? super T>>
OperatorMap继承自Operator, 而Operator又继承自Func1接口,也就是说Operator接口的call方法会接收一个Subscriber类型的参数,并且返回另外一个Subscriber类型的对象。Operator.call方法返回一个Subscriber对象,其实我们可以这么理解,每一个operator也是一个订阅者,它返回的Subscriber对象正好用来订阅Observable发出来的消息。
有一点需要注意的是OperatorMap和Operator的泛型参数顺序刚好是相反的,为什么要这么做呢?其实很简单,因为Operator本身是对Observable发出的数据进行转换的,所以经常会出现operator转换之后返回的数据类型变了,而OperatorMap这里刚好颠倒了一下顺序,就可以保证call方法返回的Subscriber类型可以订阅Observable发出的数据。
OperatorMap的代码先了解到这里,回到lift方法。
从上面代码可以看到调用了lift函数,然后把我们的转换器传入进去,看下它做了什么事。
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) { return new Observable<R>(new OnSubscribe<R>() { @Override public void call(Subscriber<? super R> o) { try { Subscriber<? super T> st = hook.onLift(operator).call(o); try { // new Subscriber created and being subscribed with so 'onStart' it st.onStart(); onSubscribe.call(st); } catch (Throwable e) { // localized capture of errors rather than it skipping all operators // and ending up in the try/catch of the subscribe method which then // prevents onErrorResumeNext and other similar approaches to error handling if (e instanceof OnErrorNotImplementedException) { throw (OnErrorNotImplementedException) e; } st.onError(e); } } catch (Throwable e) { if (e instanceof OnErrorNotImplementedException) { throw (OnErrorNotImplementedException) e; } // if the lift function failed all we can do is pass the error to the final Subscriber // as we don't have the operator available to us o.onError(e); } } }); }
简化一下
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) { return new Observable<R>(...); }
返回了一个新的Observable对象,这才是重点! 这种链式调用看起来特别熟悉?有没有像javascript中的Promise/A,在then中返回一个Promise对象进行链式调用?
OK,那么我们要看下它是如何工作的啦。
来个流程图:
在map()调用之后,我们操作的就是新的Observable对象,我们可以把它取名为Observable2,我们这里调用subscribe,完整的就是Observable2, 我们这里调用subscribe,完整的就是Observable2.subscribe,继续看到subscribe里,重要的几个调用:
hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber); return hook.onSubscribeReturn(subscriber);
注意注意 ! 这里的observable是Observable$2!!也就是说,这里的onSubscribe是,lift中定义的!!
OK,我们追踪下去,回到lift的定义中。
return new Observable<R>(new OnSubscribe<R>() { @Override public void call(Subscriber<? super R> o) { try { Subscriber<? super T> st = hook.onLift(operator).call(o); try { // new Subscriber created and being subscribed with so 'onStart' it st.onStart(); onSubscribe.call(st); //请注意我!! 这个onSubscribe是原始的OnSubScribe对象!! } catch (Throwable e) { // localized capture of errors rather than it skipping all operators // and ending up in the try/catch of the subscribe method which then // prevents onErrorResumeNext and other similar approaches to error handling if (e instanceof OnErrorNotImplementedException) { throw (OnErrorNotImplementedException) e; } st.onError(e); } } catch (Throwable e) { if (e instanceof OnErrorNotImplementedException) { throw (OnErrorNotImplementedException) e; } // if the lift function failed all we can do is pass the error to the final Subscriber // as we don't have the operator available to us o.onError(e); } } });
一定一定要注意这段函数执行的上下文!,这段函数中的onSubscribe对象指向的是外部类,也就是第一个Observable的onSubScribe!而不是Observable$2中的onSubscribe,OK,谨记这一点之后,看看
Subscriber<? super T> st = hook.onLift(operator).call(o);
这行代码,就是定义operator,生成一个经过operator操作过的Subscriber,看下OperatorMap这个类中的call方法
@Override public Subscriber<? super T> call(final Subscriber<? super R> o) { return new Subscriber<T>(o) { @Override public void onCompleted() { o.onCompleted(); } @Override public void onError(Throwable e) { o.onError(e); } @Override public void onNext(T t) { try { o.onNext(transformer.call(t)); } catch (Throwable e) { Exceptions.throwIfFatal(e); onError(OnErrorThrowable.addValueAsLastCause(e, t)); } } }; }
没错,对传入的Subscriber做了一个代理,把转换后的值传入。
这样就生成了一个代理的Subscriber,
最后我们最外层的OnSubscribe对象对我们代理的Subscriber进行了调用。。
也就是
@Override public void call(Subscriber<? super String> subscriber) { //此处的subscriber就是被map包裹(wrapper)后的对象。 subscriber.onNext("hello"); }
然后这个subscriber传入到内部,链式的通知,最后通知到我们在subscribe函数中定义的对象。
参考:
/article/7583103.html
https://segmentfault.com/a/1190000004049841
http://blog.csdn.net/lzyzsd/article/details/50110355
路漫漫其修远兮,吾将上下而求索!
相关文章推荐
- no leveldbjni64-1.8 in java.library.path
- java 解析 yaml
- 可能是最简单的方式:利用Eclipse创建基于Maven的Web项目
- Java:jdbc连接数据库插入中文数据乱码问题
- 我的Java学习之路
- 【struts1】--国际化
- Spring Boot学习笔记
- 让Spring 3中jsp的数据对象使用懒加载(FetchType.LAZY)与Controller的JSR 303并存
- Java MVC框架性能比较
- JAVA之代码混淆proguard
- Java学习之文件传输基础---Java IO流
- Java 8新特性 Lambde表达式介绍-使用
- 博客第二天-java基础的学习收获
- Java多线程编程(第一章)
- SpringMVC+MyBatis(最新)
- JAVA 中的IO流
- javaweb速查资料,很给力!
- Eclipse项目迁移到Android Studiod的常见错误
- Java语言程序设计 【基础篇】 【学习笔记】
- struts1,struts2,springMVC终极对比