您的位置:首页 > 编程语言 > Java开发

RxJava 学习笔记(七) --- Filtering 过滤操作

2016-07-12 15:51 399 查看
Filter 只发射通过了谓词测试的数据项

OfType ofType是filter操作符的一个特殊形式它过滤一个Observable只返回指定类型的数据

Take 只发射开始的N项数据

TakeLast 只发射最后N个元素

TakeLastBuffer 将最后的N项数据当做单个数据发射

Skip 跳过开始的N项数据

SkipLast 跳过后面的N项数据

Distinct 过滤掉重复数据

DistinctUntilChanged 过滤掉连续重复的数据

ElementAt 发射第N项数据

ElementAtOrDefault 发射第N项数据如果索引值大于数据项数它会发射一个默认值通过额外的参数指定

First 只发射第一项数据

TakeFirst 返回一个可观察到的发射仅由源观测中满足指定条件发射的第一个项目

Single single操作符也与first类似

Last 只发射最后一项或者满足某个条件的最后一项数据

Sample 定期发射Observable最近发射的数据项

ThrottleFirst throttleFirst与throttleLastsample不同在每个采样周期内它总是发射原始Observable的第一项数据而不是最近的一项

ThrottleWithTimeout  or Debounce  只有当Observable在指定的时间后还没有发射数据时才发射一个数据

Timeout 如果在一个指定的时间段后还没发射数据就发射一个异常

IgnoreElements 丢弃所有的正常数据只发射错误或完成通知

1.
Filter
—> 只发射通过了谓词测试的数据项

Filter
操作符使用你指定的一个谓词函数测试数据项,只有通过测试的数据才会被发射。



示例代码

Observable.just(1, 2, 3, 4, 5)
.filter(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer item) {
return( item < 4 );
}
}).subscribe(new Subscriber<Integer>() {
@Override
public void onNext(Integer item) {
System.out.println("Next: " + item);
}

@Override
public void onError(Throwable error) {
System.err.println("Error: " + error.getMessage());
}

@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
});


输出:

Next: 1
Next: 2
Next: 3
Sequence complete.


filter
默认不在任何特定的调度器上执行。

Javadoc:filter(Func1)

2.
OfType
—> ofType是filter操作符的一个特殊形式。它过滤一个Observable只返回指定类型的数据。

ofType
默认不在任何特定的调度器上指定。



示例代码:

Observable.just(1,"sb",0.1f).ofType(String.class).subscribe(new Action1<String>() {
@Override
public void call(String s) {
System.out.println(s);
}
});


输出:

sb


Javadoc:ofType(Class)

3.
Take
—> 只发射开始的N项数据



使用
Take
操作符让你可以修改
Observable
的行为,只返回前面的
N
项数据,然后发射完成通知,忽略剩余的数据。



如果你对一个
Observable
使用
take(n)
(或它的同义词
limit(n)
)操作符,而那个
Observable
发射的数据少于N项,那么take操作生成的
Observable
不会抛异常或发射onError通知,在完成前它只会发射相同的少量数据。

示例代码:

Observable.just(1, 2, 3, 4, 5, 6, 7, 8)
.take(4)
.subscribe(new Subscriber<Integer>() {
@Override
public void onNext(Integer item) {
System.out.println("Next: " + item);
}

@Override
public void onError(Throwable error) {
System.err.println("Error: " + error.getMessage());
}

@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
});


输出:

Next: 1
Next: 2
Next: 3
Next: 4
Sequence complete.


take(int)
默认不任何特定的调度器上执行。

Javadoc:take(int)

take
的这个变体接受一个时长而不是数量参数。它会丢发射
Observable
开始的那段时间发射的数据,时长和时间单位通过参数指定。



take
的这个变体默认在
computation
调度器上执行,但是你可以使用第三个参数指定其它的调度器。

Javadoc:take(long,TimeUnit)

Javadoc:take(long,TimeUnit,Scheduler)

4.
TakeLast
—> 只发射最后N个元素

takeLast
操作符是把源
Observable
产生的结果的后
n
项提交给订阅者,提交时机是
Observable
发布
onCompleted
通知之时。



示例代码:

Observable.just(1,2,3,4,5,6,7).takeLast(2)
.subscribe(new Subscriber<Integer>() {
@Override
public void onNext(Integer item) {
System.out.println("Next: " + item);
}

@Override
public void onError(Throwable error) {
System.err.println("Error: " + error.getMessage());
}

@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
});


输出:

Next: 6
Next: 7
Sequence complete.


Javadoc:takeLast(int)

5.
TakeLastBuffer
—> 将最后的N项数据当做单个数据发射

它和
takeLast
类似,,唯一的不同是它把所有的数据项收集到一个
List
再发射,而不是依次发射一个。



示例代码:

Observable.just(1,2,3,4).takeLastBuffer(2).subscribe(new Action1<List<Integer>>() {
@Override
public void call(List<Integer> integers) {

String s = "";
for(Integer str : integers){
s = s +str +",";
}

System.out.println(s);
}
});


输出:

I/System.out: 3,4,


Javadoc:takeLastBuffer(int)

Javadoc:takeLastBuffer(long,TimeUnit)

Javadoc:takeLastBuffer(long,TimeUnit,Scheduler)

Javadoc:takeLastBuffer(int,long,TimeUnit)

Javadoc:takeLastBuffer(int,long,TimeUnit,Scheduler)

6.
Skip
—> 跳过开始的N项数据

抑制
Observable
发射的前
N
项数据



使用
Skip
操作符,你可以忽略
Observable
发射的前
N
项数据,只保留之后的数据。



skip
的这个变体默认不在任何特定的调度器上执行。

示例代码

Observable.just(1,2,3,4).skip(1).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println(integer+"");
}
});


输出

2
3
4


Javadoc: skip(int)



skip
的这个变体接受一个时长而不是数量参数。它会丢弃原始
Observable
开始的那段时间发射的数据,时长和时间单位通过参数指定。

skip
的这个变体默认在
computation
调度器上执行,但是你可以使用第三个参数指定其它的调度器。

Javadoc: skip(long,TimeUnit)

Javadoc: skip(long,TimeUnit,Scheduler)

7.
SkipLast
—> 跳过后面的N项数据

抑制
Observable
发射的后
N
项数据



使用
SkipLast
操作符修改原始
Observable
,你可以忽略
Observable
发射的后
N
项数据,只保留前面的数据。



使用
SkipLast
操作符,你可以忽略原始
Observable
发射的后
N
项数据,只保留之前的数据。注意:这个机制是这样实现的:延迟原始
Observable
发射的任何数据项,直到它发射了
N
项数据。

示例代码:

Observable.just(1,2,3,4).skipLast(1).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println(integer+"");
}
});


输出

1
2
3


skipLast
的这个变体默认不在任何特定的调度器上执行。

Javadoc:skipLast(int)



还有一个
skipLast
变体接受一个时长而不是数量参数。它会丢弃在原始
Observable
的生命周期内最后一段时间内发射的数据。时长和时间单位通过参数指定。

注意:这个机制是这样实现的:延迟原始
Observable
发射的任何数据项,直到自这次发射之后过了给定的时长。

skipLast
的这个变体默认在
computation
调度器上执行,但是你可以使用第三个参数指定其它的调度器。

Javadoc:skipLast(long,TimeUnit)

Javadoc:skipLast(long,TimeUnit,Scheduler)

8.
Distinct
—> 过滤掉重复数据

Distinct
的过滤规则是:只允许还没有发射过的数据项通过。



在某些实现中,有一些变体允许你调整判定两个数据不同(
distinct
)的标准。还有一些实现只比较一项数据和它的直接前驱,因此只会从序列中过滤掉连续重复的数据。

distinct()



示例代码:

Observable.just(1, 2, 1, 1, 2, 3)
.distinct()
.subscribe(new Subscriber<Integer>() {
@Override
public void onNext(Integer item) {
System.out.println("Next: " + item);
}

@Override
public void onError(Throwable error) {
System.err.println("Error: " + error.getMessage());
}

@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
});


输出

Next: 1
Next: 2
Next: 3
Sequence complete.


Javadoc:distinct()

distinct(Func1)



这个操作符有一个变体接受一个函数。这个函数根据原始
Observable
发射的数据项产生一个
Key
,然后,比较这些
Key
而不是数据本身,来判定两个数据是否是不同的。

示例代码:

Observable.just(1,2,3,4,5,6).distinct(new Func1<Integer, Integer>() {
@Override
public Integer call(Integer integer) {
return integer%3;
}
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println(integer+"");
}
});


输出: 1% 3= 1 , 2%3 =2 ,3%3 = 0 , 4%3 = 1 , 5%3=2 ,6%3 = 0 ,后面三个和前面三个的值重复去掉

I/System.out: 1
I/System.out: 2
I/System.out: 3


Javadoc:distinct(Func1)

9.
DistinctUntilChanged
—> 过滤掉连续重复的数据

DistinctUntilChanged()



示例代码:

Observable.just(1,2,2,2,5,6).distinctUntilChanged().subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println(integer+"");
}
});


输出:

1
2
5
6


DistinctUntilChanged(Func1)

distinct(Func1)
一样,根据一个函数产生的
Key
判定两个相邻的数据项是不是不同的。



示例代码

Observable.just(1,2,2,2,5,11).distinctUntilChanged(new Func1<Integer, Integer>() {

@Override
public Integer call(Integer integer) {
return integer %2;
}
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println(integer+"");
}
});


输出

1
2
5


distinct
distinctUntilChanged
默认不在任何特定的调度器上执行。

Javadoc:distinctUntilChanged(Func1)

*

10.
ElementAt
—> 发射第N项数据



ElementAt
操作符获取原始
Observable
发射的数据序列指定索引位置的数据项,然后当做自己的唯一数据发射。



RxJava
将这个操作符实现为
elementAt
,给它传递一个基于
0
的索引值,它会发射原始
Observable
数据序列对应索引位置的值,如果你传递给
elementAt
的值为
5
,那么它会发射第
项的数据。

如果你传递的是一个
负数
,或者原始
Observable
的数据项数小于
index+1
,将会抛出一个
IndexOutOfBoundsException
异常。

示例代码:

Observable.just(1,2,3,4,5,6).elementAt(2)
.subscribe(
new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println("Next:" + integer);
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
System.out.println("Error:" + throwable.getMessage());
}
}, new Action0() {
@Override
public void call() {
System.out.println("completed!");
}
});


输出:

Next:3
completed!


Javadoc: elementAt(int)

11.
ElementAtOrDefault
—> 发射第N项数据,如果索引值大于数据项数,它会发射一个默认值(通过额外的参数指定)

RxJava
还实现了
elementAtOrDefault
操作符。与
elementAt
的区别是,如果索引值大于数据项数,它会发射一个默认值(通过额外的参数指定),而不是抛出异常。但是如果你传递一个负数索引值,它仍然会抛出一个
IndexOutOfBoundsException
异常。



示例代码:

Observable.just(1,2,3,4,5,6).elementAtOrDefault(13,999).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println(integer+"");
}
});


输出:

999


elementA
t和
elementAtOrDefault
默认不在任何特定的调度器上执行。

Javadoc:Javadoc: elementAtOrDefault(int,T)

12.
First
—> 只发射第一项数据

如果你只对
Observable
发射的第一项数据,或者满足某个条件的第一项数据感兴趣,你可以使用
First
操作符。

在某些实现中,
First
没有实现为一个返回
Observable
的过滤操作符,而是实现为一个在当时就发射原始
Observable
指定数据项的阻塞函数。在这些实现中,如果你想要的是一个过滤操作符,最好使用
Take(1)
或者
ElementAt(0)


在一些实现中还有一个
Single
操作符。它的行为与
First
类似,但为了确保只发射单个值,它会等待原始
Observable
终止(否则,不是发射那个值,而是以一个错误通知终止)。你可以使用它从原始
Observable
获取第一项数据,而且也确保只发射一项数据。

RxJava
中,这个操作符被实现为first,
firstOrDefault
takeFirst


可能容易混淆,
BlockingObservable
也有名叫
first
firstOrDefault
的操作符,它们会阻塞并返回值,不是立即返回一个
Observable


还有几个其它的操作符执行类似的功能。

First()



示例代码:

Observable.just(1, 2, 3)
.first()
.subscribe(new Subscriber<Integer>() {
@Override
public void onNext(Integer item) {
System.out.println("Next: " + item);
}

@Override
public void onError(Throwable error) {
System.err.println("Error: " + error.getMessage());
}

@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
});


输出:

Next: 1
Sequence complete.


Java:first()

First(Func1)

传递一个谓词函数给first,然后发射这个函数判定为true的第一项数据。



示例代码:

Observable.just(1,2,3,4,5,6).first(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
return integer>3;
}
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println(integer+"");
}
});


输出

4


Javadoc:first(Func1)

FirstOrDefault(T)

firstOrDefault
first
类似,但是在
Observable
没有发射任何数据时发射一个你在参数中指定的默认值。



示例代码:

Observable.empty().firstOrDefault("fuck you").subscribe(new Action1<Object>() {
@Override
public void call(Object o) {
System.out.println(o+"");
}
});


输出

fuck you


Javadoc:firstOrDefault(T)

FirstOrDefault(T, Func1)

传递一个谓词函数给firstOrDefault,然后发射这个函数判定为true的第一项数据,如果没有数据通过了谓词测试就发射一个默认值。



示例代码:

Observable.just(1,2,3,4,5,6).firstOrDefault(99, new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
return integer == 4;
}
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println(integer+"");
}
});


输出:

4


Javadoc:firstOrDefault(T, Func1)

13.
TakeFirst
—> 返回一个可观察到的发射仅由源观测中满足指定条件发射的第一个项目。

takeFirst
操作符类似于
take
操作符,同时也类似于
first
操作符,都是获取源
Observable
产生的结果列表中符合指定条件的前一个或多个,与
first
操作符不同的是,
first
操作符如果获取不到数据,则会抛出
NoSuchElementException
异常,而
takeFirst
则会返回一个空的
Observable
,该
Observable
只有
onCompleted
通知而没有
onNext
通知。



示例代码:

Observable.just(1,2,3,4,5,6,7).takeFirst(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
//获取数值大于3的数据
return integer>3;
}
})
.subscribe(new Subscriber<Integer>() {
@Override
public void onNext(Integer item) {
System.out.println("Next: " + item);
}

@Override
public void onError(Throwable error) {
System.err.println("Error: " + error.getMessage());
}

@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
});


输出

Next: 4
Sequence complete.


Javadoc:takeFirst(Func1)

14.
Single
—> single操作符也与first类似

Single()

single
操作符也与first类似,但是如果原始
Observable
在完成之前不是正好发射一次数据,它会抛出一个
NoSuchElementException




示例代码:

Observable.just(1,2).single().subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println("===============>"+integer+"");
}
});


输出

rx.exceptions.OnErrorNotImplementedException: Sequence contains too many elements


Javadoc:single()

Single(Func1)

single
的变体接受一个谓词函数,发射满足条件的单个值,如果不是正好只有一个数据项满足条件,会以错误通知终止。



示例代码:

Observable.just(1,2,3,4,5,6).single(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
return integer>5; // 输出值为6
return integer>3; // 报错 Sequence contains too many elements
return integer>6; // 报错 Sequence contains no elements
}
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println(integer+"");
}
});


输出:

return integer>5; // 输出值为6
return integer>3; // 报错 Sequence contains too many elements
return integer>6; // 报错 Sequence contains no elements


Javadoc:single(Func1)

singleOrDefault(T)

firstOrDefault
类似,但是如果原始Observable发射超过一个的数据,会以错误通知终止。



示例代码:

Observable.just("1","233").singleOrDefault("fuck you two").subscribe(new Action1<Object>() {
@Override
public void call(Object o) {
Log.i("sss",o+"");
}
});


输出:

rx.exceptions.OnErrorNotImplementedException: Sequence contains too many elements


Javadoc:singleOrDefault(T)

singleOrDefault(T,Func1))

firstOrDefault(T, Func1)
类似,如果没有数据满足条件,返回默认值;如果有多个数据满足条件,以错误通知终止。



示例代码

Observable.just(1,2,3,4,5,6,7,8).singleOrDefault(666, new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer s) {
return s>4;       //rx.exceptions.OnErrorNotImplementedException: Sequence contains too many elements
return s>12;      // 666
return s>7;       // 8
}
}).subscribe(new Action1<Object>() {
@Override
public void call(Object o) {
Log.i("sss",o+"");
}
});


Javadoc:singleOrDefault(Func1,T)

15.
Last
—> 只发射最后一项(或者满足某个条件的最后一项)数据



如果你只对
Observable
发射的最后一项数据,或者满足某个条件的最后一项数据感兴趣,你可以使用
Last
操作符。

在某些实现中,
Last
没有实现为一个返回
Observable
的过滤操作符,而是实现为一个在当时就发射原始
Observable
指定数据项的阻塞函数。在这些实现中,如果你想要的是一个过滤操作符,最好使用TakeLast(1)。

RxJava
中的实现是
last
lastOrDefault


可能容易混淆,
BlockingObservable
也有名叫
last
lastOrDefault
的操作符,它们会阻塞并返回值,不是立即返回一个
Observable


last()



只发射最后一项数据,使用没有参数的last操作符。

示例代码:

Observable.just(1, 2, 3)
.last()
.subscribe(new Subscriber<Integer>() {
@Override
public void onNext(Integer item) {
System.out.println("Next: " + item);
}

@Override
public void onError(Throwable error) {
System.err.println("Error: " + error.getMessage());
}

@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
});


输出:

Next: 3
Sequence complete.


Javadoc:just()

last(Func1)



这个版本的
last
也是接受一个谓词函数,返回一个发射原始
Observable
中满足条件的最后一项数据的
Observable


示例代码:

Observable.just(1,2,3,4,5,6,7,8).last(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
return integer < 6;
}
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer o) {
Log.i("sss",o+"");
tv.setText(""+o);
}
});


输出:

5


Javadoc:last(Func1)

lastOrDefault(T)



lastOrDefault
last
类似,不同的是,如果原始
Observable
没有发射任何值,它发射你指定的默认值。

示例代码:

Observable.empty().lastOrDefault(99).subscribe(new Action1<Object>() {
@Override
public void call(Object o) {
Log.i("sss",o+"");
}
});


输出:

99


Javadoc:lastOrDefault(T)

lastOrDefault(T,Fun1)



示例代码:

Observable.just(1,2,3,4,5,6,7,8,9).lastOrDefault(99, new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
return integer > 10;
}
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer o) {
Log.i("sss",o+"");
}
});


输出:

999


Javadoc:lastOrDefault(T,Func1)

16.
Sample
—> 定期发射Observable最近发射的数据项

Sample
操作符定时查看一个
Observable
,然后发射自上次采样以来它最近发射的数据。

在某些实现中,有一个
ThrottleFirst
操作符的功能类似,但不是发射采样期间的最近的数据,而是发射在那段时间内的第一项数据。

RxJava
将这个操作符实现为
sample
throttleLast


注意:如果自上次采样以来,原始
Observable
没有发射任何数据,这个操作返回的
Observable
在那段时间内也不会发射任何数据。



sample
的这个变体每当第二个
Observable
发射一个数据(或者当它终止)时就对原始
Observable
进行采样。第二个
Observable
通过参数传递给
sample


sample
的这个变体默认不在任何特定的调度器上执行。

示例代码:

Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
if(subscriber.isUnsubscribed()) return;
try {
//前8个数字产生的时间间隔为1秒,后一个间隔为3秒
for (int i = 1; i < 9; i++) {
subscriber.onNext(i);
Thread.sleep(1000);
}
Thread.sleep(2000);
subscriber.onNext(9);
subscriber.onCompleted();
} catch(Exception e){
subscriber.onError(e);
}
}
}).subscribeOn(Schedulers.newThread())
.sample(2200, TimeUnit.MILLISECONDS)  //采样间隔时间为2200毫秒
.subscribe(new Subscriber<Integer>() {
@Override
public void onNext(Integer item) {
System.out.println("Next: " + item);
}

@Override
public void onError(Throwable error) {
System.err.println("Error: " + error.getMessage());
}

@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
});


输出:

Next: 3
Next: 5
Next: 7
Next: 8
Next: 9
Sequence complete.


sample
(别名
throttleLast
)的一个变体按照你参数中指定的时间间隔定时采样(
TimeUnit
指定时间单位)。

sample
的这个变体默认在
computation
调度器上执行,但是你可以使用第三个参数指定其它的调度器。

Javadoc:sample(long,TimeUnit)throttleLast(long,TimeUnit)

Javadoc:sample(long,TimeUnit,Scheduler)throttleLast(long,TimeUnit,Scheduler)

17.
ThrottleFirst
—> throttleFirst与throttleLast/sample不同,在每个采样周期内,它总是发射原始Observable的第一项数据,而不是最近的一项。



throttleFirst
操作符默认在
computation
调度器上执行,但是你可以使用第三个参数指定其它的调度器。

示例代码:

Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
if(subscriber.isUnsubscribed()) return;
try {
//前8个数字产生的时间间隔为1秒,后一个间隔为3秒
for (int i = 1; i < 9; i++) {
subscriber.onNext(i);
Thread.sleep(1000);
}
Thread.sleep(2000);
subscriber.onNext(9);
subscriber.onCompleted();
} catch(Exception e){
subscriber.onError(e);
}
}
}).subscribeOn(Schedulers.newThread())
.throttleFirst(2200, TimeUnit.MILLISECONDS)  //采样间隔时间为2200毫秒
.subscribe(new Subscriber<Integer>() {
@Override
public void onNext(Integer item) {
System.out.println("Next: " + item);
}

@Override
public void onError(Throwable error) {
System.err.println("Error: " + error.getMessage());
}

@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
});


输出:

Next: 1
Next: 4
Next: 7
Next: 9
Sequence complete.


Javadoc:throttleFirst(long,TimeUnit)

Javadoc:throttleFirst(long,TimeUnit,Scheduler)

18.
ThrottleWithTimeout( ) or Debounce( )
—> 只有当Observable在指定的时间后还没有发射数据时,才发射一个数据

Debounce
操作符会过滤掉发射速率过快的数据项。

RxJava
将这个操作符实现为
throttleWithTimeout
debounce


注意:这个操作符会接着最后一项数据发射原始
Observable
onCompleted
通知,即使这个通知发生在你指定的时间窗口内(从最后一项数据的发射算起)。也就是说,
onCompleted
通知不会触发限流。



throtleWithTimeout/debounce
的一个变体根据你指定的时间间隔进行限流,时间单位通过
TimeUnit
参数指定。

这种操作符默认在
computation
调度器上执行,但是你可以通过第三个参数指定。

示例代码:

Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
if(subscriber.isUnsubscribed()) return;
try {
for (int i = 0; i < 10; i++) {
subscriber.onNext(i);
Thread.sleep(i * 100);
}
subscriber.onCompleted();
}catch(Exception e){
subscriber.onError(e);
}
}
}).subscribeOn(Schedulers.newThread())
.debounce(400, TimeUnit.MILLISECONDS)
.subscribe(
new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println("Next:" + integer);
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
System.out.println("Error:" + throwable.getMessage());
}
}, new Action0() {
@Override
public void call() {
System.out.println("completed!");
}
});


输出:

Next:4
Next:5
Next:6
Next:7
Next:8
Next:9
completed!


Javadoc:debounce(long,TimeUnit) and throttleWithTimeout(long,TimeUnit)

Javadoc:debounce(long,TimeUnit,Scheduler) and throttleWithTimeout(long,TimeUnit,Scheduler)



debounce
操作符的一个变体通过对原始
Observable
的每一项应用一个函数进行限流,这个函数返回一个
Observable
。如果原始
Observable
在这个新生成的
Observable
终止之前发射了另一个数据,
debounce
会抑制(
suppress
)这个数据项。

debounce
的这个变体默认不在任何特定的调度器上执行。

Javadoc:debounce(Func1)

19.
Timeout
—> 如果在一个指定的时间段后还没发射数据,就发射一个异常



timeout(long,TimeUnit)

如果原始
Observable
过了指定的一段时长没有发射任何数据,
Timeout
操作符会以一个
onError
通知终止这个
Observable


第一个变体接受一个时长参数,每当原始
Observable
发射了一项数据,
timeout
就启动一个计时器,如果计时器超过了指定指定的时长而原始
Observable
没有发射另一项数据,
timeout
就抛出
TimeoutException
,以一个错误通知终止
Observable


这个
timeout
默认在
computation
调度器上执行,你可以通过参数指定其它的调度器。

示例代码:

Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
if(subscriber.isUnsubscribed()) return;
try {
for (int i = 0; i < 10; i++) {
subscriber.onNext(i);
Thread.sleep(i*100);
}
subscriber.onCompleted();
}catch(Exception e){
subscriber.onError(e);
}
}
}).subscribeOn(Schedulers.newThread())
.timeout(300,TimeUnit.MILLISECONDS)
.subscribe(
new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println("Next:" + integer);
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
System.out.println("Error:" + throwable.getMessage());
}
}, new Action0() {
@Override
public void call() {
System.out.println("completed!");
}
});


输出:

Next:0
Next:1
Next:2
Next:3
Next:4
Error:null


Javadoc:timeout(long,TimeUnit)

Javadoc:timeout(long,TimeUnit,Scheduler)

timeout(long,TimeUnit,Observable)

这个版本的
timeout
在超时时会切换到使用一个你指定的备用的
Observable
,而不是发错误通知。它也默认在
computation
调度器上执行。



示例代码:

Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
if(subscriber.isUnsubscribed()) return;
try {
for (int i = 0; i < 10; i++) {
subscriber.onNext(i);
Thread.sleep(i*100);
}
subscriber.onCompleted();
}catch(Exception e){
subscriber.onError(e);
}
}
}).subscribeOn(Schedulers.newThread())
.timeout(300,TimeUnit.MILLISECONDS,Observable.just(555))
.subscribe(
new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println("Next:" + integer);
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
System.out.println("Error:" + throwable.getMessage());
}
}, new Action0() {
@Override
public void call() {
System.out.println("completed!");
}
});


输出:

Next:0
Next:1
Next:2
Next:3
Next:555
completed!


Javadoc:timeout(long,TimeUnit,Observable)

Javadoc:timeout(long,TimeUnit,Observable,Scheduler)

timeout(Func1)



这个版本的
timeout
使用一个函数针对原始
Observable
的每一项返回一个
Observable
,如果当这个
Observable
终止时原始
Observable
还没有发射另一项数据,就会认为是超时了,
timeout
就抛出
TimeoutException
,以一个错误通知终止
Observable


这个
timeout
默认在
immediate
调度器上执行。

Javadoc:timeout(Func1)



这个版本的
timeout
同时指定超时时长和备用的
Observable
。它默认在
immediate
调度器上执行。

Javadoc:timeout(Func1,Observable)



这个版本的
time
除了给每一项设置超时,还可以单独给第一项设置一个超时。它默认在
immediate
调度器上执行。

Javadoc:timeout(Func0,Func1)



同上,但是同时可以指定一个备用的
Observable
。它默认在
immediate
调度器上执行。

Javadoc:timeout(Func0,Func1,Observable)

20.
IgnoreElements
—> 丢弃所有的正常数据,只发射错误或完成通知

不发射任何数据,只发射
Observable
的终止通知



IgnoreElements
操作符抑制原始
Observable
发射的所有数据,只允许它的终止通知(
onError
onCompleted
)通过。

如果你不关心一个
Observable
发射的数据,但是希望在它完成时或遇到错误终止时收到通知,你可以对
Observable
使用
ignoreElements
操作符,它会确保永远不会调用观察者的
onNext()
方法。

RxJava
将这个操作符实现为
ignoreElements


示例代码:

Observable.just(1,2,3,4,5,6,7,8).ignoreElements()
.subscribe(new Subscriber<Integer>() {
@Override
public void onNext(Integer item) {
System.out.println("Next: " + item);
}

@Override
public void onError(Throwable error) {
System.err.println("Error: " + error.getMessage());
}

@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
});


输出:

Sequence complete.


Javadoc:ignoreElements()
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: