您的位置:首页 > 其它

RX操作符之辅助操作一(materialize、dematerialize、timestamp、serialize、replay、observeOn、subscribeon、dooneach)

2016-08-09 18:09 441 查看
一、materialize
Materialize
将数据项和事件通知都当做数据项发射,
Dematerialize
刚好相反。一个合法的有限的Obversable将调用它的观察者的
onNext
方法零次或多次,然后调用观察者的
onCompleted
onError
正好一次。
Materialize
操作符将这一系列调用,包括原来的
onNext
通知和终止通知
onCompleted
onError
都转换为一个Observable发射的数据序列。
materialize
将来自原始Observable的通知转换为
Notification
对象,然后它返回的Observable会发射这些数据。

Observable<String> observable =  Observable.create(new Observable.OnSubscribe<String>() {

@Override
public void call(Subscriber<? super String> subscriber) {
for (int i = 0; i < 5; i++) {
subscriber.onNext(i + "");
try {
Thread.sleep(1000);
} catch (Exception e) {

}
}
subscriber.onCompleted();
}
});

Subscriber<Object> subscriber = new Subscriber<Object>() {

@Override
public void onNext(Object v) {
Log.e(TAG,"onNext................."+v);
}

@Override
public void onCompleted() {
Log.e(TAG, "onCompleted.................");
}

@Override
public void onError(Throwable e) {
Log.e(TAG, "onError.....................");
}
};

observable.materialize().subscribe(subscriber);


运行结果:



二、dematerialize
Dematerialize
操作符是
Materialize
的逆向过程,它将
Materialize
转换的结果还原成它原本的形式。

ematerialize
反转这个过程,将原始Observable发射的
Notification
对象还原成Observable的通知。

Observable<String> observable =  Observable.create(new Observable.OnSubscribe<String>() {

@Override
public void call(Subscriber<? super String> subscriber) {
for (int i = 0; i < 5; i++) {
subscriber.onNext(i + "");
try {
Thread.sleep(1000);
} catch (Exception e) {

}
}
subscriber.onCompleted();
}
});

Subscriber<Object> subscriber = new Subscriber<Object>() {

@Override
public void onNext(Object v) {
Log.e(TAG,"onNext................."+v);
}

@Override
public void onCompleted() {
Log.e(TAG, "onCompleted.................");
}

@Override
public void onError(Throwable e) {
Log.e(TAG, "onError.....................");
}
};

observable.materialize().dematerialize().subscribe(subscriber);


运行结果:



三、timeStamp
timestamp
发射T类型数据的Observable转换为一个发射类型为
Timestamped<T>
的数据的Observable,每一项都包含数据的原始发射时间。

String[] items = {"timeStamp1","timeStamp2","timeStamp3","timeStamp4","timeStamp5"};
Observable<String> observable =  Observable.from(items);
Subscriber<Object> subscriber = new Subscriber<Object>() {

@Override
public void onNext(Object v) {
Log.e(TAG,"onNext................."+v);
}

@Override
public void onCompleted() {
Log.e(TAG, "onCompleted.................");
}

@Override
public void onError(Throwable e) {
Log.e(TAG, "onError.....................");
}
};

observable.timestamp().subscribe(subscriber);
运行结果:



四、serialize
一个Observable可以异步调用它的观察者的方法,可能是从不同的线程调用。这可能会让Observable行为不正确,它可能会在某一个
onNext
调用之前尝试调用
onCompleted
onError
方法,或者从两个不同的线程同时调用
onNext
方法。使用
Serialize
操作符,你可以纠正这个Observable的行为,保证它的行为是正确的且是同步的。

Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {

@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onNext(1);
subscriber.onNext(2);
subscriber.onNext(3);
subscriber.onCompleted();
subscriber.onNext(4);
subscriber.onNext(5);
subscriber.onCompleted();
}
});

Subscriber<Integer> subscriber = new Subscriber<Integer>() {

@Override
public void onNext(Integer v) {
Log.e(TAG,"onNext................."+v);
}

@Override
public void onCompleted() {
Log.e(TAG, "onCompleted.................");
}

@Override
public void onError(Throwable e) {
Log.e(TAG, "onError.....................");
}
};

observable
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
Log.e(TAG,"call.................unsubscribed");
}
})
.serialize().subscribe(subscriber);

运行结果:



五、replay
保证所有的观察者收到相同的数据序列,即使它们在Observable开始发射数据之后才订阅。可连接的Observable
(connectable Observable)与普通的Observable差不多,不过它并不会在被订阅时开始发射数据,而是直到使用了
Connect
操作符时才会开始。用这种方法,你可以在任何时候让一个Observable开始发射数据。如果在将一个Observable转换为可连接的Observable之前对它使用
Replay
操作符,产生的这个可连接Observable将总是发射完整的数据序列给任何未来的观察者,即使那些观察者在这个Observable开始给其它观察者发射数据之后才订阅。

final ConnectableObservable<Long> observable = Observable.interval(1,TimeUnit.SECONDS).observeOn(Schedulers.newThread())
.take(5).replay(3);

final Subscriber<Long> subscriber1 = new Subscriber<Long>() {

@Override
public void onNext(Long v) {
Log.e(TAG,"onNext1................."+v);
}

@Override
public void onCompleted() {
Log.e(TAG, "onCompleted1.................");
}

@Override
public void onError(Throwable e) {
Log.e(TAG, "onError1.....................");
}
};

Subscriber<Long> subscriber = new Subscriber<Long>() {

@Override
public void onNext(Long l) {
Log.e(TAG,"onNext................."+l);
if(l == 3){
observable.subscribe(subscriber1);
}
}

@Override
public void onCompleted() {
Log.e(TAG, "onCompleted.................");
}

@Override
public void onError(Throwable e) {
Log.e(TAG, "onError.....................");
}
};

observable.subscribe(subscriber);
observable.connect();

运行结果:



六、observeOn
指定一个观察者在哪个调度器上观察这个Observable

Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {

@Override
public void call(Subscriber<? super String> subscriber) {
for (int i = 0; i < 5; i++) {
subscriber.onNext(i + "");
try {
Thread.sleep(1000);
} catch (Exception e) {

}
}
subscriber.onCompleted();
}
}).observeOn(Schedulers.io());

Subscriber<Object> subscriber = new Subscriber<Object>() {

@Override
public void onNext(Object v) {
Log.e(TAG,"onNext................."+v+"............"+Thread.currentThread().getName());
}

@Override
public void onCompleted() {
Log.e(TAG, "onCompleted.................");
}

@Override
public void onError(Throwable e) {
Log.e(TAG, "onError.....................");
}
};

observable.subscribe(subscriber);

运行结果:



七、subscribeon
指定Observable自身在哪个调度器上执行

Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {

@Override
public void call(Subscriber<? super String> subscriber) {
for (int i = 0; i < 5; i++) {
subscriber.onNext(i + "");
try {
Thread.sleep(1000);
} catch (Exception e) {

}
Log.e(TAG, "call................."+ Thread.currentThread().getName());
}
subscriber.onCompleted();
}
})
.subscribeOn(Schedulers.newThread());

Subscriber<Object> subscriber = new Subscriber<Object>() {

@Override
public void onNext(Object v) {
Log.e(TAG,"onNext................."+v+"............"+Thread.currentThread().getName());
}

@Override
public void onCompleted() {
Log.e(TAG, "onCompleted................."+Thread.currentThread().getName());
}

@Override
public void onError(Throwable e) {
Log.e(TAG, "onError....................."+Thread.currentThread().getName());
}
};

observable.subscribe(subscriber);运行结果:



八、doOnEach
发射完一个Observable时的回调

Observable<Integer> observable = Observable.just(1,2,3,4,5,6);
Subscriber<Integer> subscriber = new Subscriber<Integer>() {

@Override
public void onNext(Integer v) {
Log.e(TAG,"onNext................."+v);
}

@Override
public void onCompleted() {
Log.e(TAG, "onCompleted.................");
}

@Override
public void onError(Throwable e) {
Log.e(TAG, "onError.....................");
}
};

observable
.doOnEach(new Action1<rx.Notification<? super Integer>>() {
@Override
public void call(rx.Notification<? super Integer> notification) {
int i = (int) notification.getValue();
Log.e(TAG, "发射了一个数据....................."+i);
}
})
.subscribe(subscriber);

运行结果:



九、doOnNext
doOnNext
操作符类似于
doOnEach(Action1)
,但是它的Action不是接受一个
Notification
参数,而是接受发射的数据项。

Observable<Integer> observable = Observable.just(1,2,3,4,5,6);
Subscriber<Integer> subscriber = new Subscriber<Integer>() {

@Override
public void onNext(Integer v) {
Log.e(TAG,"onNext................."+v);
}

@Override
public void onCompleted() {
Log.e(TAG, "onCompleted.................");
}

@Override
public void onError(Throwable e) {
Log.e(TAG, "onError.....................");
}
};

observable
.doOnNext(new Action1<Integer>() {
@Override
public void call(Integer integer) {
if(integer == 3){
Log.e(TAG, "doOnNext.....................");
}
}
})
.subscribe(subscriber);运行结果:



十、doOnSubscribe

订阅生成时的回调
Observable<Integer> observable = Observable.just(1,2,3,4,5,6);
Subscriber<Integer> subscriber = new Subscriber<Integer>() {

@Override
public void onNext(Integer v) {
Log.e(TAG,"onNext................."+v);
}

@Override
public void onCompleted() {
Log.e(TAG, "onCompleted.................");
}

@Override
public void onError(Throwable e) {
Log.e(TAG, "onError.....................");
}
};

observable
.doOnSubscribe(new Action0() {
@Override
public void call() {
Log.e(TAG, "观察者订阅了它生成的Observable.....................");
}
})
.subscribe(subscriber);


运行结果:

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐