您的位置:首页 > 编程语言 > Java开发

RxJava1.x从入门到放弃再到RxJava 2.x(一)

2017-04-19 23:30 453 查看
从2016.3出来实习的时候,就听到这个流行的框架,那时候偶尔看看博客的介绍,没深入学习与使用,故而什么也没学到。如今借助公司项目重构这个机会,在框架上使用到了RxJava。所以,算是一个再学习的机会吧。由于RxJava2.0已经发布了大半年了,我们就直接学习RxJava 2.0啦,go ,go ,go。



什么是RxJava

RxJava本质是一个异步操作库,是一个能让你用及其简洁的逻辑去处理繁琐复杂任务的异步事件库。

RxJava的优势

说到异步,Android上已经提供了Handler,AsyncTask等异步操作的类库,那为什么还要选择RxJava呢?答案是“简洁”。RxJava可以用非常简洁

理论基础

记得最开始学习RxJava的时候,看的大部分博客是从观察者模式开始,先讲观察者,被观察者,订阅关系巴拉巴拉一大堆,笔记也跟着做了不少,学习成本很大。今天呢,借用之前看过的一篇文章讲解的思路,用两根水管代替观察者和被观察者, 试图用通俗易懂的话把它们的关系解释清楚,从事件流这个角度来说明RxJava的基本工作原理。



上面的水管是事件的发射源,就叫他上游吧,下面的水管是事件的接收源,叫他下游好了。两根水管通过一定的方式连接起来,使得上游每产生一个事件,下游就能收到该事件。

这里的上游和下游对应着RxJava的Observable和Observer,他们之间的连接就对应着subscribe(),因此这个关系用RxJava来表示就是:

//上游
Observable<Integer>  upstream=Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onNext(4);
emitter.onComplete();
}
});

//下游
Observer<Integer>  downstream =new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG,"onSubscribe");
}

@Override
public void onNext(Integer value) {
Log.d(TAG,"onNext:"+value);
}

@Override
public void onError(Throwable e) {
Log.d(TAG,"onError");
}

@Override
public void onComplete() {
Log.d(TAG,"onComplete");
}
};

//通过subscribe,连接起来
upstream.subscribe(downstream);


运行结果:



注意: 只有当上游和下游建立连接之后, 上游才会开始发送事件. 也就是调用了subscribe() 方法之后才开始发送事件.

改用优雅的写法:

Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onNext(4);
emitter.onComplete();
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe");
}

@Override
public void onNext(Integer value) {
Log.d(TAG, "onNext:" + value);
}

@Override
public void onError(Throwable e) {
Log.d(TAG, "onError");
}

@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});


有使用过RxJava 1.x的朋友们应该注意到了这边写法有些不一样了,多出了2个陌生的东西:ObservableEmitter和Disposable 。ObservableEmitter,按字面上的意思就是事件发射器,它可以发出三种类型的事件,通过调用emitter的onNext(T value)、onComplete()和onError(Throwable error)就可以分别发出next事件、complete事件和error事件。

需要注意的规则如下:

当上游发送了一个onComplete后, 上游onComplete之后的事件将会继续发送, 而下游收到onComplete事件之后将不再继续接收事件。

当上游发送了一个onError后, 上游onError之后的事件将继续发送, 而下游收到onError事件之后将不再继续接收事件。

上游可以不发送onComplete或onError。

最为关键的是onComplete和onError必须唯一并且互斥, 即不能发多个onComplete, 也不能发多个onError, 也不能先发一个onComplete, 然后再发一个onError, 反之亦然。

// 当上游发送了一个onComplete后, 上游onComplete之后的事件将会继续发送, 而下游收到onComplete事件之后将不再继续接收事件.

Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
Log.d(TAG, "emitter.onNext(1)");
emitter.onNext(2);
Log.d(TAG, "emitter.onNext(2)");
emitter.onNext(3);
Log.d(TAG, "emitter.onNext(3)");
emitter.onNext(4);
Log.d(TAG, "emitter.onNext(4)");
emitter.onComplete();
emitter.onNext(5);
Log.d(TAG, "emitter.onNext(5)");
emitter.onNext(6);
Log.d(TAG, "emitter.onNext(6)");
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe");
}

@Override
public void onNext(Integer value) {
Log.d(TAG, "onNext:" + value);
}

@Override
public void onError(Throwable e) {
Log.d(TAG, "onError");
}

@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});


结果如下图:



//不能先发一个onComplete, 然后再发一个onError
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onNext(4);
emitter.onComplete();
emitter.onError(new Throwable("error"));

}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe");
}

@Override
public void onNext(Integer value) {
Log.d(TAG, "onNext:" + value);
}

@Override
public void onError(Throwable e) {
Log.d(TAG, "onError");
}

@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});


结果程序crash了,如下图:



介绍了ObservableEmitter, 接下来介绍Disposable, 这个单词的字面意思是一次性用品,用完即可丢弃的. 那么在RxJava中怎么去理解它呢, 对应于上面的水管的例子, 我们可以把它理解成两根管道之间的一个机关, 当调用它的dispose()方法时, 它就会将两根管道切断, 从而导致下游收不到事件.

Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "emitter.onNext(1)");
emitter.onNext(1);
Log.d(TAG, "emitter.onNext(2)");
emitter.onNext(2);
Log.d(TAG, "emitter.onNext(3)");
emitter.onNext(3);
Log.d(TAG, "emitter.onNext(4)");
emitter.onNext(4);
Log.d(TAG, "emitter.onComplete()");
emitter.onComplete();
Log.d(TAG, "emitter.onNext(5)");
emitter.onNext(5);
Log.d(TAG, "emitter.onNext(6)");
emitter.onNext(6);

}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
mDisposable = d;
Log.d(TAG, "onSubscribe");
}

@Override
public void onNext(Integer value) {
Log.d(TAG, "onNext:" + value);
if(value==3) {
Log.d(TAG, "dispose");
mDisposable.dispose();
Log.d(TAG, "isDispose:"+mDisposable.isDisposed());
}
}

@Override
public void onError(Throwable e) {
Log.d(TAG, "onError");
}

@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});


结果如下图:



到了3后就被断流了,下游没有收到上游发射的onComplete信号, 而且上游并没有因为发送了onComplete而停止. 同时可以看到下游的onSubscribe()方法是最先调用的.

另外,subscibe()有多个重载方法:



这里简单说明下:

不带任何参数的subscribe() 表示下游不关心任何事件,你上游尽管发你的数据去吧

对于只有一个参数,参数类型是Con
a681
sumer的表示下游只关心onNext()事件。例如:

Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "emitter.onNext(1)");
emitter.onNext(1);
Log.d(TAG, "emitter.onNext(2)");
emitter.onNext(2);
Log.d(TAG, "emitter.onNext(3)");
emitter.onNext(3);
Log.d(TAG, "emitter.onNext(4)");
emitter.onNext(4);
Log.d(TAG, "emitter.onComplete()");
emitter.onComplete();
Log.d(TAG, "emitter.onNext(5)");
emitter.onNext(5);
Log.d(TAG, "emitter.onNext(6)");
emitter.onNext(6);

}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG,"accept:"+integer);
}
});




其他几个同理,不解释

这次就先学习到这里吧。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: