您的位置:首页 > 编程语言 > Java开发

RxJava中的Subject和常见的生命周期管理

2016-07-14 20:14 585 查看
首先,从下面这个对Subject类的定义中可以看出Subject既可以是Observer也可以是Observable。

public abstract class Subject<T, R> extends Observable<R> implements Observer<T>


这种就非常适合于把数据先存入到Subject中,然后转成Observable这样就可以发射事件流了,常见的有:

1.PublishSubject

当一个数据发射到 PublishSubject 中时,PublishSubject 将立刻把这个数据发射到订阅到该 subject 上的所有 subscriber 中。

subjects.onNext(1);
subjects.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println(integer);
}
});
subjects.onNext(2);
subjects.onNext(3);


subjects是一个PublichSubject对象,输出的结果是2和3,其中1并没有输出因为subscribe发生在onNext(1)之后且PublichSubject是立即发射的原因,所以接收不到。

2.ReplaySubject

ReplaySubject 可以缓存所有发射给他的数据。当一个新的订阅者订阅的时候,缓存的所有数据都会发射给这个订阅者。 由于使用了缓存,所以每个订阅者都会收到所以的数据:

replaySubject.onNext(1);
replaySubject.onNext(2);
replaySubject.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println("replaySubject = " + integer);
}
});
replaySubject.onNext(3);


我们可以看到虽然subscribe发生在1和2之后但是这个结果仍然能输出123三个值。ReplaySubject虽然有缓存能力,但是Observable的存在时长是不可估计的,如果缓存所有的数据那么就会造成内存开销过大引起不必要的麻烦,所以这里有两种办法:

ReplaySubject< Integer > s = ReplaySubject.createWithSize(5);
ReplaySubject< Integer > s = ReplaySubject.createWithTime(500, TimeUnit.MILLISECONDS,                                         Schedulers.immediate());

ReplaySubject< Integer > s =  ReplaySubject.createWithTimeAndSize(...);


分别表示的是最多缓存数量以及缓存最长时间、缓存数量和时间三种方式来优化缓存的Subject。

3.BehaviorSubject

BehaviorSubject 只发射最后一个值,等同于限制 ReplaySubject 的个数为 1 的情况。常见的做法是给他初始化一个值,这样subscribe马上就要值了。

s.onNext(0);
s.onNext(1);
s.onNext(2);
s.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {System.out.println("behaviorSubject = " + integer);
}
});


这样输出的结果就是2,因为只发射最后一个值。

4.AsyncSubject

缓存最后一个数据,不过区别于其他三个它必须onCompleted调用之后才会发射最后的值。

s.onNext(0);
s.onNext(1);
s.onNext(2);
s.onCompleted();


以上四个就是常见的Subject,很多地方可以用的到,比如说著名的RxBus项目。

RxJava生命周期管理

常见的Observer或者Subscriber都有三个方法,分别用来接收Observable发射的onNext、onComplete、onError。

private Observer<Boolean> getObserver() {
return new Observer<Boolean>() {
@Override
public void onCompleted() {
}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(Boolean aBoolean) {
}
};
}


然后通过Observable.subscribe 来发生订阅关系

Subscription    subscribe()
Subscription    subscribe(Action1<? super T> onNext)
Subscription    subscribe(Action1<? super T> onNext, Action1<java.lang.Throwable> onError)
Subscription    subscribe(Action1<? super T> onNext, Action1<java.lang.Throwable> onError, Action0 onComplete)
Subscription    subscribe(Observer<? super T> observer)
Subscription    subscribe(Subscriber<? super T> subscriber)


我们可以看到各式各样的subscribe方法不过每一个方法都返回Subscription,没错我们就是取Subscription来进行管理的。

boolean isUnsubscribed()
void unsubscribe()


由于Observable的生命存活时间无法确定,所以如果一个Activity结束了它不一定结束。所以我们需要通过unsubscribe()方法来却小之间的subscribe,可以通过isUnsubscribed()来判断是否已经unsubscribe过了。

所以基本上完整的代码是:

public void closeSubscription() {
if (null != mSubscription && !mSubscription.isUnsubscribed()) {
mSubscription.unsubscribe();
}
}


如果说一个界面有很多个Observable那么每次都定义一个Subscription肯定不靠谱,所有就有这么一个类CompositeSubscription专门用来存放Subscription的容器。

// 声明一个CompositeSubscription
public CompositeSubscription mCompositeSubscription = new CompositeSubscription();

// 将Subscription加入到CompositeSubscription中
public void addSubscription(Subscription subscription) {
if (null != mCompositeSubscription) {
mCompositeSubscription.add(subscription);
}
}


当然肯定要在Activity等的生命周期结束类似OnDestory方法中一次性unsubscribe操作

private void unsubscribe() {
if (null != mCompositeSubscription && !mCompositeSubscription.isUnsubscribed()) {
mCompositeSubscription.unsubscribe();
}
}


总之,在使用RxJava的时候一定需要主要unsubscribe操作,免得发生不可控制的后果。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息