Android函数响应式编程——必学的RxJava辅助操作符delay、Do、subscribeOn、observeOn、timeout
2017-12-10 01:52
453 查看
1.delay:让被观察者2秒后再发送数据。
输出
12-09 08:30:55.853 21368-21419/com.hdu.a15058124.homework3 I/xbh: 2001
2.Do:doOnNext,代表着在执行onNext的时候调用。
输出
12-09 08:44:22.491 1324-1324/com.hdu.a15058124.homework3 I/xbh: call:1
12-09 08:44:22.491 1324-1324/com.hdu.a15058124.homework3 I/xbh: onNext:1
12-09 08:44:22.491 1324-1324/com.hdu.a15058124.homework3 I/xbh: call:2
12-09 08:44:22.491 1324-1324/com.hdu.a15058124.homework3 I/xbh: onNext:2
12-09 08:44:22.491 1324-1324/com.hdu.a15058124.homework3 I/xbh: onCompleted:
补充
doOnEach 被观察者每发送一次数据,都会调用一次,整个的onNext,onError,onComplete。
doOnSubscribe 当观察者订阅被观察者的时候调用
doOnUnsubscribe 当观察者取消订阅被观察者的时候调用;Observable通过onError或者onCompleted结束,会取消订阅所有的Subscriber,也会调用
doOnNext
doOnCompleted
doOnError
doOnTerminate 当Observable终止(无论是正常终止还是异常终止)之前会被调用。
finallyDo 当Observable终止(无论是正常终止还是异常终止)之后会被调用。
3.subscribeOn、observeOn:前者代表在被观察者中的逻辑在哪个线程执行。后者代表观察者观察到后的逻辑在哪个线程执行。
输出
12-09 09:01:42.882 17059-17111/com.hdu.a15058124.homework3 I/xbh: RxNewThreadScheduler-1
12-09 09:01:42.901 17059-17059/com.hdu.a15058124.homework3 I/xbh: main
4.timeout:给定200ms,你不能在发射数据后在接下来200ms内不再发送任何信息,如果没有,将会以onError的形式终止,或者执行你备用的Observable。我们这里执行的是备用的Observable。
输出
12-09 09:12:53.171 27171-27171/? I/xbh: 0
12-09 09:12:53.273 27171-27171/? I/xbh: 1
12-09 09:12:53.475 27171-27171/? I/xbh: 2
12-09 09:12:53.677 27171-27208/? I/xbh: 10
12-09 09:12:53.677 27171-27208/? I/xbh: 11
rx.Observable.create(new rx.Observable.OnSubscribe<Long>() { @Override public void call(Subscriber<? super Long> subscriber) { Long cur = System.currentTimeMillis(); subscriber.onNext(cur); } }).delay(2,TimeUnit.SECONDS).subscribe(new Action1<Long>() { @Override public void call(Long aLong) { Long cur = System.currentTimeMillis(); Log.i("xbh", cur - aLong + ""); } });
输出
12-09 08:30:55.853 21368-21419/com.hdu.a15058124.homework3 I/xbh: 2001
2.Do:doOnNext,代表着在执行onNext的时候调用。
rx.Observable.just(1,2) .doOnNext(new Action1<Integer>() { @Override public void call(Integer integer) { Log.i("xbh", "call:" + integer); } }).subscribe(new Subscriber<Integer>() { @Override public void onCompleted() { Log.i("xbh", "onCompleted: "); } @Override public void onError(Throwable e) { Log.i("xbh", "onError: "); } @Override public void onNext(Integer integer) { Log.i("xbh", "onNext:" + integer); } });
输出
12-09 08:44:22.491 1324-1324/com.hdu.a15058124.homework3 I/xbh: call:1
12-09 08:44:22.491 1324-1324/com.hdu.a15058124.homework3 I/xbh: onNext:1
12-09 08:44:22.491 1324-1324/com.hdu.a15058124.homework3 I/xbh: call:2
12-09 08:44:22.491 1324-1324/com.hdu.a15058124.homework3 I/xbh: onNext:2
12-09 08:44:22.491 1324-1324/com.hdu.a15058124.homework3 I/xbh: onCompleted:
补充
doOnEach 被观察者每发送一次数据,都会调用一次,整个的onNext,onError,onComplete。
doOnSubscribe 当观察者订阅被观察者的时候调用
doOnUnsubscribe 当观察者取消订阅被观察者的时候调用;Observable通过onError或者onCompleted结束,会取消订阅所有的Subscriber,也会调用
doOnNext
doOnCompleted
doOnError
doOnTerminate 当Observable终止(无论是正常终止还是异常终止)之前会被调用。
finallyDo 当Observable终止(无论是正常终止还是异常终止)之后会被调用。
3.subscribeOn、observeOn:前者代表在被观察者中的逻辑在哪个线程执行。后者代表观察者观察到后的逻辑在哪个线程执行。
rx.Observable<Integer> obs = rx.Observable.create(new rx.Observable.OnSubscribe<Integer>(){ @Override public void call(Subscriber<? super Integer> subscriber) { Log.i("xbh", Thread.currentThread().getName() + ""); subscriber.onNext(1); } }); obs.subscribeOn(Schedulers.newThread()).observeOn (AndroidSchedulers.mainThread()).subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.i("xbh", Thread.currentThread().getName()); } });
输出
12-09 09:01:42.882 17059-17111/com.hdu.a15058124.homework3 I/xbh: RxNewThreadScheduler-1
12-09 09:01:42.901 17059-17059/com.hdu.a15058124.homework3 I/xbh: main
4.timeout:给定200ms,你不能在发射数据后在接下来200ms内不再发送任何信息,如果没有,将会以onError的形式终止,或者执行你备用的Observable。我们这里执行的是备用的Observable。
rx.Observable<Integer> obs = rx.Observable.create(new rx.Observable.OnSubscribe<Integer>(){ @Override public void call(Subscriber<? super Integer> subscriber) { for (int i = 0; i < 4; i ++) { try { Thread.sleep(i * 100); } catch (InterruptedException e) { e.printStackTrace(); } subscriber.onNext(i); } } }).timeout(200, TimeUnit.MILLISECONDS, rx.Observable.just(10,11)); obs.subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.i("xbh", integer + ""); } });
输出
12-09 09:12:53.171 27171-27171/? I/xbh: 0
12-09 09:12:53.273 27171-27171/? I/xbh: 1
12-09 09:12:53.475 27171-27171/? I/xbh: 2
12-09 09:12:53.677 27171-27208/? I/xbh: 10
12-09 09:12:53.677 27171-27208/? I/xbh: 11
相关文章推荐
- RX操作符之辅助操作一(materialize、dematerialize、timestamp、serialize、replay、observeOn、subscribeon、dooneach)
- RX操作之辅助操作符二(doonunsubscribe、doOnCompleted、doOnError、doOnTerminate、finallyDo、delay、delaySubscription)
- Android函数响应式编程——必学的RxJava过滤操作符filter、elementAt、distinct、skip、take、ignoreElements、throttleFirst
- Android函数响应式编程——必学的RxJava组合操作符startWith、merge、concat、zip、combineLastest
- 7.6 Utility 辅助操作 - Delay/Do/ObserveOn/SubscribeOn/Subscribe
- RxJava操作符-subscribeOn
- Android函数响应式编程最新RxJava-操作符入门(2)
- RxJava2操作符之“Delay”
- RxJava与网络相关的操作符(range/defer/retry/repeat/timer/delay/interval/BehaviorSubject/zip)
- Android函数响应式编程——必学的RxJava错误处理操作符catch、retry
- RxJava2:observeOn和subscribeOn的使用
- Android函数响应式编程——必学的RxJava条件操作符和布尔操作符all、contains、isEmpty、amb、defaultIfEmpty
- RxJava的辅助操作符实例
- Android函数响应式编程——必学的RxJava转换操作符toList、toSortedList、toMap
- RxJava之辅助操作符
- RxJava操作符——辅助操作符(Observable Utility Operators)
- [RxJava学习]操作符Do的使用
- RxJava之五—— observeOn()与subscribeOn()的详解
- RxJava2的do系列操作符之doOnNext和doFinally
- Rxjava中的observeOn和subscribeOn