您的位置:首页 > 移动开发 > Android开发

Android函数响应式编程——必学的RxJava辅助操作符delay、Do、subscribeOn、observeOn、timeout

2017-12-10 01:52 453 查看
1.delay:让被观察者2秒后再发送数据。

rx.Observable.create(new rx.Observable.OnSubscribe<Long>() {
@Override
public void call(Subscriber<? super Long> subscriber) {
Long cur = System.currentTimeMillis();
subscriber.onNext(cur);
}
}).delay(2,TimeUnit.SECONDS).subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
Long cur = System.currentTimeMillis();
Log.i("xbh", cur - aLong + "");
}
});


输出

12-09 08:30:55.853 21368-21419/com.hdu.a15058124.homework3 I/xbh: 2001

2.Do:doOnNext,代表着在执行onNext的时候调用。

rx.Observable.just(1,2)
.doOnNext(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.i("xbh", "call:" + integer);
}
}).subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
Log.i("xbh", "onCompleted: ");
}

@Override
public void onError(Throwable e) {
Log.i("xbh", "onError: ");
}

@Override
public void onNext(Integer integer) {
Log.i("xbh", "onNext:" + integer);
}
});


输出

12-09 08:44:22.491 1324-1324/com.hdu.a15058124.homework3 I/xbh: call:1

12-09 08:44:22.491 1324-1324/com.hdu.a15058124.homework3 I/xbh: onNext:1

12-09 08:44:22.491 1324-1324/com.hdu.a15058124.homework3 I/xbh: call:2

12-09 08:44:22.491 1324-1324/com.hdu.a15058124.homework3 I/xbh: onNext:2

12-09 08:44:22.491 1324-1324/com.hdu.a15058124.homework3 I/xbh: onCompleted: 

补充

doOnEach 被观察者每发送一次数据,都会调用一次,整个的onNext,onError,onComplete。

doOnSubscribe 当观察者订阅被观察者的时候调用

doOnUnsubscribe 当观察者取消订阅被观察者的时候调用;Observable通过onError或者onCompleted结束,会取消订阅所有的Subscriber,也会调用

doOnNext

doOnCompleted

doOnError

doOnTerminate 当Observable终止(无论是正常终止还是异常终止)之前会被调用。

finallyDo 当Observable终止(无论是正常终止还是异常终止)之后会被调用。

3.subscribeOn、observeOn:前者代表在被观察者中的逻辑在哪个线程执行。后者代表观察者观察到后的逻辑在哪个线程执行。

rx.Observable<Integer> obs = rx.Observable.create(new rx.Observable.OnSubscribe<Integer>(){
@Override
public void call(Subscriber<? super Integer> subscriber) {
Log.i("xbh", Thread.currentThread().getName() + "");
subscriber.onNext(1);
}
});

obs.subscribeOn(Schedulers.newThread()).observeOn
(AndroidSchedulers.mainThread()).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.i("xbh", Thread.currentThread().getName());
}
});


输出

12-09 09:01:42.882 17059-17111/com.hdu.a15058124.homework3 I/xbh: RxNewThreadScheduler-1

12-09 09:01:42.901 17059-17059/com.hdu.a15058124.homework3 I/xbh: main

4.timeout:给定200ms,你不能在发射数据后在接下来200ms内不再发送任何信息,如果没有,将会以onError的形式终止,或者执行你备用的Observable。我们这里执行的是备用的Observable。

rx.Observable<Integer> obs = rx.Observable.create(new rx.Observable.OnSubscribe<Integer>(){
@Override
public void call(Subscriber<? super Integer> subscriber) {
for (int i = 0; i < 4; i ++) {
try {
Thread.sleep(i * 100);
} catch (InterruptedException e) {
e.printStackTrace();
}
subscriber.onNext(i);
}
}
}).timeout(200, TimeUnit.MILLISECONDS, rx.Observable.just(10,11));
obs.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.i("xbh", integer + "");
}
});


输出

12-09 09:12:53.171 27171-27171/? I/xbh: 0

12-09 09:12:53.273 27171-27171/? I/xbh: 1

12-09 09:12:53.475 27171-27171/? I/xbh: 2

12-09 09:12:53.677 27171-27208/? I/xbh: 10

12-09 09:12:53.677 27171-27208/? I/xbh: 11
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: