RX操作符之过滤操作二(firstOrDefault、single、elementAt、sample、throttleFirst、throwttleLast、throttleWithTimeout)
2016-08-07 12:28
489 查看
一、firstOrDefault
传递一个谓词函数给
二、takeFirst
三、single
如果是无参的函数,发射多个observable则会抛出异常,执行onError()方法,如果只发射一个值会正常执行。
1.正常执行:
运行结果:
2.运行出错:
运行结果:
四、elementAt
只发射第N项数据,
五、elementAtOrDefault
六、sample
定期发射Observable最近发射的数据项,
运行结果:
代码中每隔1s发射一次数据,共5次,订阅时3s订阅一次,于是只取出了第3s发射的数据的值2
七、throttleFirst
运行结果:
八、throwttleLast
运行结果:
九、throttleWithTimeout
根据指定的时间间隔进行限流,时间单位通过
运行结果:
十、timeout
对原始Observable的一个镜像,如果过了一个指定的时长仍没有发射数据,它会发一个错误通知
运行结果:
十一、distinct
抑制(过滤掉)重复的数据项
运行结果:
十二、distinctUntilChanged
过滤掉连续重复的数据
运行结果:
十三、ignoreElements
如果你不关心一个Observable发射的数据,但是希望在它完成时或遇到错误终止时收到通知,你可以对Observable使用
运行结果:
传递一个谓词函数给
firstOrDefault,然后发射这个函数判定为
true的第一项数据,如果没有数据通过了谓词测试就发射一个默认值。
Integer[]items = {6,7,81,2,3,4,5}; Observable myObservable = Observable.from(items) .firstOrDefault(88, new Func1<Integer, Boolean>() { @Override public Boolean call(Integer integer) { return integer >85; } }); Subscriber<Integer> mySubscriber =new Subscriber<Integer>() { @Override public void onNext(Integer integer) { System.out.println("onNext................."+integer); } @Override public void onCompleted() { System.out.println("onCompleted................."); } @Override public void onError(Throwable e) { System.out.println("onError...................."); } }; myObservable.subscribe(mySubscriber);运行结果:
二、takeFirst
takeFirst与
first类似,除了这一点:如果原始Observable没有发射任何满足条件的数据,
first会抛出一个
NoSuchElementException,
takeFist会返回一个空的Observable(不调用
onNext()但是会调用
onCompleted)。
三、single
如果是无参的函数,发射多个observable则会抛出异常,执行onError()方法,如果只发射一个值会正常执行。
1.正常执行:
Integer[]items = {6}; Observable myObservable = Observable.from(items) .single(); Subscriber<Integer> mySubscriber =new Subscriber<Integer>() { @Override public void onNext(Integer integer) { System.out.println("onNext................."+integer); } @Override public void onCompleted() { System.out.println("onCompleted................."); } @Override public void onError(Throwable e) { System.out.println("onError...................."); } }; myObservable.subscribe(mySubscriber);
运行结果:
2.运行出错:
Integer[]items = {6,7,81,2,3,4,5}; Observable myObservable = Observable.from(items) .single(); Subscriber<Integer> mySubscriber =new Subscriber<Integer>() { @Override public void onNext(Integer integer) { System.out.println("onNext................."+integer); } @Override public void onCompleted() { System.out.println("onCompleted................."); } @Override public void onError(Throwable e) { System.out.println("onError...................."); } }; myObservable.subscribe(mySubscriber);
运行结果:
四、elementAt
只发射第N项数据,
ElementAt操作符获取原始Observable发射的数据序列指定索引位置的数据项,然后当做自己的唯一数据发射。
Integer[]items = {6,7,81,2,3,4,5}; Observable myObservable = Observable.from(items) .elementAt(5); Subscriber<Integer> mySubscriber =new Subscriber<Integer>() { @Override public void onNext(Integer integer) { System.out.println("onNext................."+integer); } @Override public void onCompleted() { System.out.println("onCompleted................."); } @Override public void onError(Throwable e) { System.out.println("onError...................."); } }; myObservable.subscribe(mySubscriber);运行结果:
五、elementAtOrDefault
elementAtOrDefault操作符与
elementAt的区别是,如果索引值大于数据项数,它会发射一个默认值(通过额外的参数指定),而不是抛出异常。但是如果你传递一个负数索引值,它仍然会抛出一个
IndexOutOfBoundsException异常。
六、sample
定期发射Observable最近发射的数据项,
Sample操作符定时查看一个Observable,然后发射自上次采样以来它最近发射的数据。
Integer[]items = {6,7,81,2,3,4,5}; Observable<Integer> myObservable = Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { for(int i=0;i<5;i++){ subscriber.onNext(i); try{ Thread.sleep(1000); }catch (Exception e){ } } subscriber.onCompleted(); } }); Subscriber<Integer> mySubscriber =new Subscriber<Integer>() { @Override public void onNext(Integer integer) { System.out.println("onNext................."+integer); } @Override public void onCompleted() { System.out.println("onCompleted................."); } @Override public void onError(Throwable e) { System.out.println("onError...................."); } }; myObservable.sample(3,TimeUnit.SECONDS).subscribe(mySubscriber);
运行结果:
代码中每隔1s发射一次数据,共5次,订阅时3s订阅一次,于是只取出了第3s发射的数据的值2
七、throttleFirst
throttleFirst与
throttleLast/sample不同,在每个采样周期内,它总是发射原始Observable的第一项数据,而不是最近的一项。
Integer[]items = {6,7,81,2,3,4,5}; Observable<Integer> myObservable = Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { for(int i=0;i<5;i++){ subscriber.onNext(i); try{ Thread.sleep(1000); }catch (Exception e){ } } subscriber.onCompleted(); } }); Subscriber<Integer> mySubscriber =new Subscriber<Integer>() { @Override public void onNext(Integer integer) { Log.e(TAG,"onNext................."+integer); } @Override public void onCompleted() { Log.e(TAG, "onCompleted................."); } @Override public void onError(Throwable e) { Log.e(TAG, "onError....................."); } }; myObservable.throttleFirst(2, TimeUnit.SECONDS).subscribe(mySubscriber);
运行结果:
八、throwttleLast
throttleFirst在每个采样周期内,它总是发射原始Observable的最后一项数据,而不是最近的一项。
Integer[]items = {6,7,81,2,3,4,5}; Observable<Integer> myObservable = Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { for(int i=0;i<5;i++){ subscriber.onNext(i); try{ Thread.sleep(1000); }catch (Exception e){ } } subscriber.onCompleted(); } }); Subscriber<Integer> mySubscriber =new Subscriber<Integer>() { @Override public void onNext(Integer integer) { Log.e(TAG,"onNext................."+integer); } @Override public void onCompleted() { Log.e(TAG, "onCompleted................."); } @Override public void onError(Throwable e) { Log.e(TAG, "onError....................."); } }; myObservable.throttleLast(2, TimeUnit.SECONDS).subscribe(mySubscriber);
运行结果:
九、throttleWithTimeout
根据指定的时间间隔进行限流,时间单位通过
TimeUnit参数指定。这种操作符默认在
computation调度器上执行,但是你可以通过第三个参数指定。
Observable<Integer> myObservable = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for (int i = 0; i < 5; i++) {
subscriber.onNext(i);
try {
Thread.sleep(1000);
} catch (Exception e) {
}
}
subscriber.onCompleted();
}
});
Subscriber<Integer> mySubscriber =new Subscriber<Integer>() {
@Override
public void onNext(Integer integer) {
Log.e(TAG,"onNext................."+integer);
}
@Override
public void onCompleted() {
Log.e(TAG, "onCompleted.................");
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError.....................");
}
};
myObservable.throttleWithTimeout(2, TimeUnit.SECONDS).subscribe(mySubscriber);
}
private void testThrottleLast() {
Integer[]items = {6,7,81,2,3,4,5}; Observable<Integer> myObservable = Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { for(int i=0;i<5;i++){ subscriber.onNext(i); try{ Thread.sleep(1000); }catch (Exception e){ } } subscriber.onCompleted(); } }); Subscriber<Integer> mySubscriber =new Subscriber<Integer>() { @Override public void onNext(Integer integer) { Log.e(TAG,"onNext................."+integer); } @Override public void onCompleted() { Log.e(TAG, "onCompleted................."); } @Override public void onError(Throwable e) { Log.e(TAG, "onError....................."); } }; myObservable.throttleLast(2, TimeUnit.SECONDS).subscribe(mySubscriber);
运行结果:
十、timeout
对原始Observable的一个镜像,如果过了一个指定的时长仍没有发射数据,它会发一个错误通知
Observable<Integer> myObservable = Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { for (int i = 0; i < 5; i++) { subscriber.onNext(i); try { Thread.sleep(100*i); } catch (Exception e) { } } subscriber.onCompleted(); } }); Subscriber<Integer> mySubscriber =new Subscriber<Integer>() { @Override public void onNext(Integer integer) { Log.e(TAG,"onNext................."+integer); } @Override public void onCompleted() { Log.e(TAG, "onCompleted................."); } @Override public void onError(Throwable e) { Log.e(TAG, "onError....................."); } }; myObservable.timeout(300, TimeUnit.MILLISECONDS).subscribe(mySubscriber);
运行结果:
十一、distinct
抑制(过滤掉)重复的数据项
Integer[]items = {6,6,6,6,7,8,2,2,2,3,4,5}; Observable<Integer> myObservable = Observable.from(items); Subscriber<Integer> mySubscriber =new Subscriber<Integer>() { @Override public void onNext(Integer integer) { Log.e(TAG,"onNext................."+integer); } @Override public void onCompleted() { Log.e(TAG, "onCompleted................."); } @Override public void onError(Throwable e) { Log.e(TAG, "onError....................."); } }; myObservable.distinct().subscribe(mySubscriber);
运行结果:
十二、distinctUntilChanged
过滤掉连续重复的数据
Integer[]items = {6,6,5,6,6,7,8,2,2,2,3,4,5}; Observable<Integer> myObservable = Observable.from(items); Subscriber<Integer> mySubscriber =new Subscriber<Integer>() { @Override public void onNext(Integer integer) { Log.e(TAG,"onNext................."+integer); } @Override public void onCompleted() { Log.e(TAG, "onCompleted................."); } @Override public void onError(Throwable e) { Log.e(TAG, "onError....................."); } }; myObservable.distinctUntilChanged().subscribe(mySubscriber);
运行结果:
十三、ignoreElements
IgnoreElements操作符抑制原始Observable发射的所有数据,只允许它的终止通知(
onError或
onCompleted)通过。
如果你不关心一个Observable发射的数据,但是希望在它完成时或遇到错误终止时收到通知,你可以对Observable使用
ignoreElements操作符,它会确保永远不会调用观察者的
onNext()方法。
Observable<Integer> myObservable = Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { for (int i = 0; i < 5; i++) { subscriber.onNext(i); try { Thread.sleep(100*i); } catch (Exception e) { } } subscriber.onCompleted(); } }); Subscriber<Integer> mySubscriber =new Subscriber<Integer>() { @Override public void onNext(Integer integer) { Log.e(TAG,"onNext................."+integer); } @Override public void onCompleted() { Log.e(TAG, "onCompleted................."); } @Override public void onError(Throwable e) { Log.e(TAG, "onError....................."); } }; myObservable.ignoreElements().subscribe(mySubscriber);
运行结果:
相关文章推荐
- 7、步步为营VS 2008 + .NET 3.5(7) - LINQ查询操作符之First、FirstOrDefault、Last、LastOrDefault、ElementAt、ElementAtOrDefault、Contains、Any、All、Count、LongCount、Sum、Min、Max、Average、Aggregate、Cast、DefaultIfEmpty、SequenceEqual、OfType、ToArray、ToList、ToDictionary
- LINQ查询操作符之First、FirstOrDefault、Last、LastOrDefault、ElementAt、ElementAtOrDefault、Contains、Any、All、Coun
- 步步为营VS 2008 + .NET 3.5(7) - LINQ查询操作符之First、FirstOrDefault、Last、LastOrDefault、ElementAt、ElementAtOrDefault、Contains、Any、All、Coun
- 步步为营VS 2008 + .NET 3.5(7) - LINQ查询操作符之First、FirstOrDefault、Last、LastOrDefault、ElementAt、ElementAtOrDefault、Contains、Any、All、Coun
- LINQ查询操作符之First、FirstOrDefault、Last、LastOrDefault、ElementAt、ElementAtOrDefault、Contains、Any、All、Coun
- LINQ查询操作符之First、FirstOrDefault、Last、LastOrDefault、ElementAt、ElementAtOrDefault、Contains、Any、All、Count
- LINQ查询操作符之First、FirstOrDefault、Last、LastOrDefault、ElementAt、ElementAtOrDefault、Contains、Any、All、Coun
- LINQ查询操作符之First、FirstOrDefault、Last、LastOrDefault、ElementAt、ElementAtOrDefault、Contains、Any、All、Coun
- LINQ查询操作符之First、FirstOrDefault、Last、LastOrDefault、ElementAt、ElementAtOrDefault、Contains、Any、All、Count 等
- 步步为营VS 2008 + .NET 3.5(7) - LINQ查询操作符之First、FirstOrDefault、Last、LastOrDefault、ElementAt、ElementAtOrDefault、Contains、Any、All、Count、LongCount、Sum、Min、Max、Average、Aggregate、Cast、DefaultIfEmpty、SequenceEqual、OfType、ToArray、ToList、ToDictionary
- RX操作符之过滤操作一(debounce、filter、ofType、takeLast、last、lastOrDefault、takeLastBuffer、skip、skipLast、take)
- RxJava 过滤操作符 first last single
- 步步为营VS 2008 + .NET 3.5(7) - LINQ查询操作符之First、FirstOrDefault、Last、LastOrDefault
- RxJava【过滤】操作符 filter distinct throttle take skip first
- RX操作符之结合操作(startWith、merge、mergeDelayError、zip、join、groupjoin、switchOnNext)
- Android函数响应式编程——必学的RxJava过滤操作符filter、elementAt、distinct、skip、take、ignoreElements、throttleFirst
- 上接步步为营VS 2008 + .NET 3.5(7) - LINQ查询操作符之First、FirstOrDefault、Last、LastOrDefault
- Rx_java(7)Rx_java2操作符(throttleFirst、debounce )介绍-防止按钮被重复点击案例
- 自己动手重新实现LINQ to Objects: 11 - First,Last,Single以及它们带有OrDefault的重载
- LINQ中First,FirstOrDefault,Single,SingleOrDefault的区别