您的位置:首页 > 编程语言 > Java开发

RxJava2详解(二)--操作符

2017-06-24 22:37 176 查看

操作符简介

Observable和Observer只是ReactiveX的开始,他们自己只不过是标准观察者模式的轻微扩展,更适合处理事件序列而不是单个回调。

ReactiveX真正强大的是那些让你可以随意变换、组合、操作Observable发射的数据序列的操作符(Operators),这些操作符可以让你声明式地组合异步序列,同时具备回调的所有效率优势,但没有传统异步系统的嵌套回调处理的缺点。

操作符分类

ReactiveX为了能够更好进行异步处理操作,定义了非常多的操作符,每个平台实现可以根据需要实现,也可以自定义更多的操作符:

创建Observable(Creating Observables)
Create
,
Defer
,
Empty
/
Never
/
Throw
,
From
,
Interval
,
Just
,
Range
,
Repeat
,
Start
, and
Timer


变换Observable的Item(Transforming Observable Items)
Buffer
,
FlatMap
,
GroupBy
,
Map
,
Scan
, and
Window


过滤Observable(Filtering Observables)
Debounce
,
Distinct
,
ElementAt
,
Filter
,
First
,
IgnoreElements
,
Last
,
Sample
,
Skip
,
SkipLast
,
Take
, and
TakeLast


组合多个Observable(Combining Observables)
And
/
Then
/
When
,
CombineLatest
,
Join
,
Merge
,
StartWith
,
Switch
, and
Zip


错误处理(Error Handling Operators)
Catch
and
Retry


Observable工具(Utility Operators)
Delay
,
Do
,
Materialize
/
Dematerialize
,
ObserveOn
,
Serialize
,
Subscribe
,
SubscribeOn
,
TimeInterval
,
Timeout
,
Timestamp
, and
Using


条件及布尔判断(Conditional and Boolean Operators)
All
,
Amb
,
Contains
,
DefaultIfEmpty
,
SequenceEqual
,
SkipUntil
,
SkipWhile
,
TakeUntil
, and
TakeWhile


数学及集合操作符(Mathematical and Aggregate Operators)
Average
,
Concat
,
Count
,
Max
,
Min
,
Reduce
, and
Sum


转换Observable(Converting Observables)
To


Connectable Observable操作符(Connectable Observable Operators)
Connect
,
Publish
,
RefCount
, and
Replay


背压操作符(Backpressure Operators)
一些可以进行事件/数据流控制的操作符

操作符的链式调用

很多操作符都作用于Observable并返回一个Observable,这就意味着你可以一个接一个的链式使用这些操作符,链中的每个操作符都会修改之前操作符操作产生的Observable。

其它的链式调用模式,像Builder模式,也可以连续的调用一系列操作方法。Builder模式一般都是链式地修改同一个实例的属性,所以操作方法的调用顺序一般并没有什么影响,但是Observable操作符的使用顺序却很重要,因为每个操作符操作Observable后都会马上将新生成的Observable交给下一个操作符去处理。

一些“核心”的操作符

Create

创建Observable:



Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
e.onNext("one");
e.onNext("two");
e.onNext("three");
e.onComplete();
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
System.out.println("accept: " + s);
}
});


Defer

Defer操作符会等到有一个Observer订阅才会生成一个Observable,也就是说每个订阅者都有自己序列,这可以确保最后一刻(订阅时)创建的Observable包含最新的数据。



Observable<String> defer = Observable.defer(new Callable<Observab
@Override
public ObservableSource<? extends String> call() throws Excep
Object o = new Object();
System.out.println("emit: " + "object" + o.hashCode());
return Observable.just("object" + o.hashCode());
}
});
Consumer<String> consumer0 = new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
System.out.println("accept: " + s);
}
};
Consumer<String> consumer1 = new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
System.out.println("accept: " + s);
}
};
defer.subscribe(consumer0);
defer.subscribe(consumer1);


emit: object124101560
accept: object124101560
emit: object913896849
accept: object913896849


From

把其他对象或数据结构转成Observable



RxJava2的实现为
fromArray
fromCallable
fromFuture
fromIterable
fromPublisher
等方法。

Just

把一个item转换成发射这个item的Observable。Just和From类似,From会把数组或iterable或其它有序东西内部的所有item取出来发射,而Just只会简单地将数组或iterable或者其它原来的东西不做任何更改地作为一个item发射。



Interval

创建一个每隔给定的时间间隔发射一个递增整数的Observable



Timer

创建一个给定延迟后发射一个item(0L)的Observable



Range

创建发射给定范围的连续整数的Observable



Map

通过给每个item应用函数来转换要发射的item,Map操作符将返回发射函数应用结果item的新的Observable



FlatMap

FlatMap操作符会应用你指定的函数到每个源Observable要发射的item,该函数会返回一个自己发射item的Observable,然后FlatMap会merge这些新的Observable,把merge后的item序列作为新Observable的发射序列。由于是merge操作所以item发射顺序可能是交错的,如果想保证严格的发射顺序可以使用ConcatMap操作符。



Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
}
}).flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(@NonNull Integer integer) throws Exception {
final List<String> list = new ArrayList<>();
for (int i = 0; i < 3; i++) {
list.add("I am value " + integer + "-" + i);
}
return Observable.fromIterable(list).delay(50, TimeUnit.MILLISECONDS);
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println("accept: " + s);
}
});


accept: I am value 1-1
accept: I am value 2-1
accept: I am value 2-2
accept: I am value 1-2
accept: I am value 3-1
accept: I am value 3-2


Filter

只发射Observable中那些通过判定测试(predicate test)的item



Take

只发射Observable前n个item



Merge

把多个Observable merge为一个Observable,merge发射的item可能是交错的,且如果任何源Observable出现
onError
都会马上终止merge过程并传给最终Observable。如果想延迟
onError
到merge结束可以使用MergeDelayError操作符。



Zip

根据你给定的方法把多个Observable发射的item结合在一起,每组item结合后都作为单个item(给定方法的返回值)发射。Zip操作符严格按序应用给定的方法,所以新生成的Observable的第一个item肯定是Observable #1第一个item和Observable #2第一个item的结合(即方法返回值),新生成的Observable的第二个item肯定是Observable #1第二个item和Observable #2第二个item的结合,以此类推,只发射与 发射最少item的源Observable的发射item数 一样多的item



Delay

返回一个每发射一个源Observable的item之前都延迟给定时间的Observable,但
onError
不会被延迟。



SubscribeOn

指定Observable要操作在哪个Scheduler上。



ObserveOn

指定Observer将要在哪个Scheduler上订阅它的Observable。



Subscribe

把Observable和Observer连接起来,只有通过Subscribe操作符订阅Observable才能收到Observable发射的item以及
onError
onComplete
信号。

All

判断Observable发射的所有的item是否都满足指定条件。当且仅当源Observable正常终止且每个发射的item都被给定的判定函数判断为
true
时,All操作符才会返回一个只发射一个
true
的Observable。如果源Observable发射的任何一个item被给定的判定函数判断为false,All操作符会返回一个只发射一个
false
的Observable。



Amb

Ambiguous(模棱两可的)的缩写。对于给定的两个或多个源Observable,只发射 第一个发射item或通知(
onError
onCompleted
)的那个Observable 的所有item及通知,Amb会忽略并丢弃其它源Observable发射的item及通知。



Contains

判断Observable是否发射了指定的item,如果源Observable发射了指定的item就返回一个发射
true
的Observable,如果源Observable直到结束都没发射指定的item就返回一个发射
false
的Observable。类似的IsEmpty操作符会在源Observable直到终止都没发射任何item时返回一个发射
true
的Observable。



SkipUntil

在第二个Observable发射一个item之前丢弃源Observable要发射的item,之后会镜像发射源Observable的item。



SkipWhile

在你给定的条件变成false之前丢弃源Observable要发射的item,之后会镜像发射源Observable的item。



TakeUntil

TakeUtil会镜像源Observable并监视你给定的第二个Observable,在第二个Observable发射一个item或终止信号(
onError
onCompleted
)后丢弃源Observable的任何item(即停止镜像源Observable并终止)。



Concat

简单地将多个Observable连接(无交错的)成一个Observable,即只有第一个Observable的item都被发射完才会发射第二个Observable的item,以此类推。由于Concat会等待订阅 给定的多个Observable 直到之前的Observable完成,如果你想连接一个"热Observable"(在被订阅之前就立即发射item的Observable),Concat将看不到这些也就不会发射任何item。



References

ReactiveX.io
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息