RxJava1.x从入门到放弃再到RxJava 2.x(一)
2017-04-19 23:30
453 查看
从2016.3出来实习的时候,就听到这个流行的框架,那时候偶尔看看博客的介绍,没深入学习与使用,故而什么也没学到。如今借助公司项目重构这个机会,在框架上使用到了RxJava。所以,算是一个再学习的机会吧。由于RxJava2.0已经发布了大半年了,我们就直接学习RxJava 2.0啦,go ,go ,go。
上面的水管是事件的发射源,就叫他上游吧,下面的水管是事件的接收源,叫他下游好了。两根水管通过一定的方式连接起来,使得上游每产生一个事件,下游就能收到该事件。
这里的上游和下游对应着RxJava的Observable和Observer,他们之间的连接就对应着subscribe(),因此这个关系用RxJava来表示就是:
运行结果:
注意: 只有当上游和下游建立连接之后, 上游才会开始发送事件. 也就是调用了subscribe() 方法之后才开始发送事件.
改用优雅的写法:
有使用过RxJava 1.x的朋友们应该注意到了这边写法有些不一样了,多出了2个陌生的东西:ObservableEmitter和Disposable 。ObservableEmitter,按字面上的意思就是事件发射器,它可以发出三种类型的事件,通过调用emitter的onNext(T value)、onComplete()和onError(Throwable error)就可以分别发出next事件、complete事件和error事件。
需要注意的规则如下:
当上游发送了一个onComplete后, 上游onComplete之后的事件将会继续发送, 而下游收到onComplete事件之后将不再继续接收事件。
当上游发送了一个onError后, 上游onError之后的事件将继续发送, 而下游收到onError事件之后将不再继续接收事件。
上游可以不发送onComplete或onError。
最为关键的是onComplete和onError必须唯一并且互斥, 即不能发多个onComplete, 也不能发多个onError, 也不能先发一个onComplete, 然后再发一个onError, 反之亦然。
结果如下图:
结果程序crash了,如下图:
介绍了ObservableEmitter, 接下来介绍Disposable, 这个单词的字面意思是一次性用品,用完即可丢弃的. 那么在RxJava中怎么去理解它呢, 对应于上面的水管的例子, 我们可以把它理解成两根管道之间的一个机关, 当调用它的dispose()方法时, 它就会将两根管道切断, 从而导致下游收不到事件.
结果如下图:
到了3后就被断流了,下游没有收到上游发射的onComplete信号, 而且上游并没有因为发送了onComplete而停止. 同时可以看到下游的onSubscribe()方法是最先调用的.
另外,subscibe()有多个重载方法:
这里简单说明下:
不带任何参数的subscribe() 表示下游不关心任何事件,你上游尽管发你的数据去吧
对于只有一个参数,参数类型是Con
a681
sumer的表示下游只关心onNext()事件。例如:
其他几个同理,不解释
这次就先学习到这里吧。
什么是RxJava
RxJava本质是一个异步操作库,是一个能让你用及其简洁的逻辑去处理繁琐复杂任务的异步事件库。RxJava的优势
说到异步,Android上已经提供了Handler,AsyncTask等异步操作的类库,那为什么还要选择RxJava呢?答案是“简洁”。RxJava可以用非常简洁理论基础
记得最开始学习RxJava的时候,看的大部分博客是从观察者模式开始,先讲观察者,被观察者,订阅关系巴拉巴拉一大堆,笔记也跟着做了不少,学习成本很大。今天呢,借用之前看过的一篇文章讲解的思路,用两根水管代替观察者和被观察者, 试图用通俗易懂的话把它们的关系解释清楚,从事件流这个角度来说明RxJava的基本工作原理。上面的水管是事件的发射源,就叫他上游吧,下面的水管是事件的接收源,叫他下游好了。两根水管通过一定的方式连接起来,使得上游每产生一个事件,下游就能收到该事件。
这里的上游和下游对应着RxJava的Observable和Observer,他们之间的连接就对应着subscribe(),因此这个关系用RxJava来表示就是:
//上游 Observable<Integer> upstream=Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); emitter.onNext(4); emitter.onComplete(); } }); //下游 Observer<Integer> downstream =new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG,"onSubscribe"); } @Override public void onNext(Integer value) { Log.d(TAG,"onNext:"+value); } @Override public void onError(Throwable e) { Log.d(TAG,"onError"); } @Override public void onComplete() { Log.d(TAG,"onComplete"); } }; //通过subscribe,连接起来 upstream.subscribe(downstream);
运行结果:
注意: 只有当上游和下游建立连接之后, 上游才会开始发送事件. 也就是调用了subscribe() 方法之后才开始发送事件.
改用优雅的写法:
Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); emitter.onNext(4); emitter.onComplete(); } }).subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "onSubscribe"); } @Override public void onNext(Integer value) { Log.d(TAG, "onNext:" + value); } @Override public void onError(Throwable e) { Log.d(TAG, "onError"); } @Override public void onComplete() { Log.d(TAG, "onComplete"); } });
有使用过RxJava 1.x的朋友们应该注意到了这边写法有些不一样了,多出了2个陌生的东西:ObservableEmitter和Disposable 。ObservableEmitter,按字面上的意思就是事件发射器,它可以发出三种类型的事件,通过调用emitter的onNext(T value)、onComplete()和onError(Throwable error)就可以分别发出next事件、complete事件和error事件。
需要注意的规则如下:
当上游发送了一个onComplete后, 上游onComplete之后的事件将会继续发送, 而下游收到onComplete事件之后将不再继续接收事件。
当上游发送了一个onError后, 上游onError之后的事件将继续发送, 而下游收到onError事件之后将不再继续接收事件。
上游可以不发送onComplete或onError。
最为关键的是onComplete和onError必须唯一并且互斥, 即不能发多个onComplete, 也不能发多个onError, 也不能先发一个onComplete, 然后再发一个onError, 反之亦然。
// 当上游发送了一个onComplete后, 上游onComplete之后的事件将会继续发送, 而下游收到onComplete事件之后将不再继续接收事件. Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); Log.d(TAG, "emitter.onNext(1)"); emitter.onNext(2); Log.d(TAG, "emitter.onNext(2)"); emitter.onNext(3); Log.d(TAG, "emitter.onNext(3)"); emitter.onNext(4); Log.d(TAG, "emitter.onNext(4)"); emitter.onComplete(); emitter.onNext(5); Log.d(TAG, "emitter.onNext(5)"); emitter.onNext(6); Log.d(TAG, "emitter.onNext(6)"); } }).subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "onSubscribe"); } @Override public void onNext(Integer value) { Log.d(TAG, "onNext:" + value); } @Override public void onError(Throwable e) { Log.d(TAG, "onError"); } @Override public void onComplete() { Log.d(TAG, "onComplete"); } });
结果如下图:
//不能先发一个onComplete, 然后再发一个onError Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); emitter.onNext(4); emitter.onComplete(); emitter.onError(new Throwable("error")); } }).subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "onSubscribe"); } @Override public void onNext(Integer value) { Log.d(TAG, "onNext:" + value); } @Override public void onError(Throwable e) { Log.d(TAG, "onError"); } @Override public void onComplete() { Log.d(TAG, "onComplete"); } });
结果程序crash了,如下图:
介绍了ObservableEmitter, 接下来介绍Disposable, 这个单词的字面意思是一次性用品,用完即可丢弃的. 那么在RxJava中怎么去理解它呢, 对应于上面的水管的例子, 我们可以把它理解成两根管道之间的一个机关, 当调用它的dispose()方法时, 它就会将两根管道切断, 从而导致下游收不到事件.
Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { Log.d(TAG, "emitter.onNext(1)"); emitter.onNext(1); Log.d(TAG, "emitter.onNext(2)"); emitter.onNext(2); Log.d(TAG, "emitter.onNext(3)"); emitter.onNext(3); Log.d(TAG, "emitter.onNext(4)"); emitter.onNext(4); Log.d(TAG, "emitter.onComplete()"); emitter.onComplete(); Log.d(TAG, "emitter.onNext(5)"); emitter.onNext(5); Log.d(TAG, "emitter.onNext(6)"); emitter.onNext(6); } }).subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { mDisposable = d; Log.d(TAG, "onSubscribe"); } @Override public void onNext(Integer value) { Log.d(TAG, "onNext:" + value); if(value==3) { Log.d(TAG, "dispose"); mDisposable.dispose(); Log.d(TAG, "isDispose:"+mDisposable.isDisposed()); } } @Override public void onError(Throwable e) { Log.d(TAG, "onError"); } @Override public void onComplete() { Log.d(TAG, "onComplete"); } });
结果如下图:
到了3后就被断流了,下游没有收到上游发射的onComplete信号, 而且上游并没有因为发送了onComplete而停止. 同时可以看到下游的onSubscribe()方法是最先调用的.
另外,subscibe()有多个重载方法:
这里简单说明下:
不带任何参数的subscribe() 表示下游不关心任何事件,你上游尽管发你的数据去吧
对于只有一个参数,参数类型是Con
a681
sumer的表示下游只关心onNext()事件。例如:
Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { Log.d(TAG, "emitter.onNext(1)"); emitter.onNext(1); Log.d(TAG, "emitter.onNext(2)"); emitter.onNext(2); Log.d(TAG, "emitter.onNext(3)"); emitter.onNext(3); Log.d(TAG, "emitter.onNext(4)"); emitter.onNext(4); Log.d(TAG, "emitter.onComplete()"); emitter.onComplete(); Log.d(TAG, "emitter.onNext(5)"); emitter.onNext(5); Log.d(TAG, "emitter.onNext(6)"); emitter.onNext(6); } }).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG,"accept:"+integer); } });
其他几个同理,不解释
这次就先学习到这里吧。
相关文章推荐
- RxJava1.x从入门到放弃再到RxJava 2.x(二)
- RxJava1.x从入门到放弃再到RxJava 2.x(四)
- RxJava从入门到放弃1.0--rxjava的简单使用
- RxJava 从入门到放弃再到不离不弃
- RxJava2.0 从入门到放弃?
- RxJava1.x从入门到放弃再到RxJava 2.x(三)
- RxJava从入门到放弃---关于RxJava-入门必看
- android之RxJava的学习,从浅到深,从入门到别放弃(二)
- android之RxJava的学习,从浅到深,从入门到别放弃(一)
- 4000 RxJava从入门到放弃---关于RxJava-入门必看
- Python基础理论知识 Python从入门到放弃系列
- RxJava系列第一弹——RxJava入门篇
- Android插件化:从入门到放弃
- UnityShader从入门到放弃(五)漫反射—逐片元光照
- Web前端从入门到放弃( 固定顶部菜单)
- 9、Redis从入门到放弃 之 Jedis
- Web前端从入门到放弃(js函数声明和调用、有参函数和无参函数、有返回值)
- Docker 从入门到放弃(一)
- Pickle(泡菜) Python从入门到放弃
- OpenCV从入门到放弃(二):架构和上手