您的位置:首页 > 其它

RX操作符之过滤操作二(firstOrDefault、single、elementAt、sample、throttleFirst、throwttleLast、throttleWithTimeout)

2016-08-07 12:28 489 查看
一、firstOrDefault
传递一个谓词函数给
firstOrDefault
,然后发射这个函数判定为
true
的第一项数据,如果没有数据通过了谓词测试就发射一个默认值。

Integer[]items = {6,7,81,2,3,4,5};
Observable myObservable = Observable.from(items)
.firstOrDefault(88, new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
return integer >85;
}
});
Subscriber<Integer> mySubscriber =new Subscriber<Integer>() {
@Override
public void onNext(Integer integer) {
System.out.println("onNext................."+integer);
}

@Override
public void onCompleted() {
System.out.println("onCompleted.................");
}

@Override
public void onError(Throwable e) {
System.out.println("onError....................");
}
};

myObservable.subscribe(mySubscriber);
运行结果:



二、takeFirst
takeFirst
first
类似,除了这一点:如果原始Observable没有发射任何满足条件的数据,
first
会抛出一个
NoSuchElementException
takeFist
会返回一个空的Observable(不调用
onNext()
但是会调用
onCompleted
)。

三、single
如果是无参的函数,发射多个observable则会抛出异常,执行onError()方法,如果只发射一个值会正常执行。

1.正常执行:

Integer[]items = {6};
Observable myObservable = Observable.from(items)
.single();
Subscriber<Integer> mySubscriber =new Subscriber<Integer>() {
@Override
public void onNext(Integer integer) {
System.out.println("onNext................."+integer);
}

@Override
public void onCompleted() {
System.out.println("onCompleted.................");
}

@Override
public void onError(Throwable e) {
System.out.println("onError....................");
}
};

myObservable.subscribe(mySubscriber);


运行结果:



2.运行出错:

Integer[]items = {6,7,81,2,3,4,5};
Observable myObservable = Observable.from(items)
.single();
Subscriber<Integer> mySubscriber =new Subscriber<Integer>() {
@Override
public void onNext(Integer integer) {
System.out.println("onNext................."+integer);
}

@Override
public void onCompleted() {
System.out.println("onCompleted.................");
}

@Override
public void onError(Throwable e) {
System.out.println("onError....................");
}
};

myObservable.subscribe(mySubscriber);


运行结果:



四、elementAt
只发射第N项数据,
ElementAt
操作符获取原始Observable发射的数据序列指定索引位置的数据项,然后当做自己的唯一数据发射。

Integer[]items = {6,7,81,2,3,4,5};
Observable myObservable = Observable.from(items)
.elementAt(5);
Subscriber<Integer> mySubscriber =new Subscriber<Integer>() {
@Override
public void onNext(Integer integer) {
System.out.println("onNext................."+integer);
}

@Override
public void onCompleted() {
System.out.println("onCompleted.................");
}

@Override
public void onError(Throwable e) {
System.out.println("onError....................");
}
};

myObservable.subscribe(mySubscriber);
运行结果:



五、elementAtOrDefault
elementAtOrDefault
操作符与
elementAt
的区别是,如果索引值大于数据项数,它会发射一个默认值(通过额外的参数指定),而不是抛出异常。但是如果你传递一个负数索引值,它仍然会抛出一个
IndexOutOfBoundsException
异常。

六、sample
定期发射Observable最近发射的数据项,
Sample
操作符定时查看一个Observable,然后发射自上次采样以来它最近发射的数据。

Integer[]items = {6,7,81,2,3,4,5};

Observable<Integer> myObservable =  Observable.create(new Observable.OnSubscribe<Integer>() {

@Override
public void call(Subscriber<? super Integer> subscriber) {
for(int i=0;i<5;i++){
subscriber.onNext(i);
try{
Thread.sleep(1000);
}catch (Exception e){

}
}
subscriber.onCompleted();
}
});

Subscriber<Integer> mySubscriber =new Subscriber<Integer>() {
@Override
public void onNext(Integer integer) {
System.out.println("onNext................."+integer);
}

@Override
public void onCompleted() {
System.out.println("onCompleted.................");
}

@Override
public void onError(Throwable e) {
System.out.println("onError....................");
}
};

myObservable.sample(3,TimeUnit.SECONDS).subscribe(mySubscriber);


运行结果:

代码中每隔1s发射一次数据,共5次,订阅时3s订阅一次,于是只取出了第3s发射的数据的值2



七、throttleFirst

throttleFirst
throttleLast/sample
不同,在每个采样周期内,它总是发射原始Observable的第一项数据,而不是最近的一项。

Integer[]items = {6,7,81,2,3,4,5};

Observable<Integer> myObservable =  Observable.create(new Observable.OnSubscribe<Integer>() {

@Override
public void call(Subscriber<? super Integer> subscriber) {
for(int i=0;i<5;i++){
subscriber.onNext(i);
try{
Thread.sleep(1000);
}catch (Exception e){

}
}
subscriber.onCompleted();
}
});

Subscriber<Integer> mySubscriber =new Subscriber<Integer>() {
@Override
public void onNext(Integer integer) {
Log.e(TAG,"onNext................."+integer);
}

@Override
public void onCompleted() {
Log.e(TAG, "onCompleted.................");
}

@Override
public void onError(Throwable e) {
Log.e(TAG, "onError.....................");
}
};

myObservable.throttleFirst(2, TimeUnit.SECONDS).subscribe(mySubscriber);


运行结果:



八、throwttleLast
throttleFirst
在每个采样周期内,它总是发射原始Observable的最后一项数据,而不是最近的一项。

Integer[]items = {6,7,81,2,3,4,5};

Observable<Integer> myObservable =  Observable.create(new Observable.OnSubscribe<Integer>() {

@Override
public void call(Subscriber<? super Integer> subscriber) {
for(int i=0;i<5;i++){
subscriber.onNext(i);
try{
Thread.sleep(1000);
}catch (Exception e){

}
}
subscriber.onCompleted();
}
});

Subscriber<Integer> mySubscriber =new Subscriber<Integer>() {
@Override
public void onNext(Integer integer) {
Log.e(TAG,"onNext................."+integer);
}

@Override
public void onCompleted() {
Log.e(TAG, "onCompleted.................");
}

@Override
public void onError(Throwable e) {
Log.e(TAG, "onError.....................");
}
};

myObservable.throttleLast(2, TimeUnit.SECONDS).subscribe(mySubscriber);


运行结果:



九、throttleWithTimeout
根据指定的时间间隔进行限流,时间单位通过
TimeUnit
参数指定。这种操作符默认在
computation
调度器上执行,但是你可以通过第三个参数指定。

 Observable<Integer> myObservable =  Observable.create(new Observable.OnSubscribe<Integer>() {

@Override
public void call(Subscriber<? super Integer> subscriber) {
for (int i = 0; i < 5; i++) {
subscriber.onNext(i);
try {
Thread.sleep(1000);
} catch (Exception e) {

}
}
subscriber.onCompleted();
}
});

Subscriber<Integer> mySubscriber =new Subscriber<Integer>() {
@Override
public void onNext(Integer integer) {
Log.e(TAG,"onNext................."+integer);
}

@Override
public void onCompleted() {
Log.e(TAG, "onCompleted.................");
}

@Override
public void onError(Throwable e) {
Log.e(TAG, "onError.....................");
}
};

myObservable.throttleWithTimeout(2, TimeUnit.SECONDS).subscribe(mySubscriber);
}

private void testThrottleLast() {
Integer[]items = {6,7,81,2,3,4,5}; Observable<Integer> myObservable = Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { for(int i=0;i<5;i++){ subscriber.onNext(i); try{ Thread.sleep(1000); }catch (Exception e){ } } subscriber.onCompleted(); } }); Subscriber<Integer> mySubscriber =new Subscriber<Integer>() { @Override public void onNext(Integer integer) { Log.e(TAG,"onNext................."+integer); } @Override public void onCompleted() { Log.e(TAG, "onCompleted................."); } @Override public void onError(Throwable e) { Log.e(TAG, "onError....................."); } }; myObservable.throttleLast(2, TimeUnit.SECONDS).subscribe(mySubscriber);


运行结果:



十、timeout

对原始Observable的一个镜像,如果过了一个指定的时长仍没有发射数据,它会发一个错误通知

Observable<Integer> myObservable =  Observable.create(new Observable.OnSubscribe<Integer>() {

@Override
public void call(Subscriber<? super Integer> subscriber) {
for (int i = 0; i < 5; i++) {
subscriber.onNext(i);
try {
Thread.sleep(100*i);
} catch (Exception e) {

}
}
subscriber.onCompleted();
}
});

Subscriber<Integer> mySubscriber =new Subscriber<Integer>() {
@Override
public void onNext(Integer integer) {
Log.e(TAG,"onNext................."+integer);
}

@Override
public void onCompleted() {
Log.e(TAG, "onCompleted.................");
}

@Override
public void onError(Throwable e) {
Log.e(TAG, "onError.....................");
}
};

myObservable.timeout(300, TimeUnit.MILLISECONDS).subscribe(mySubscriber);


运行结果:



十一、distinct
抑制(过滤掉)重复的数据项

Integer[]items = {6,6,6,6,7,8,2,2,2,3,4,5};

Observable<Integer> myObservable =  Observable.from(items);

Subscriber<Integer> mySubscriber =new Subscriber<Integer>() {
@Override
public void onNext(Integer integer) {
Log.e(TAG,"onNext................."+integer);
}

@Override
public void onCompleted() {
Log.e(TAG, "onCompleted.................");
}

@Override
public void onError(Throwable e) {
Log.e(TAG, "onError.....................");
}
};

myObservable.distinct().subscribe(mySubscriber);


运行结果:



十二、distinctUntilChanged
过滤掉连续重复的数据

Integer[]items = {6,6,5,6,6,7,8,2,2,2,3,4,5};

Observable<Integer> myObservable =  Observable.from(items);

Subscriber<Integer> mySubscriber =new Subscriber<Integer>() {
@Override
public void onNext(Integer integer) {
Log.e(TAG,"onNext................."+integer);
}

@Override
public void onCompleted() {
Log.e(TAG, "onCompleted.................");
}

@Override
public void onError(Throwable e) {
Log.e(TAG, "onError.....................");
}
};

myObservable.distinctUntilChanged().subscribe(mySubscriber);


运行结果:



十三、ignoreElements

IgnoreElements
操作符抑制原始Observable发射的所有数据,只允许它的终止通知(
onError
onCompleted
)通过。

如果你不关心一个Observable发射的数据,但是希望在它完成时或遇到错误终止时收到通知,你可以对Observable使用
ignoreElements
操作符,它会确保永远不会调用观察者的
onNext()
方法。

Observable<Integer> myObservable =  Observable.create(new Observable.OnSubscribe<Integer>() {

@Override
public void call(Subscriber<? super Integer> subscriber) {
for (int i = 0; i < 5; i++) {
subscriber.onNext(i);
try {
Thread.sleep(100*i);
} catch (Exception e) {

}
}
subscriber.onCompleted();
}
});

Subscriber<Integer> mySubscriber =new Subscriber<Integer>() {
@Override
public void onNext(Integer integer) {
Log.e(TAG,"onNext................."+integer);
}

@Override
public void onCompleted() {
Log.e(TAG, "onCompleted.................");
}

@Override
public void onError(Throwable e) {
Log.e(TAG, "onError.....................");
}
};

myObservable.ignoreElements().subscribe(mySubscriber);


运行结果:

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐