RX操作符之辅助操作一(materialize、dematerialize、timestamp、serialize、replay、observeOn、subscribeon、dooneach)
2016-08-09 18:09
441 查看
一、materialize
运行结果:
二、dematerialize
运行结果:
三、timeStamp
四、serialize
一个Observable可以异步调用它的观察者的方法,可能是从不同的线程调用。这可能会让Observable行为不正确,它可能会在某一个
Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onNext(1);
subscriber.onNext(2);
subscriber.onNext(3);
subscriber.onCompleted();
subscriber.onNext(4);
subscriber.onNext(5);
subscriber.onCompleted();
}
});
Subscriber<Integer> subscriber = new Subscriber<Integer>() {
@Override
public void onNext(Integer v) {
Log.e(TAG,"onNext................."+v);
}
@Override
public void onCompleted() {
Log.e(TAG, "onCompleted.................");
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError.....................");
}
};
observable
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
Log.e(TAG,"call.................unsubscribed");
}
})
.serialize().subscribe(subscriber);
运行结果:
五、replay
保证所有的观察者收到相同的数据序列,即使它们在Observable开始发射数据之后才订阅。可连接的Observable
(connectable Observable)与普通的Observable差不多,不过它并不会在被订阅时开始发射数据,而是直到使用了
final ConnectableObservable<Long> observable = Observable.interval(1,TimeUnit.SECONDS).observeOn(Schedulers.newThread())
.take(5).replay(3);
final Subscriber<Long> subscriber1 = new Subscriber<Long>() {
@Override
public void onNext(Long v) {
Log.e(TAG,"onNext1................."+v);
}
@Override
public void onCompleted() {
Log.e(TAG, "onCompleted1.................");
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError1.....................");
}
};
Subscriber<Long> subscriber = new Subscriber<Long>() {
@Override
public void onNext(Long l) {
Log.e(TAG,"onNext................."+l);
if(l == 3){
observable.subscribe(subscriber1);
}
}
@Override
public void onCompleted() {
Log.e(TAG, "onCompleted.................");
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError.....................");
}
};
observable.subscribe(subscriber);
observable.connect();
运行结果:
六、observeOn
指定一个观察者在哪个调度器上观察这个Observable
Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
for (int i = 0; i < 5; i++) {
subscriber.onNext(i + "");
try {
Thread.sleep(1000);
} catch (Exception e) {
}
}
subscriber.onCompleted();
}
}).observeOn(Schedulers.io());
Subscriber<Object> subscriber = new Subscriber<Object>() {
@Override
public void onNext(Object v) {
Log.e(TAG,"onNext................."+v+"............"+Thread.currentThread().getName());
}
@Override
public void onCompleted() {
Log.e(TAG, "onCompleted.................");
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError.....................");
}
};
observable.subscribe(subscriber);
运行结果:
七、subscribeon
指定Observable自身在哪个调度器上执行
Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
for (int i = 0; i < 5; i++) {
subscriber.onNext(i + "");
try {
Thread.sleep(1000);
} catch (Exception e) {
}
Log.e(TAG, "call................."+ Thread.currentThread().getName());
}
subscriber.onCompleted();
}
})
.subscribeOn(Schedulers.newThread());
Subscriber<Object> subscriber = new Subscriber<Object>() {
@Override
public void onNext(Object v) {
Log.e(TAG,"onNext................."+v+"............"+Thread.currentThread().getName());
}
@Override
public void onCompleted() {
Log.e(TAG, "onCompleted................."+Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError....................."+Thread.currentThread().getName());
}
};
observable.subscribe(subscriber);运行结果:
八、doOnEach
发射完一个Observable时的回调
Observable<Integer> observable = Observable.just(1,2,3,4,5,6);
Subscriber<Integer> subscriber = new Subscriber<Integer>() {
@Override
public void onNext(Integer v) {
Log.e(TAG,"onNext................."+v);
}
@Override
public void onCompleted() {
Log.e(TAG, "onCompleted.................");
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError.....................");
}
};
observable
.doOnEach(new Action1<rx.Notification<? super Integer>>() {
@Override
public void call(rx.Notification<? super Integer> notification) {
int i = (int) notification.getValue();
Log.e(TAG, "发射了一个数据....................."+i);
}
})
.subscribe(subscriber);
运行结果:
九、doOnNext
Observable<Integer> observable = Observable.just(1,2,3,4,5,6);
Subscriber<Integer> subscriber = new Subscriber<Integer>() {
@Override
public void onNext(Integer v) {
Log.e(TAG,"onNext................."+v);
}
@Override
public void onCompleted() {
Log.e(TAG, "onCompleted.................");
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError.....................");
}
};
observable
.doOnNext(new Action1<Integer>() {
@Override
public void call(Integer integer) {
if(integer == 3){
Log.e(TAG, "doOnNext.....................");
}
}
})
.subscribe(subscriber);运行结果:
十、doOnSubscribe
订阅生成时的回调
运行结果:
Materialize将数据项和事件通知都当做数据项发射,
Dematerialize刚好相反。一个合法的有限的Obversable将调用它的观察者的
onNext方法零次或多次,然后调用观察者的
onCompleted或
onError正好一次。
Materialize操作符将这一系列调用,包括原来的
onNext通知和终止通知
onCompleted或
onError都转换为一个Observable发射的数据序列。
materialize将来自原始Observable的通知转换为
Notification对象,然后它返回的Observable会发射这些数据。
Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { for (int i = 0; i < 5; i++) { subscriber.onNext(i + ""); try { Thread.sleep(1000); } catch (Exception e) { } } subscriber.onCompleted(); } }); Subscriber<Object> subscriber = new Subscriber<Object>() { @Override public void onNext(Object v) { Log.e(TAG,"onNext................."+v); } @Override public void onCompleted() { Log.e(TAG, "onCompleted................."); } @Override public void onError(Throwable e) { Log.e(TAG, "onError....................."); } }; observable.materialize().subscribe(subscriber);
运行结果:
二、dematerialize
Dematerialize操作符是
Materialize的逆向过程,它将
Materialize转换的结果还原成它原本的形式。
ematerialize反转这个过程,将原始Observable发射的
Notification对象还原成Observable的通知。
Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { for (int i = 0; i < 5; i++) { subscriber.onNext(i + ""); try { Thread.sleep(1000); } catch (Exception e) { } } subscriber.onCompleted(); } }); Subscriber<Object> subscriber = new Subscriber<Object>() { @Override public void onNext(Object v) { Log.e(TAG,"onNext................."+v); } @Override public void onCompleted() { Log.e(TAG, "onCompleted................."); } @Override public void onError(Throwable e) { Log.e(TAG, "onError....................."); } }; observable.materialize().dematerialize().subscribe(subscriber);
运行结果:
三、timeStamp
timestamp发射T类型数据的Observable转换为一个发射类型为
Timestamped<T>的数据的Observable,每一项都包含数据的原始发射时间。
String[] items = {"timeStamp1","timeStamp2","timeStamp3","timeStamp4","timeStamp5"}; Observable<String> observable = Observable.from(items); Subscriber<Object> subscriber = new Subscriber<Object>() { @Override public void onNext(Object v) { Log.e(TAG,"onNext................."+v); } @Override public void onCompleted() { Log.e(TAG, "onCompleted................."); } @Override public void onError(Throwable e) { Log.e(TAG, "onError....................."); } }; observable.timestamp().subscribe(subscriber);运行结果:
四、serialize
一个Observable可以异步调用它的观察者的方法,可能是从不同的线程调用。这可能会让Observable行为不正确,它可能会在某一个
onNext调用之前尝试调用
onCompleted或
onError方法,或者从两个不同的线程同时调用
onNext方法。使用
Serialize操作符,你可以纠正这个Observable的行为,保证它的行为是正确的且是同步的。
Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onNext(1);
subscriber.onNext(2);
subscriber.onNext(3);
subscriber.onCompleted();
subscriber.onNext(4);
subscriber.onNext(5);
subscriber.onCompleted();
}
});
Subscriber<Integer> subscriber = new Subscriber<Integer>() {
@Override
public void onNext(Integer v) {
Log.e(TAG,"onNext................."+v);
}
@Override
public void onCompleted() {
Log.e(TAG, "onCompleted.................");
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError.....................");
}
};
observable
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
Log.e(TAG,"call.................unsubscribed");
}
})
.serialize().subscribe(subscriber);
运行结果:
五、replay
保证所有的观察者收到相同的数据序列,即使它们在Observable开始发射数据之后才订阅。可连接的Observable
(connectable Observable)与普通的Observable差不多,不过它并不会在被订阅时开始发射数据,而是直到使用了
Connect操作符时才会开始。用这种方法,你可以在任何时候让一个Observable开始发射数据。如果在将一个Observable转换为可连接的Observable之前对它使用
Replay操作符,产生的这个可连接Observable将总是发射完整的数据序列给任何未来的观察者,即使那些观察者在这个Observable开始给其它观察者发射数据之后才订阅。
final ConnectableObservable<Long> observable = Observable.interval(1,TimeUnit.SECONDS).observeOn(Schedulers.newThread())
.take(5).replay(3);
final Subscriber<Long> subscriber1 = new Subscriber<Long>() {
@Override
public void onNext(Long v) {
Log.e(TAG,"onNext1................."+v);
}
@Override
public void onCompleted() {
Log.e(TAG, "onCompleted1.................");
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError1.....................");
}
};
Subscriber<Long> subscriber = new Subscriber<Long>() {
@Override
public void onNext(Long l) {
Log.e(TAG,"onNext................."+l);
if(l == 3){
observable.subscribe(subscriber1);
}
}
@Override
public void onCompleted() {
Log.e(TAG, "onCompleted.................");
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError.....................");
}
};
observable.subscribe(subscriber);
observable.connect();
运行结果:
六、observeOn
指定一个观察者在哪个调度器上观察这个Observable
Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
for (int i = 0; i < 5; i++) {
subscriber.onNext(i + "");
try {
Thread.sleep(1000);
} catch (Exception e) {
}
}
subscriber.onCompleted();
}
}).observeOn(Schedulers.io());
Subscriber<Object> subscriber = new Subscriber<Object>() {
@Override
public void onNext(Object v) {
Log.e(TAG,"onNext................."+v+"............"+Thread.currentThread().getName());
}
@Override
public void onCompleted() {
Log.e(TAG, "onCompleted.................");
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError.....................");
}
};
observable.subscribe(subscriber);
运行结果:
七、subscribeon
指定Observable自身在哪个调度器上执行
Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
for (int i = 0; i < 5; i++) {
subscriber.onNext(i + "");
try {
Thread.sleep(1000);
} catch (Exception e) {
}
Log.e(TAG, "call................."+ Thread.currentThread().getName());
}
subscriber.onCompleted();
}
})
.subscribeOn(Schedulers.newThread());
Subscriber<Object> subscriber = new Subscriber<Object>() {
@Override
public void onNext(Object v) {
Log.e(TAG,"onNext................."+v+"............"+Thread.currentThread().getName());
}
@Override
public void onCompleted() {
Log.e(TAG, "onCompleted................."+Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError....................."+Thread.currentThread().getName());
}
};
observable.subscribe(subscriber);运行结果:
八、doOnEach
发射完一个Observable时的回调
Observable<Integer> observable = Observable.just(1,2,3,4,5,6);
Subscriber<Integer> subscriber = new Subscriber<Integer>() {
@Override
public void onNext(Integer v) {
Log.e(TAG,"onNext................."+v);
}
@Override
public void onCompleted() {
Log.e(TAG, "onCompleted.................");
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError.....................");
}
};
observable
.doOnEach(new Action1<rx.Notification<? super Integer>>() {
@Override
public void call(rx.Notification<? super Integer> notification) {
int i = (int) notification.getValue();
Log.e(TAG, "发射了一个数据....................."+i);
}
})
.subscribe(subscriber);
运行结果:
九、doOnNext
doOnNext操作符类似于
doOnEach(Action1),但是它的Action不是接受一个
Notification参数,而是接受发射的数据项。
Observable<Integer> observable = Observable.just(1,2,3,4,5,6);
Subscriber<Integer> subscriber = new Subscriber<Integer>() {
@Override
public void onNext(Integer v) {
Log.e(TAG,"onNext................."+v);
}
@Override
public void onCompleted() {
Log.e(TAG, "onCompleted.................");
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError.....................");
}
};
observable
.doOnNext(new Action1<Integer>() {
@Override
public void call(Integer integer) {
if(integer == 3){
Log.e(TAG, "doOnNext.....................");
}
}
})
.subscribe(subscriber);运行结果:
十、doOnSubscribe
订阅生成时的回调
Observable<Integer> observable = Observable.just(1,2,3,4,5,6); Subscriber<Integer> subscriber = new Subscriber<Integer>() { @Override public void onNext(Integer v) { Log.e(TAG,"onNext................."+v); } @Override public void onCompleted() { Log.e(TAG, "onCompleted................."); } @Override public void onError(Throwable e) { Log.e(TAG, "onError....................."); } }; observable .doOnSubscribe(new Action0() { @Override public void call() { Log.e(TAG, "观察者订阅了它生成的Observable....................."); } }) .subscribe(subscriber);
运行结果:
相关文章推荐
- RX操作之辅助操作符二(doonunsubscribe、doOnCompleted、doOnError、doOnTerminate、finallyDo、delay、delaySubscription)
- Android函数响应式编程——必学的RxJava辅助操作符delay、Do、subscribeOn、observeOn、timeout
- 7.6 Utility 辅助操作 - Delay/Do/ObserveOn/SubscribeOn/Subscribe
- RX操作符之结合操作(startWith、merge、mergeDelayError、zip、join、groupjoin、switchOnNext)
- RX操作符之连接操作ConnectObservable(publish、connect、replay、refCount)
- RX操作符之对Observable发射的数据执行变换操作二(scan、groupby、buffer、window)
- RX操作符之过滤操作二(firstOrDefault、single、elementAt、sample、throttleFirst、throwttleLast、throttleWithTimeout)
- RxJava操作符(07-辅助操作)
- RX操作符之算术和聚合操作(averageInteger、min、max、count、sum、contact、reduce、collect)
- RX操作符之过滤操作一(debounce、filter、ofType、takeLast、last、lastOrDefault、takeLastBuffer、skip、skipLast、take)
- RX操作之条件和布操作符(amb、defaultIfEmpty、skipUntil、skipWhile、takeUntil、takeWhile、contains、all、exists、isEmpty)
- RX操作符之对Observable发射的数据执行变换操作一(map、flatmap)
- RX操作符之辅助操作符三(using、getIterator、toFuture、toIterator、toMap、toMultiMap、nest)
- RxJava操作符(07-辅助操作)
- Ruby on Rails 数据库Migration操作语句实例
- Critical Functional Bug on cannot rebuild files which the updates files do not exist was fixed.
- Why do I lose ASP Session State on IIS6
- $(document).ready(function(){$('#id').each(function(){//do something代码});});
- WCF分布式开发常见错误(14):无效的操作异常,At least one operation on the ...
- VS2005 Winform 控制绑定操作新特性 DataSourceUpdateMode.OnPropertyChanged