您的位置:首页 > 其它

RX操作符之结合操作(startWith、merge、mergeDelayError、zip、join、groupjoin、switchOnNext)

2016-08-08 15:14 561 查看
一、startWith
在数据序列的开头插入一条指定的项

Integer[]items = {6,7,8,2,3,4,5};

Observable<Integer> myObservable =  Observable.from(items)
.startWith(1);

Subscriber<Integer> mySubscriber =new Subscriber<Integer>() {
@Override
public void onNext(Integer integer) {
Log.e(TAG,"onNext................."+integer);
}

@Override
public void onCompleted() {
Log.e(TAG, "onCompleted.................");
}

@Override
public void onError(Throwable e) {
Log.e(TAG, "onError.....................");
}
};

myObservable.subscribe(mySubscriber);


运行结果:



二、merge
合并多个Observables的发射物,使用
Merge
操作符你可以将多个Observables的输出合并,就好像它们是一个单个的Observable一样。
Merge
可能会让合并的Observables发射的数据交错(有一个类似的操作符
Concat
不会让数据交错,它会按顺序一个接着一个发射多个Observables的发射物)。

Integer[]itemsOne = {1,2,3,4,5};
Integer[]itemsTwo = {6,7,8,9,10};

Observable<Integer> one = Observable.from(itemsOne);
Observable<Integer> two =  Observable.from(itemsTwo);

Observable<Integer> myObservable = Observable.merge(one, two);

Subscriber<Integer> mySubscriber =new Subscriber<Integer>() {
@Override
public void onNext(Integer integer) {
Log.e(TAG,"onNext................."+integer);
}

@Override
public void onCompleted() {
Log.e(TAG, "onCompleted.................");
}

@Override
public void onError(Throwable e) {
Log.e(TAG, "onError.....................");
}
};

myObservable.subscribe(mySubscriber);


运行结果:



三、mergeDelayError

<code style="box-sizing: border-box; -webkit-tap-highlight-color: transparent; -webkit-font-smoothing: antialiased; font-family: Consolas, "Liberation Mono", Menlo, Courier, monospace; font-size: 0.85em; break-inside: avoid; direction: ltr; margin: 0px; padding: 0.2em; border: none; color: rgb(51, 51, 51); letter-spacing: 0.2px; orphans: 3; widows: 3; background-color: rgb(247, 247, 247);">MergeDelayError</code><span style="color: rgb(51, 51, 51); font-family: "Helvetica Neue", Helvetica, Arial, sans-serif; font-size: 16px; letter-spacing: 0.2px; line-height: 27.2px; orphans: 3; widows: 3;">的操作符会保留</span><code style="box-sizing: border-box; -webkit-tap-highlight-color: transparent; -webkit-font-smoothing: antialiased; font-family: Consolas, "Liberation Mono", Menlo, Courier, monospace; font-size: 0.85em; break-inside: avoid; direction: ltr; margin: 0px; padding: 0.2em; border: none; color: rgb(51, 51, 51); letter-spacing: 0.2px; orphans: 3; widows: 3; background-color: rgb(247, 247, 247);">onError</code><span style="color: rgb(51, 51, 51); font-family: "Helvetica Neue", Helvetica, Arial, sans-serif; font-size: 16px; letter-spacing: 0.2px; line-height: 27.2px; orphans: 3; widows: 3;">通知直到合并后的Observable所有的数据发射完成,在那时它才会把</span><code style="box-sizing: border-box; -webkit-tap-highlight-color: transparent; -webkit-font-smoothing: antialiased; font-family: Consolas, "Liberation Mono", Menlo, Courier, monospace; font-size: 0.85em; break-inside: avoid; direction: ltr; margin: 0px; padding: 0.2em; border: none; color: rgb(51, 51, 51); letter-spacing: 0.2px; orphans: 3; widows: 3; background-color: rgb(247, 247, 247);">onError</code><span style="color: rgb(51, 51, 51); font-family: "Helvetica Neue", Helvetica, Arial, sans-serif; font-size: 16px; letter-spacing: 0.2px; line-height: 27.2px; orphans: 3; widows: 3;">传递给观察者。</span>
Integer[]itemsTwo = {1,2,3,4,5};
Observable<Integer> one =  Observable.create(new Observable.OnSubscribe<Integer>() {

@Override
public void call(Subscriber<? super Integer> subscriber) {
for (int i = 0; i < 5; i++) {
if(i==3){
subscriber.onError(new Throwable("error"));
}else {
subscriber.onNext(i);
}
try {
Thread.sleep(100*i);
} catch (Exception e) {

}
}
subscriber.onCompleted();
}
});

Observable<Integer> two =  Observable.from(itemsTwo);

Observable<Integer> myObservable = Observable.mergeDelayError(one, two);

Subscriber<Integer> mySubscriber =new Subscriber<Integer>() {
@Override
public void onNext(Integer integer) {
Log.e(TAG,"onNext................."+integer);
}

@Override
public void onCompleted() {
Log.e(TAG, "onCompleted.................");
}

@Override
public void onError(Throwable e) {
Log.e(TAG, "onError.....................");
}
};

myObservable.subscribe(mySubscriber);


运行结果:



四、zip
Zip
操作符返回一个Obversable,它使用这个函数按顺序结合两个或多个Observables发射的数据项,然后它发射这个函数返回的结果。它按照严格的顺序应用这个函数。它只发射与发射数据项最少的那个Observable一样多的数据。

Integer[]itemsOne = {1,2,3,4,5};
Integer[]itemsTwo = {6,7,8,9,10};

Observable<Integer> one = Observable.from(itemsOne);
Observable<Integer> two = Observable.from(itemsTwo);

Observable myObservable = Observable.zip(one, two, new Func2<Integer, Integer, Integer>() {

@Override
public Integer call(Integer integer1, Integer integer2) {
return integer1 +  integer2;
}
});

Subscriber<Integer> mySubscriber =new Subscriber<Integer>() {
@Override
public void onNext(Integer integer) {
Log.e(TAG,"onNext................."+integer);
}

@Override
public void onCompleted() {
Log.e(TAG, "onCompleted.................");
}

@Override
public void onError(Throwable e) {
Log.e(TAG, "onError.....................");
}
};

myObservable.subscribe(mySubscriber);
运行结果:



五、and/then/when

And/Then/When操作符组合的行为类似于
zip
,但是它们使用一个中间数据结构。接受两个或多个Observable,一次一个将它们的发射物合并到
Pattern
对象,然后操作那个
Pattern
对象,变换为一个
Plan
。随后将这些
Plan
变换为Observable的发射物。

六、combineLatest
当两个Observables中的任何一个发射了一个数据时,通过一个指定的函数组合每个Observable发射的最新数据(一共两个数据),然后发射这个函数的结果。
CombineLatest
操作符行为类似于
zip
,但是只有当原始的Observable中的每一个都发射了一条数据时
zip
才发射数据。
CombineLatest
则在原始的Observable中任意一个发射了数据时发射一条数据。当原始Observables的任何一个发射了一条数据时,
CombineLatest
使用一个函数结合它们最近发射的数据,然后发射这个函数的返回值。

Integer[]itemsOne = {1,2,3,4,5};
Integer[]itemsTwo = {6,7,8,9,10,11};

Observable<Integer> one = Observable.from(itemsOne);
Observable<Integer> two = Observable.from(itemsTwo);

Observable myObservable = Observable.combineLatest(one,two,new Func2<Integer,Integer,Integer>(){

@Override
public Integer call(Integer integer, Integer integer2) {
return integer*integer2;
}
});

Subscriber<Integer> mySubscriber =new Subscriber<Integer>() {
@Override
public void onNext(Integer integer) {
Log.e(TAG,"onNext................."+integer);
}

@Override
public void onCompleted() {
Log.e(TAG, "onCompleted.................");
}

@Override
public void onError(Throwable e) {
Log.e(TAG, "onError.....................");
}
};

myObservable.subscribe(mySubscriber);

运行结果:



 七、join
任何时候,只要在另一个Observable发射的数据定义的时间窗口内,这个Observable发射了一条数据,就结合两个Observable发射的数据。
Join
操作符结合两个Observable发射的数据,基于时间窗口(你定义的针对每条数据特定的原则)选择待集合的数据项。你将这些时间窗口实现为一些Observables,它们的生命周期从任何一条Observable发射的每一条数据开始。当这个定义时间窗口的Observable发射了一条数据或者完成时,与这条数据关联的窗口也会关闭。只要这条数据的窗口是打开的,它将继续结合其它Observable发射的任何数据项。你定义一个用于结合数据的函数。

//源Observable
Observable left = Observable.just("join1-", "join2-", "join3-", "join4-");

//目标Observable
Observable<Integer> right = Observable.create(new Observable.OnSubscribe<Integer>() {

@Override
public void call(Subscriber<? super Integer> subscriber) {
for (int i = 0; i < 5; i++) {
subscriber.onNext(i);
try {
Thread.sleep(1000);
} catch (Exception e) {

}
}
subscriber.onCompleted();
}
});

//这个函数觉得了源Observable发射出来数据的有效期,接收源发射来的数据,返回一个Observable
//func1第一个参数与源Observale的参数一致,当时间超过2s后源Observale将不再发射,join过程停止
Func1<String,Observable<Long>> leftDurationSelector = new Func1<String,Observable<Long>>(){
@Override
public Observable<Long> call(String s) {
return Observable.timer(2,TimeUnit.SECONDS);
}

};

//这个函数觉得了目标Observable发射出来数据的有效期,接收源发射来的数据,返回一个Observable
// func1第一个参数与目标Observale的参数一致,当时间超过6s后源Observale将不再发射,join过程停止
Func1<Integer,Observable<Long>> rightDurationSelector = new Func1<Integer,Observable<Long>>(){
@Override
public Observable<Long> call(Integer integer) {
return Observable.timer(6,TimeUnit.SECONDS);
}

};

//组合源Observale与目标Observable数据的函数
Func2<String,Integer,String> resultSelector = new Func2<String,Integer,String>(){

@Override
public String call(String s, Integer integer2) {
return s+integer2;
}
};

Subscriber<String> mySubscriber =new Subscriber<String>() {
@Override
public void onNext(String s) {
Log.e(TAG,"onNext................."+s);
}

@Override
public void onCompleted() {
Log.e(TAG, "onCompleted.................");
}

@Override
public void onError(Throwable e) {
Log.e(TAG, "onError.....................");
}
};

left.join(right,leftDurationSelector,rightDurationSelector,resultSelector).subscribe(mySubscriber);

运行结果:



八、groupjoin
groupJoin基本和join相同,只是最后组合函数的参数不同。

//源Observable
Observable left = Observable.just("groupJoin1-", "groupJoin2-", "groupJoin3-", "groupJoin4-");

//目标Observable
Observable<Integer> right = Observable.create(new Observable.OnSubscribe<Integer>() {

@Override
public void call(Subscriber<? super Integer> subscriber) {
for (int i = 0; i < 5; i++) {
subscriber.onNext(i);
try {
Thread.sleep(1000);
} catch (Exception e) {

}
}
subscriber.onCompleted();
}
});

//这个函数觉得了源Observable发射出来数据的有效期,接收源发射来的数据,返回一个Observable
//func1第一个参数与源Observale的参数一致,每条有效期为3s,join过程停止
Func1<String,Observable<Long>> leftDurationSelector = new Func1<String,Observable<Long>>(){
@Override
public Observable<Long> call(String s) {
return Observable.timer(3, TimeUnit.SECONDS);
}

};

//这个函数觉得了目标Observable发射出来数据的有效期,接收源发射来的数据,返回一个Observable
// func1第一个参数与目标Observale的参数一致,每条有效期为6s,join过程停止
Func1<Integer,Observable<Long>> rightDurationSelector = new Func1<Integer,Observable<Long>>(){
@Override
public Observable<Long> call(Integer integer) {
return Observable.timer(6, TimeUnit.SECONDS);
}

};

// 组合源Observale与目标Observable数据的函数
Func2<String,Observable<Integer>,Observable<String>> resultSelector = new Func2<String,Observable<Integer>,Observable<String>>(){

@Override
public Observable<String> call(final String s, Observable<Integer> integerObservable) {
return integerObservable.map(new Func1<Integer, String>() {
@Override
public String call(Integer integer) {
return s+integer;
}
});
}
};

left
.groupJoin(right, leftDurationSelector, rightDurationSelector, resultSelector)
.subscribe(new Action1<Observable<String>>() {
@Override
public void call(Observable<String> stringObservable) {
stringObservable.subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.e(TAG,"call................."+s);
}
});
}
});

运行结果:



九、switchOnNext
将一个发射Observables的Observable转换成另一个Observable,后者发射这些Observables最近发射的数据

每隔2s返回一个Observable,这个Observable每隔1s发射一条数据,在第二个Observable产生之前有4s时间,产生了四条数据。当新的Observable产生时,之前的Observable就被丢弃了

Observable<Observable<String>> observable = Observable.timer(2, 2, TimeUnit.SECONDS)
.map(new Func1<Long, Observable<String>>() {
@Override
public Observable<String> call(Long aLong) {
return Observable.timer(1,1,TimeUnit.SECONDS)
.map(new Func1<Long, String>() {
@Override
public String call(Long aLong) {
return aLong+"";
}
}).take(4);
}
}).take(4);

Subscriber<String> mySubscriber =new Subscriber<String>() {
@Override
public void onNext(String s) {
Log.e(TAG,"onNext................."+s);
}

@Override
public void onCompleted() {
Log.e(TAG, "onCompleted.................");
}

@Override
public void onError(Throwable e) {
Log.e(TAG, "onError.....................");
}
};

Observable.switchOnNext(observable).subscribe(mySubscriber);

运行结果:

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐