RxJava(十一)defer操作符实现代码支持链式调用
2016-09-20 16:52
381 查看
欢迎转载,转载请标明出处:
http://blog.csdn.net/johnny901114/article/details/52597643
本文出自:【余志强的博客】
RxJava系列文章目录导读:
一、RxJava create操作符的用法和源码分析
二、RxJava map操作符用法详解
三、RxJava flatMap操作符用法详解
四、RxJava concatMap操作符用法详解
五、RxJava onErrorResumeNext操作符实现app与服务器间token机制
六、RxJava retryWhen操作符实现错误重试机制
七、RxJava 使用debounce操作符优化app搜索功能
八、RxJava concat操作处理多数据源
九、RxJava zip操作符在Android中的实际使用场景
十、RxJava switchIfEmpty操作符实现Android检查本地缓存逻辑判断
十一、RxJava defer操作符实现代码支持链式调用
十二、combineLatest操作符的高级使用
十三、RxJava导致Fragment Activity内存泄漏问题
十四、interval、takeWhile操作符实现获取验证码功能
1,轻松切换线程。以前我们切换线程主要使用Handler等手段来做。
2,轻松解决回调的嵌套问题。现在的app业务逻辑越来越复杂,多的时候3,4层回调嵌套,使得代码可维护性变得很差。RxJava链式调用使得这些调用变得扁平化。
随着RxJava的流行,越来越多的开源项目开始支持RxJava,像Retrofit、GreenDao等。这些开源项目支持RxJava使得我们解决复杂业务变得非常方便。
但是这些还不够,有的时候我们自己的封装的业务也需要支持RxJava,举个例子:查询数据、处理本地文件等操作,总而言之就是一些耗时任务。而且还要处理这些操作的成功、失败、线程切换等操作。
如果还是想以前那样做,那就太low。
如下代码片段:
这样确实没有没有问题。但是我们要封装下, 每个方法都这样写维护性和扩展比较差(例如有天我想换种方式来实现而不是create,如果通过方法封装一下,修改就变得容易多了)
如何封装呢?通过分析知道,大部分代码是相同的,只是我们的业务不一样。那么通过模板方法解决吧。业务方法通过接口回调的方式传递进来,因为我们不知道调用者是什么业务。
回调接口如下(T表示我们业务数据):
下面是模板代码:
使用就非常简单了调用createObservable方法,实现MyCallable接口即可,然后就是跟使用RxJava一样处理逻辑。
最终的实现也是通过dao.load(key)同步方法来实现的,关键是wrap方法了:
通过代码可以看到默认执行的线程是IO线程:
所以使用greendao不用指定它在IO执行,因为框架已经帮我们设置了。
然后就是
上面的注释说通过Observable.fromCallable也可以实现这样的逻辑,也就是说代替Observable.defer()方法。
最后greendao是通过defer操作符来实现rx风格的。
我们对defer操作符比较陌生,先看看它的源码:
说白了就是调用了
其实我们上面的create操作也是调用过来这个方法。只是defer操作符传递的OnSubscribe是OnSubscribeDefer,那我们来看看这是什么鬼?
OnSubscribeDefer也是继承自OnSubscribe,那么他的call方法肯定也是在订阅的时候被调用(就是说订阅的时候才创建这个observable,并且每次订阅都会创建一个新的observable)。
为什么Greendao没有使用create那种方式精确控制数据的发射?现在RxJava2.0对create操作符做出了一些限制,不能随随便便create了,这样出现一些问题。具体的RxJava2.0的改动可以看看
他的github说明What’s-different-in-2.0
subscribe vs unsafeSubscribe
What’s-different-in-2.0
本文的例子放在github上 https://github.com/chiclaim/android-sample/tree/master/rxjava
http://blog.csdn.net/johnny901114/article/details/52597643
本文出自:【余志强的博客】
RxJava系列文章目录导读:
一、RxJava create操作符的用法和源码分析
二、RxJava map操作符用法详解
三、RxJava flatMap操作符用法详解
四、RxJava concatMap操作符用法详解
五、RxJava onErrorResumeNext操作符实现app与服务器间token机制
六、RxJava retryWhen操作符实现错误重试机制
七、RxJava 使用debounce操作符优化app搜索功能
八、RxJava concat操作处理多数据源
九、RxJava zip操作符在Android中的实际使用场景
十、RxJava switchIfEmpty操作符实现Android检查本地缓存逻辑判断
十一、RxJava defer操作符实现代码支持链式调用
十二、combineLatest操作符的高级使用
十三、RxJava导致Fragment Activity内存泄漏问题
十四、interval、takeWhile操作符实现获取验证码功能
一、前言
现在越来越多Android开发者使用到RxJava,在Android使用RxJava主要有如下好处:1,轻松切换线程。以前我们切换线程主要使用Handler等手段来做。
2,轻松解决回调的嵌套问题。现在的app业务逻辑越来越复杂,多的时候3,4层回调嵌套,使得代码可维护性变得很差。RxJava链式调用使得这些调用变得扁平化。
随着RxJava的流行,越来越多的开源项目开始支持RxJava,像Retrofit、GreenDao等。这些开源项目支持RxJava使得我们解决复杂业务变得非常方便。
但是这些还不够,有的时候我们自己的封装的业务也需要支持RxJava,举个例子:查询数据、处理本地文件等操作,总而言之就是一些耗时任务。而且还要处理这些操作的成功、失败、线程切换等操作。
如果还是想以前那样做,那就太low。
二、下面就来探讨下如何使得代码支持RxJava风格
遇到这种问题,在我脑海里浮现的第一种方式就是通过Observable的create操作符。因为在里面我们可以控制数据的发射。就像上一篇文章那样《RxJava switchIfEmpty操作符实现Android检查本地缓存逻辑判断》如下代码片段:
Observable.create(new Observable.OnSubscribe<Object>() { @Override public void call(Subscriber<? super Object> subscriber) { try { List<Article> as = articleDao.queryBuilder() .where(ArticleDao.Properties.CategoryId.eq(categoryId)) .orderDesc(ArticleDao.Properties.Id) .offset((pageIndex - 1) * pageSize) .limit(pageSize).list(); if (as == null || as.isEmpty()) { subscriber.onNext(null); }else{ subscriber.onNext(as); } }catch (Exception e){ subscriber.onError(e); } subscriber.onCompleted(); } });
这样确实没有没有问题。但是我们要封装下, 每个方法都这样写维护性和扩展比较差(例如有天我想换种方式来实现而不是create,如果通过方法封装一下,修改就变得容易多了)
如何封装呢?通过分析知道,大部分代码是相同的,只是我们的业务不一样。那么通过模板方法解决吧。业务方法通过接口回调的方式传递进来,因为我们不知道调用者是什么业务。
回调接口如下(T表示我们业务数据):
public interface MyCallable<T> { T call(); }
下面是模板代码:
protected <R> Observable<R> createObservable(final MyCallable<R> callable) { return Observable.create(new Observable.OnSubscribe<R>() { @Override public void call(Subscriber<? super R> subscriber) { try { R result = callable.call(); subscriber.onNext(result); } catch (Exception e) { subscriber.onError(e); } subscriber.onCompleted(); } }); }
使用就非常简单了调用createObservable方法,实现MyCallable接口即可,然后就是跟使用RxJava一样处理逻辑。
三、分析greendao是如何支持RxJava风格的
看过Greendao源码的人知道,它也是通过这种方式支持RxJava的(下面看看他是怎么做的):/** * Rx version of {@link AbstractDao#loadAll()} returning an Observable. */ @Experimental public Observable<T> load(final K key) { return wrap(new Callable<T>() { @Override public T call() throws Exception { return dao.load(key); } }); }
最终的实现也是通过dao.load(key)同步方法来实现的,关键是wrap方法了:
protected <R> Observable<R> wrap(Callable<R> callable) { return wrap(RxUtils.fromCallable(callable)); } //通过这个方法再包装了一层(就是默认设置执行的线程) protected <R> Observable<R> wrap(Observable<R> observable) { if (scheduler != null) { return observable.subscribeOn(scheduler); } else { return observable; } }
通过代码可以看到默认执行的线程是IO线程:
/** * The returned RxDao is a special DAO that let's you interact with Rx Observables using RX's IO scheduler for * subscribeOn. * * @see #rxPlain() */ @Experimental public RxDao<T, K> rx() { if (rxDao == null) { rxDao = new RxDao<>(this, Schedulers.io()); } return rxDao; }
所以使用greendao不用指定它在IO执行,因为框架已经帮我们设置了。
然后就是
RxUtils.fromCallable(callable)方法了:
class RxUtils { /** As of RxJava 1.1.7, Observable.fromCallable is still @Beta, so just in case... */ @Internal static <T> Observable<T> fromCallable(final Callable<T> callable) { return Observable.defer(new Func0<Observable<T>>() { @Override public Observable<T> call() { T result; try { result = callable.call(); } catch (Exception e) { return Observable.error(e); } return Observable.just(result); } }); } }
上面的注释说通过Observable.fromCallable也可以实现这样的逻辑,也就是说代替Observable.defer()方法。
最后greendao是通过defer操作符来实现rx风格的。
四、defer和create操作符有什么异同点?
通过分析greendao源码得知,他是通过defer来做的,我们是通过create操作符来做的。那两者有什么不同?我们对defer操作符比较陌生,先看看它的源码:
public static <T> Observable<T> defer(Func0<Observable<T>> observableFactory) { return create(new OnSubscribeDefer<T>(observableFactory)); }
说白了就是调用了
create(OnSubscribe<T> f)方法:
public static <T> Observable<T> create(OnSubscribe<T> f) { return new Observable<T>(hook.onCreate(f)); }
其实我们上面的create操作也是调用过来这个方法。只是defer操作符传递的OnSubscribe是OnSubscribeDefer,那我们来看看这是什么鬼?
public final class OnSubscribeDefer<T> implements OnSubscribe<T> { final Func0<? extends Observable<? extends T>> observableFactory; public OnSubscribeDefer(Func0<? extends Observable<? extends T>> observableFactory) { this.observableFactory = observableFactory; } @Override public void call(final Subscriber<? super T> s) { Observable<? extends T> o; try { o = observableFactory.call(); } catch (Throwable t) { Exceptions.throwOrReport(t, s); return; } o.unsafeSubscribe(Subscribers.wrap(s)); } }
OnSubscribeDefer也是继承自OnSubscribe,那么他的call方法肯定也是在订阅的时候被调用(就是说订阅的时候才创建这个observable,并且每次订阅都会创建一个新的observable)。
为什么Greendao没有使用create那种方式精确控制数据的发射?现在RxJava2.0对create操作符做出了一些限制,不能随随便便create了,这样出现一些问题。具体的RxJava2.0的改动可以看看
他的github说明What’s-different-in-2.0
五、参考资料:
pitfalls-of-operator-implementationssubscribe vs unsafeSubscribe
What’s-different-in-2.0
本文的例子放在github上 https://github.com/chiclaim/android-sample/tree/master/rxjava
相关文章推荐
- RxJava(十一)defer操作符实现代码支持链式调用
- RxJava(十一)defer操作符实现代码支持链式调用
- Scala 深入浅出实战经典 第51讲:Scala中链式调用风格的实现代码实战及其在Spark中应用
- 第51讲:Scala中链式调用风格的实现代码实战及其在Spark编程中的广泛运用学习笔记
- 第51讲:Scala中链式调用风格的实现代码实战及其在Spark编程中的广泛运用
- (4.6.22.1)来吧,是时候撸一份自己的RxJava框架啦:观察者模式实现链式调用
- Javascript 链式调用实现代码(参考jquery)
- Scala深入浅出进阶经典 第51讲:Scala中链式调用风格的实现代码实战及其在Spark编程中的广泛运用
- 纯JavaScript实现简易版的jQuery选择器,支持链式调用
- 51.Scala中链式调用风格的实现代码实战及其在Spark编程中的广泛运用
- Retrofit+Rxjava实现嵌套逻辑的链式调用
- Scala中链式调用风格的实现代码实战及其在Spark编程中的广泛运用之Scala学习笔记-41
- JavaScript中两种链式调用实现代码
- Javascript 链式调用实现代码(参考jquery)
- 发布NBearLite v1.0.0 beta - 全面支持SqlServer,Oracle,MySql,PostgreSql数据库存储过程调用代码生成(C#/VB.NET)
- 实现调用外部EXE程序的代码
- [已封装] DIV拖动类 支持在FF下拖动,调用简单 主要代码都有注释 适合初学者学习
- VFP调用API实现代码模拟鼠标.键盘动作
- 带保护的远程调用代码框架,宏实现