RxJava 学习笔记(七) --- Filtering 过滤操作
2016-07-12 15:51
399 查看
Filter 只发射通过了谓词测试的数据项
OfType ofType是filter操作符的一个特殊形式它过滤一个Observable只返回指定类型的数据
Take 只发射开始的N项数据
TakeLast 只发射最后N个元素
TakeLastBuffer 将最后的N项数据当做单个数据发射
Skip 跳过开始的N项数据
SkipLast 跳过后面的N项数据
Distinct 过滤掉重复数据
DistinctUntilChanged 过滤掉连续重复的数据
ElementAt 发射第N项数据
ElementAtOrDefault 发射第N项数据如果索引值大于数据项数它会发射一个默认值通过额外的参数指定
First 只发射第一项数据
TakeFirst 返回一个可观察到的发射仅由源观测中满足指定条件发射的第一个项目
Single single操作符也与first类似
Last 只发射最后一项或者满足某个条件的最后一项数据
Sample 定期发射Observable最近发射的数据项
ThrottleFirst throttleFirst与throttleLastsample不同在每个采样周期内它总是发射原始Observable的第一项数据而不是最近的一项
ThrottleWithTimeout or Debounce 只有当Observable在指定的时间后还没有发射数据时才发射一个数据
Timeout 如果在一个指定的时间段后还没发射数据就发射一个异常
IgnoreElements 丢弃所有的正常数据只发射错误或完成通知
1.
示例代码
输出:
Javadoc:filter(Func1)
2.
示例代码:
输出:
Javadoc:ofType(Class)
3.
使用
如果你对一个
示例代码:
输出:
Javadoc:take(int)
Javadoc:take(long,TimeUnit)
Javadoc:take(long,TimeUnit,Scheduler)
4.
示例代码:
输出:
Javadoc:takeLast(int)
5.
它和
示例代码:
输出:
Javadoc:takeLastBuffer(int)
Javadoc:takeLastBuffer(long,TimeUnit)
Javadoc:takeLastBuffer(long,TimeUnit,Scheduler)
Javadoc:takeLastBuffer(int,long,TimeUnit)
Javadoc:takeLastBuffer(int,long,TimeUnit,Scheduler)
6.
抑制
使用
示例代码
输出
Javadoc: skip(int)
Javadoc: skip(long,TimeUnit)
Javadoc: skip(long,TimeUnit,Scheduler)
7.
抑制
使用
使用
示例代码:
输出
Javadoc:skipLast(int)
还有一个
注意:这个机制是这样实现的:延迟原始
Javadoc:skipLast(long,TimeUnit)
Javadoc:skipLast(long,TimeUnit,Scheduler)
8.
在某些实现中,有一些变体允许你调整判定两个数据不同(
distinct()
示例代码:
输出
Javadoc:distinct()
distinct(Func1)
这个操作符有一个变体接受一个函数。这个函数根据原始
示例代码:
输出: 1% 3= 1 , 2%3 =2 ,3%3 = 0 , 4%3 = 1 , 5%3=2 ,6%3 = 0 ,后面三个和前面三个的值重复去掉
Javadoc:distinct(Func1)
9.
DistinctUntilChanged()
示例代码:
输出:
DistinctUntilChanged(Func1)
和
示例代码
输出
Javadoc:distinctUntilChanged(Func1)
*
10.
如果你传递的是一个
示例代码:
输出:
Javadoc: elementAt(int)
11.
示例代码:
输出:
Javadoc:Javadoc: elementAtOrDefault(int,T)
12.
如果你只对
在某些实现中,
在一些实现中还有一个
在
可能容易混淆,
还有几个其它的操作符执行类似的功能。
First()
示例代码:
输出:
Java:first()
First(Func1)
传递一个谓词函数给first,然后发射这个函数判定为true的第一项数据。
示例代码:
输出
Javadoc:first(Func1)
FirstOrDefault(T)
示例代码:
输出
Javadoc:firstOrDefault(T)
FirstOrDefault(T, Func1)
传递一个谓词函数给firstOrDefault,然后发射这个函数判定为true的第一项数据,如果没有数据通过了谓词测试就发射一个默认值。
示例代码:
输出:
Javadoc:firstOrDefault(T, Func1)
13.
示例代码:
输出
Javadoc:takeFirst(Func1)
14.
Single()
示例代码:
输出
Javadoc:single()
Single(Func1)
示例代码:
输出:
Javadoc:single(Func1)
singleOrDefault(T)
和
示例代码:
输出:
Javadoc:singleOrDefault(T)
singleOrDefault(T,Func1))
和
示例代码
Javadoc:singleOrDefault(Func1,T)
15.
如果你只对
在某些实现中,
在
可能容易混淆,
last()
只发射最后一项数据,使用没有参数的last操作符。
示例代码:
输出:
Javadoc:just()
last(Func1)
这个版本的
示例代码:
输出:
Javadoc:last(Func1)
lastOrDefault(T)
示例代码:
输出:
Javadoc:lastOrDefault(T)
lastOrDefault(T,Fun1)
示例代码:
输出:
Javadoc:lastOrDefault(T,Func1)
16.
在某些实现中,有一个
注意:如果自上次采样以来,原始
示例代码:
输出:
Javadoc:sample(long,TimeUnit)或throttleLast(long,TimeUnit)
Javadoc:sample(long,TimeUnit,Scheduler) 或throttleLast(long,TimeUnit,Scheduler)
17.
示例代码:
输出:
Javadoc:throttleFirst(long,TimeUnit)
Javadoc:throttleFirst(long,TimeUnit,Scheduler)
18.
注意:这个操作符会接着最后一项数据发射原始
这种操作符默认在
示例代码:
输出:
Javadoc:debounce(long,TimeUnit) and throttleWithTimeout(long,TimeUnit)
Javadoc:debounce(long,TimeUnit,Scheduler) and throttleWithTimeout(long,TimeUnit,Scheduler)
Javadoc:debounce(Func1)
19.
timeout(long,TimeUnit)
如果原始
第一个变体接受一个时长参数,每当原始
这个
示例代码:
输出:
Javadoc:timeout(long,TimeUnit)
Javadoc:timeout(long,TimeUnit,Scheduler)
timeout(long,TimeUnit,Observable)
这个版本的
示例代码:
输出:
Javadoc:timeout(long,TimeUnit,Observable)
Javadoc:timeout(long,TimeUnit,Observable,Scheduler)
timeout(Func1)
这个版本的
这个
Javadoc:timeout(Func1)
这个版本的
Javadoc:timeout(Func1,Observable)
这个版本的
Javadoc:timeout(Func0,Func1)
同上,但是同时可以指定一个备用的
Javadoc:timeout(Func0,Func1,Observable)
20.
不发射任何数据,只发射
如果你不关心一个
示例代码:
输出:
Javadoc:ignoreElements()
OfType ofType是filter操作符的一个特殊形式它过滤一个Observable只返回指定类型的数据
Take 只发射开始的N项数据
TakeLast 只发射最后N个元素
TakeLastBuffer 将最后的N项数据当做单个数据发射
Skip 跳过开始的N项数据
SkipLast 跳过后面的N项数据
Distinct 过滤掉重复数据
DistinctUntilChanged 过滤掉连续重复的数据
ElementAt 发射第N项数据
ElementAtOrDefault 发射第N项数据如果索引值大于数据项数它会发射一个默认值通过额外的参数指定
First 只发射第一项数据
TakeFirst 返回一个可观察到的发射仅由源观测中满足指定条件发射的第一个项目
Single single操作符也与first类似
Last 只发射最后一项或者满足某个条件的最后一项数据
Sample 定期发射Observable最近发射的数据项
ThrottleFirst throttleFirst与throttleLastsample不同在每个采样周期内它总是发射原始Observable的第一项数据而不是最近的一项
ThrottleWithTimeout or Debounce 只有当Observable在指定的时间后还没有发射数据时才发射一个数据
Timeout 如果在一个指定的时间段后还没发射数据就发射一个异常
IgnoreElements 丢弃所有的正常数据只发射错误或完成通知
1. Filter
—> 只发射通过了谓词测试的数据项
Filter操作符使用你指定的一个谓词函数测试数据项,只有通过测试的数据才会被发射。
示例代码
Observable.just(1, 2, 3, 4, 5) .filter(new Func1<Integer, Boolean>() { @Override public Boolean call(Integer item) { return( item < 4 ); } }).subscribe(new Subscriber<Integer>() { @Override public void onNext(Integer item) { System.out.println("Next: " + item); } @Override public void onError(Throwable error) { System.err.println("Error: " + error.getMessage()); } @Override public void onCompleted() { System.out.println("Sequence complete."); } });
输出:
Next: 1 Next: 2 Next: 3 Sequence complete.
filter默认不在任何特定的调度器上执行。
Javadoc:filter(Func1)
2. OfType
—> ofType是filter操作符的一个特殊形式。它过滤一个Observable只返回指定类型的数据。
ofType默认不在任何特定的调度器上指定。
示例代码:
Observable.just(1,"sb",0.1f).ofType(String.class).subscribe(new Action1<String>() { @Override public void call(String s) { System.out.println(s); } });
输出:
sb
Javadoc:ofType(Class)
3. Take
—> 只发射开始的N项数据
使用
Take操作符让你可以修改
Observable的行为,只返回前面的
N项数据,然后发射完成通知,忽略剩余的数据。
如果你对一个
Observable使用
take(n)(或它的同义词
limit(n))操作符,而那个
Observable发射的数据少于N项,那么take操作生成的
Observable不会抛异常或发射onError通知,在完成前它只会发射相同的少量数据。
示例代码:
Observable.just(1, 2, 3, 4, 5, 6, 7, 8) .take(4) .subscribe(new Subscriber<Integer>() { @Override public void onNext(Integer item) { System.out.println("Next: " + item); } @Override public void onError(Throwable error) { System.err.println("Error: " + error.getMessage()); } @Override public void onCompleted() { System.out.println("Sequence complete."); } });
输出:
Next: 1 Next: 2 Next: 3 Next: 4 Sequence complete.
take(int)默认不任何特定的调度器上执行。
Javadoc:take(int)
take的这个变体接受一个时长而不是数量参数。它会丢发射
Observable开始的那段时间发射的数据,时长和时间单位通过参数指定。
take的这个变体默认在
computation调度器上执行,但是你可以使用第三个参数指定其它的调度器。
Javadoc:take(long,TimeUnit)
Javadoc:take(long,TimeUnit,Scheduler)
4. TakeLast
—> 只发射最后N个元素
takeLast操作符是把源
Observable产生的结果的后
n项提交给订阅者,提交时机是
Observable发布
onCompleted通知之时。
示例代码:
Observable.just(1,2,3,4,5,6,7).takeLast(2) .subscribe(new Subscriber<Integer>() { @Override public void onNext(Integer item) { System.out.println("Next: " + item); } @Override public void onError(Throwable error) { System.err.println("Error: " + error.getMessage()); } @Override public void onCompleted() { System.out.println("Sequence complete."); } });
输出:
Next: 6 Next: 7 Sequence complete.
Javadoc:takeLast(int)
5. TakeLastBuffer
—> 将最后的N项数据当做单个数据发射
它和takeLast类似,,唯一的不同是它把所有的数据项收集到一个
List再发射,而不是依次发射一个。
示例代码:
Observable.just(1,2,3,4).takeLastBuffer(2).subscribe(new Action1<List<Integer>>() { @Override public void call(List<Integer> integers) { String s = ""; for(Integer str : integers){ s = s +str +","; } System.out.println(s); } });
输出:
I/System.out: 3,4,
Javadoc:takeLastBuffer(int)
Javadoc:takeLastBuffer(long,TimeUnit)
Javadoc:takeLastBuffer(long,TimeUnit,Scheduler)
Javadoc:takeLastBuffer(int,long,TimeUnit)
Javadoc:takeLastBuffer(int,long,TimeUnit,Scheduler)
6. Skip
—> 跳过开始的N项数据
抑制Observable发射的前
N项数据
使用
Skip操作符,你可以忽略
Observable发射的前
N项数据,只保留之后的数据。
skip的这个变体默认不在任何特定的调度器上执行。
示例代码
Observable.just(1,2,3,4).skip(1).subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { System.out.println(integer+""); } });
输出
2 3 4
Javadoc: skip(int)
skip的这个变体接受一个时长而不是数量参数。它会丢弃原始
Observable开始的那段时间发射的数据,时长和时间单位通过参数指定。
skip的这个变体默认在
computation调度器上执行,但是你可以使用第三个参数指定其它的调度器。
Javadoc: skip(long,TimeUnit)
Javadoc: skip(long,TimeUnit,Scheduler)
7. SkipLast
—> 跳过后面的N项数据
抑制Observable发射的后
N项数据
使用
SkipLast操作符修改原始
Observable,你可以忽略
Observable发射的后
N项数据,只保留前面的数据。
使用
SkipLast操作符,你可以忽略原始
Observable发射的后
N项数据,只保留之前的数据。注意:这个机制是这样实现的:延迟原始
Observable发射的任何数据项,直到它发射了
N项数据。
示例代码:
Observable.just(1,2,3,4).skipLast(1).subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { System.out.println(integer+""); } });
输出
1 2 3
skipLast的这个变体默认不在任何特定的调度器上执行。
Javadoc:skipLast(int)
还有一个
skipLast变体接受一个时长而不是数量参数。它会丢弃在原始
Observable的生命周期内最后一段时间内发射的数据。时长和时间单位通过参数指定。
注意:这个机制是这样实现的:延迟原始
Observable发射的任何数据项,直到自这次发射之后过了给定的时长。
skipLast的这个变体默认在
computation调度器上执行,但是你可以使用第三个参数指定其它的调度器。
Javadoc:skipLast(long,TimeUnit)
Javadoc:skipLast(long,TimeUnit,Scheduler)
8. Distinct
—> 过滤掉重复数据
Distinct的过滤规则是:只允许还没有发射过的数据项通过。
在某些实现中,有一些变体允许你调整判定两个数据不同(
distinct)的标准。还有一些实现只比较一项数据和它的直接前驱,因此只会从序列中过滤掉连续重复的数据。
distinct()
示例代码:
Observable.just(1, 2, 1, 1, 2, 3) .distinct() .subscribe(new Subscriber<Integer>() { @Override public void onNext(Integer item) { System.out.println("Next: " + item); } @Override public void onError(Throwable error) { System.err.println("Error: " + error.getMessage()); } @Override public void onCompleted() { System.out.println("Sequence complete."); } });
输出
Next: 1 Next: 2 Next: 3 Sequence complete.
Javadoc:distinct()
distinct(Func1)
这个操作符有一个变体接受一个函数。这个函数根据原始
Observable发射的数据项产生一个
Key,然后,比较这些
Key而不是数据本身,来判定两个数据是否是不同的。
示例代码:
Observable.just(1,2,3,4,5,6).distinct(new Func1<Integer, Integer>() { @Override public Integer call(Integer integer) { return integer%3; } }).subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { System.out.println(integer+""); } });
输出: 1% 3= 1 , 2%3 =2 ,3%3 = 0 , 4%3 = 1 , 5%3=2 ,6%3 = 0 ,后面三个和前面三个的值重复去掉
I/System.out: 1 I/System.out: 2 I/System.out: 3
Javadoc:distinct(Func1)
9. DistinctUntilChanged
—> 过滤掉连续重复的数据
DistinctUntilChanged()示例代码:
Observable.just(1,2,2,2,5,6).distinctUntilChanged().subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { System.out.println(integer+""); } });
输出:
1 2 5 6
DistinctUntilChanged(Func1)
和
distinct(Func1)一样,根据一个函数产生的
Key判定两个相邻的数据项是不是不同的。
示例代码
Observable.just(1,2,2,2,5,11).distinctUntilChanged(new Func1<Integer, Integer>() { @Override public Integer call(Integer integer) { return integer %2; } }).subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { System.out.println(integer+""); } });
输出
1 2 5
distinct和
distinctUntilChanged默认不在任何特定的调度器上执行。
Javadoc:distinctUntilChanged(Func1)
*
10. ElementAt
—> 发射第N项数据
ElementAt操作符获取原始
Observable发射的数据序列指定索引位置的数据项,然后当做自己的唯一数据发射。
RxJava将这个操作符实现为
elementAt,给它传递一个基于
0的索引值,它会发射原始
Observable数据序列对应索引位置的值,如果你传递给
elementAt的值为
5,那么它会发射第
六项的数据。
如果你传递的是一个
负数,或者原始
Observable的数据项数小于
index+1,将会抛出一个
IndexOutOfBoundsException异常。
示例代码:
Observable.just(1,2,3,4,5,6).elementAt(2) .subscribe( new Action1<Integer>() { @Override public void call(Integer integer) { System.out.println("Next:" + integer); } }, new Action1<Throwable>() { @Override public void call(Throwable throwable) { System.out.println("Error:" + throwable.getMessage()); } }, new Action0() { @Override public void call() { System.out.println("completed!"); } });
输出:
Next:3 completed!
Javadoc: elementAt(int)
11. ElementAtOrDefault
—> 发射第N项数据,如果索引值大于数据项数,它会发射一个默认值(通过额外的参数指定)
RxJava还实现了
elementAtOrDefault操作符。与
elementAt的区别是,如果索引值大于数据项数,它会发射一个默认值(通过额外的参数指定),而不是抛出异常。但是如果你传递一个负数索引值,它仍然会抛出一个
IndexOutOfBoundsException异常。
示例代码:
Observable.just(1,2,3,4,5,6).elementAtOrDefault(13,999).subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { System.out.println(integer+""); } });
输出:
999
elementAt和
elementAtOrDefault默认不在任何特定的调度器上执行。
Javadoc:Javadoc: elementAtOrDefault(int,T)
12. First
—> 只发射第一项数据
如果你只对Observable发射的第一项数据,或者满足某个条件的第一项数据感兴趣,你可以使用
First操作符。
在某些实现中,
First没有实现为一个返回
Observable的过滤操作符,而是实现为一个在当时就发射原始
Observable指定数据项的阻塞函数。在这些实现中,如果你想要的是一个过滤操作符,最好使用
Take(1)或者
ElementAt(0)。
在一些实现中还有一个
Single操作符。它的行为与
First类似,但为了确保只发射单个值,它会等待原始
Observable终止(否则,不是发射那个值,而是以一个错误通知终止)。你可以使用它从原始
Observable获取第一项数据,而且也确保只发射一项数据。
在
RxJava中,这个操作符被实现为first,
firstOrDefault和
takeFirst。
可能容易混淆,
BlockingObservable也有名叫
first和
firstOrDefault的操作符,它们会阻塞并返回值,不是立即返回一个
Observable。
还有几个其它的操作符执行类似的功能。
First()
示例代码:
Observable.just(1, 2, 3) .first() .subscribe(new Subscriber<Integer>() { @Override public void onNext(Integer item) { System.out.println("Next: " + item); } @Override public void onError(Throwable error) { System.err.println("Error: " + error.getMessage()); } @Override public void onCompleted() { System.out.println("Sequence complete."); } });
输出:
Next: 1 Sequence complete.
Java:first()
First(Func1)
传递一个谓词函数给first,然后发射这个函数判定为true的第一项数据。
示例代码:
Observable.just(1,2,3,4,5,6).first(new Func1<Integer, Boolean>() { @Override public Boolean call(Integer integer) { return integer>3; } }).subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { System.out.println(integer+""); } });
输出
4
Javadoc:first(Func1)
FirstOrDefault(T)
firstOrDefault与
first类似,但是在
Observable没有发射任何数据时发射一个你在参数中指定的默认值。
示例代码:
Observable.empty().firstOrDefault("fuck you").subscribe(new Action1<Object>() { @Override public void call(Object o) { System.out.println(o+""); } });
输出
fuck you
Javadoc:firstOrDefault(T)
FirstOrDefault(T, Func1)
传递一个谓词函数给firstOrDefault,然后发射这个函数判定为true的第一项数据,如果没有数据通过了谓词测试就发射一个默认值。
示例代码:
Observable.just(1,2,3,4,5,6).firstOrDefault(99, new Func1<Integer, Boolean>() { @Override public Boolean call(Integer integer) { return integer == 4; } }).subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { System.out.println(integer+""); } });
输出:
4
Javadoc:firstOrDefault(T, Func1)
13. TakeFirst
—> 返回一个可观察到的发射仅由源观测中满足指定条件发射的第一个项目。
takeFirst操作符类似于
take操作符,同时也类似于
first操作符,都是获取源
Observable产生的结果列表中符合指定条件的前一个或多个,与
first操作符不同的是,
first操作符如果获取不到数据,则会抛出
NoSuchElementException异常,而
takeFirst则会返回一个空的
Observable,该
Observable只有
onCompleted通知而没有
onNext通知。
示例代码:
Observable.just(1,2,3,4,5,6,7).takeFirst(new Func1<Integer, Boolean>() { @Override public Boolean call(Integer integer) { //获取数值大于3的数据 return integer>3; } }) .subscribe(new Subscriber<Integer>() { @Override public void onNext(Integer item) { System.out.println("Next: " + item); } @Override public void onError(Throwable error) { System.err.println("Error: " + error.getMessage()); } @Override public void onCompleted() { System.out.println("Sequence complete."); } });
输出
Next: 4 Sequence complete.
Javadoc:takeFirst(Func1)
14. Single
—> single操作符也与first类似
Single()single操作符也与first类似,但是如果原始
Observable在完成之前不是正好发射一次数据,它会抛出一个
NoSuchElementException。
示例代码:
Observable.just(1,2).single().subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { System.out.println("===============>"+integer+""); } });
输出
rx.exceptions.OnErrorNotImplementedException: Sequence contains too many elements
Javadoc:single()
Single(Func1)
single的变体接受一个谓词函数,发射满足条件的单个值,如果不是正好只有一个数据项满足条件,会以错误通知终止。
示例代码:
Observable.just(1,2,3,4,5,6).single(new Func1<Integer, Boolean>() { @Override public Boolean call(Integer integer) { return integer>5; // 输出值为6 return integer>3; // 报错 Sequence contains too many elements return integer>6; // 报错 Sequence contains no elements } }).subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { System.out.println(integer+""); } });
输出:
return integer>5; // 输出值为6 return integer>3; // 报错 Sequence contains too many elements return integer>6; // 报错 Sequence contains no elements
Javadoc:single(Func1)
singleOrDefault(T)
和
firstOrDefault类似,但是如果原始Observable发射超过一个的数据,会以错误通知终止。
示例代码:
Observable.just("1","233").singleOrDefault("fuck you two").subscribe(new Action1<Object>() { @Override public void call(Object o) { Log.i("sss",o+""); } });
输出:
rx.exceptions.OnErrorNotImplementedException: Sequence contains too many elements
Javadoc:singleOrDefault(T)
singleOrDefault(T,Func1))
和
firstOrDefault(T, Func1)类似,如果没有数据满足条件,返回默认值;如果有多个数据满足条件,以错误通知终止。
示例代码
Observable.just(1,2,3,4,5,6,7,8).singleOrDefault(666, new Func1<Integer, Boolean>() { @Override public Boolean call(Integer s) { return s>4; //rx.exceptions.OnErrorNotImplementedException: Sequence contains too many elements return s>12; // 666 return s>7; // 8 } }).subscribe(new Action1<Object>() { @Override public void call(Object o) { Log.i("sss",o+""); } });
Javadoc:singleOrDefault(Func1,T)
15. Last
—> 只发射最后一项(或者满足某个条件的最后一项)数据
如果你只对
Observable发射的最后一项数据,或者满足某个条件的最后一项数据感兴趣,你可以使用
Last操作符。
在某些实现中,
Last没有实现为一个返回
Observable的过滤操作符,而是实现为一个在当时就发射原始
Observable指定数据项的阻塞函数。在这些实现中,如果你想要的是一个过滤操作符,最好使用TakeLast(1)。
在
RxJava中的实现是
last和
lastOrDefault。
可能容易混淆,
BlockingObservable也有名叫
last和
lastOrDefault的操作符,它们会阻塞并返回值,不是立即返回一个
Observable。
last()
只发射最后一项数据,使用没有参数的last操作符。
示例代码:
Observable.just(1, 2, 3) .last() .subscribe(new Subscriber<Integer>() { @Override public void onNext(Integer item) { System.out.println("Next: " + item); } @Override public void onError(Throwable error) { System.err.println("Error: " + error.getMessage()); } @Override public void onCompleted() { System.out.println("Sequence complete."); } });
输出:
Next: 3 Sequence complete.
Javadoc:just()
last(Func1)
这个版本的
last也是接受一个谓词函数,返回一个发射原始
Observable中满足条件的最后一项数据的
Observable。
示例代码:
Observable.just(1,2,3,4,5,6,7,8).last(new Func1<Integer, Boolean>() { @Override public Boolean call(Integer integer) { return integer < 6; } }).subscribe(new Action1<Integer>() { @Override public void call(Integer o) { Log.i("sss",o+""); tv.setText(""+o); } });
输出:
5
Javadoc:last(Func1)
lastOrDefault(T)
lastOrDefault与
last类似,不同的是,如果原始
Observable没有发射任何值,它发射你指定的默认值。
示例代码:
Observable.empty().lastOrDefault(99).subscribe(new Action1<Object>() { @Override public void call(Object o) { Log.i("sss",o+""); } });
输出:
99
Javadoc:lastOrDefault(T)
lastOrDefault(T,Fun1)
示例代码:
Observable.just(1,2,3,4,5,6,7,8,9).lastOrDefault(99, new Func1<Integer, Boolean>() { @Override public Boolean call(Integer integer) { return integer > 10; } }).subscribe(new Action1<Integer>() { @Override public void call(Integer o) { Log.i("sss",o+""); } });
输出:
999
Javadoc:lastOrDefault(T,Func1)
16. Sample
—> 定期发射Observable最近发射的数据项
Sample操作符定时查看一个
Observable,然后发射自上次采样以来它最近发射的数据。
在某些实现中,有一个
ThrottleFirst操作符的功能类似,但不是发射采样期间的最近的数据,而是发射在那段时间内的第一项数据。
RxJava将这个操作符实现为
sample和
throttleLast。
注意:如果自上次采样以来,原始
Observable没有发射任何数据,这个操作返回的
Observable在那段时间内也不会发射任何数据。
sample的这个变体每当第二个
Observable发射一个数据(或者当它终止)时就对原始
Observable进行采样。第二个
Observable通过参数传递给
sample。
sample的这个变体默认不在任何特定的调度器上执行。
示例代码:
Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { if(subscriber.isUnsubscribed()) return; try { //前8个数字产生的时间间隔为1秒,后一个间隔为3秒 for (int i = 1; i < 9; i++) { subscriber.onNext(i); Thread.sleep(1000); } Thread.sleep(2000); subscriber.onNext(9); subscriber.onCompleted(); } catch(Exception e){ subscriber.onError(e); } } }).subscribeOn(Schedulers.newThread()) .sample(2200, TimeUnit.MILLISECONDS) //采样间隔时间为2200毫秒 .subscribe(new Subscriber<Integer>() { @Override public void onNext(Integer item) { System.out.println("Next: " + item); } @Override public void onError(Throwable error) { System.err.println("Error: " + error.getMessage()); } @Override public void onCompleted() { System.out.println("Sequence complete."); } });
输出:
Next: 3 Next: 5 Next: 7 Next: 8 Next: 9 Sequence complete.
sample(别名
throttleLast)的一个变体按照你参数中指定的时间间隔定时采样(
TimeUnit指定时间单位)。
sample的这个变体默认在
computation调度器上执行,但是你可以使用第三个参数指定其它的调度器。
Javadoc:sample(long,TimeUnit)或throttleLast(long,TimeUnit)
Javadoc:sample(long,TimeUnit,Scheduler) 或throttleLast(long,TimeUnit,Scheduler)
17. ThrottleFirst
—> throttleFirst与throttleLast/sample不同,在每个采样周期内,它总是发射原始Observable的第一项数据,而不是最近的一项。
throttleFirst操作符默认在
computation调度器上执行,但是你可以使用第三个参数指定其它的调度器。
示例代码:
Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { if(subscriber.isUnsubscribed()) return; try { //前8个数字产生的时间间隔为1秒,后一个间隔为3秒 for (int i = 1; i < 9; i++) { subscriber.onNext(i); Thread.sleep(1000); } Thread.sleep(2000); subscriber.onNext(9); subscriber.onCompleted(); } catch(Exception e){ subscriber.onError(e); } } }).subscribeOn(Schedulers.newThread()) .throttleFirst(2200, TimeUnit.MILLISECONDS) //采样间隔时间为2200毫秒 .subscribe(new Subscriber<Integer>() { @Override public void onNext(Integer item) { System.out.println("Next: " + item); } @Override public void onError(Throwable error) { System.err.println("Error: " + error.getMessage()); } @Override public void onCompleted() { System.out.println("Sequence complete."); } });
输出:
Next: 1 Next: 4 Next: 7 Next: 9 Sequence complete.
Javadoc:throttleFirst(long,TimeUnit)
Javadoc:throttleFirst(long,TimeUnit,Scheduler)
18. ThrottleWithTimeout( ) or Debounce( )
—> 只有当Observable在指定的时间后还没有发射数据时,才发射一个数据
Debounce操作符会过滤掉发射速率过快的数据项。
RxJava将这个操作符实现为
throttleWithTimeout和
debounce。
注意:这个操作符会接着最后一项数据发射原始
Observable的
onCompleted通知,即使这个通知发生在你指定的时间窗口内(从最后一项数据的发射算起)。也就是说,
onCompleted通知不会触发限流。
throtleWithTimeout/debounce的一个变体根据你指定的时间间隔进行限流,时间单位通过
TimeUnit参数指定。
这种操作符默认在
computation调度器上执行,但是你可以通过第三个参数指定。
示例代码:
Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { if(subscriber.isUnsubscribed()) return; try { for (int i = 0; i < 10; i++) { subscriber.onNext(i); Thread.sleep(i * 100); } subscriber.onCompleted(); }catch(Exception e){ subscriber.onError(e); } } }).subscribeOn(Schedulers.newThread()) .debounce(400, TimeUnit.MILLISECONDS) .subscribe( new Action1<Integer>() { @Override public void call(Integer integer) { System.out.println("Next:" + integer); } }, new Action1<Throwable>() { @Override public void call(Throwable throwable) { System.out.println("Error:" + throwable.getMessage()); } }, new Action0() { @Override public void call() { System.out.println("completed!"); } });
输出:
Next:4 Next:5 Next:6 Next:7 Next:8 Next:9 completed!
Javadoc:debounce(long,TimeUnit) and throttleWithTimeout(long,TimeUnit)
Javadoc:debounce(long,TimeUnit,Scheduler) and throttleWithTimeout(long,TimeUnit,Scheduler)
debounce操作符的一个变体通过对原始
Observable的每一项应用一个函数进行限流,这个函数返回一个
Observable。如果原始
Observable在这个新生成的
Observable终止之前发射了另一个数据,
debounce会抑制(
suppress)这个数据项。
debounce的这个变体默认不在任何特定的调度器上执行。
Javadoc:debounce(Func1)
19. Timeout
—> 如果在一个指定的时间段后还没发射数据,就发射一个异常
timeout(long,TimeUnit)
如果原始
Observable过了指定的一段时长没有发射任何数据,
Timeout操作符会以一个
onError通知终止这个
Observable。
第一个变体接受一个时长参数,每当原始
Observable发射了一项数据,
timeout就启动一个计时器,如果计时器超过了指定指定的时长而原始
Observable没有发射另一项数据,
timeout就抛出
TimeoutException,以一个错误通知终止
Observable。
这个
timeout默认在
computation调度器上执行,你可以通过参数指定其它的调度器。
示例代码:
Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { if(subscriber.isUnsubscribed()) return; try { for (int i = 0; i < 10; i++) { subscriber.onNext(i); Thread.sleep(i*100); } subscriber.onCompleted(); }catch(Exception e){ subscriber.onError(e); } } }).subscribeOn(Schedulers.newThread()) .timeout(300,TimeUnit.MILLISECONDS) .subscribe( new Action1<Integer>() { @Override public void call(Integer integer) { System.out.println("Next:" + integer); } }, new Action1<Throwable>() { @Override public void call(Throwable throwable) { System.out.println("Error:" + throwable.getMessage()); } }, new Action0() { @Override public void call() { System.out.println("completed!"); } });
输出:
Next:0 Next:1 Next:2 Next:3 Next:4 Error:null
Javadoc:timeout(long,TimeUnit)
Javadoc:timeout(long,TimeUnit,Scheduler)
timeout(long,TimeUnit,Observable)
这个版本的
timeout在超时时会切换到使用一个你指定的备用的
Observable,而不是发错误通知。它也默认在
computation调度器上执行。
示例代码:
Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { if(subscriber.isUnsubscribed()) return; try { for (int i = 0; i < 10; i++) { subscriber.onNext(i); Thread.sleep(i*100); } subscriber.onCompleted(); }catch(Exception e){ subscriber.onError(e); } } }).subscribeOn(Schedulers.newThread()) .timeout(300,TimeUnit.MILLISECONDS,Observable.just(555)) .subscribe( new Action1<Integer>() { @Override public void call(Integer integer) { System.out.println("Next:" + integer); } }, new Action1<Throwable>() { @Override public void call(Throwable throwable) { System.out.println("Error:" + throwable.getMessage()); } }, new Action0() { @Override public void call() { System.out.println("completed!"); } });
输出:
Next:0 Next:1 Next:2 Next:3 Next:555 completed!
Javadoc:timeout(long,TimeUnit,Observable)
Javadoc:timeout(long,TimeUnit,Observable,Scheduler)
timeout(Func1)
这个版本的
timeout使用一个函数针对原始
Observable的每一项返回一个
Observable,如果当这个
Observable终止时原始
Observable还没有发射另一项数据,就会认为是超时了,
timeout就抛出
TimeoutException,以一个错误通知终止
Observable。
这个
timeout默认在
immediate调度器上执行。
Javadoc:timeout(Func1)
这个版本的
timeout同时指定超时时长和备用的
Observable。它默认在
immediate调度器上执行。
Javadoc:timeout(Func1,Observable)
这个版本的
time除了给每一项设置超时,还可以单独给第一项设置一个超时。它默认在
immediate调度器上执行。
Javadoc:timeout(Func0,Func1)
同上,但是同时可以指定一个备用的
Observable。它默认在
immediate调度器上执行。
Javadoc:timeout(Func0,Func1,Observable)
20. IgnoreElements
—> 丢弃所有的正常数据,只发射错误或完成通知
不发射任何数据,只发射Observable的终止通知
IgnoreElements操作符抑制原始
Observable发射的所有数据,只允许它的终止通知(
onError或
onCompleted)通过。
如果你不关心一个
Observable发射的数据,但是希望在它完成时或遇到错误终止时收到通知,你可以对
Observable使用
ignoreElements操作符,它会确保永远不会调用观察者的
onNext()方法。
RxJava将这个操作符实现为
ignoreElements。
示例代码:
Observable.just(1,2,3,4,5,6,7,8).ignoreElements() .subscribe(new Subscriber<Integer>() { @Override public void onNext(Integer item) { System.out.println("Next: " + item); } @Override public void onError(Throwable error) { System.err.println("Error: " + error.getMessage()); } @Override public void onCompleted() { System.out.println("Sequence complete."); } });
输出:
Sequence complete.
Javadoc:ignoreElements()
相关文章推荐
- 20个常用的Java 功能代码
- Java处理Json数据
- Java 之 Date 获取 年月日时分秒
- java中null的深刻理解
- spring2.5+hibernate基于xml配置的实例
- Commons_FileUpload
- 9web xml元素
- springmvc+hibernate4+spring3整合
- 8防止重复提交
- java基础八:字符串
- 6拦截器
- 5文件上传与下载
- SpringMVC日期转换之JsonSerialize
- 2使用struts2的标签库
- 最简单的Java工厂设计模式样例
- 2使用struts2的标签库 EL表达式
- 1使用Struts2
- window JNI_CreateJavaVM启动java程序
- Java学习之Iterator(迭代器)的一般用法 (转)
- 14web启动将数据加载到内存中