RX操作符之结合操作(startWith、merge、mergeDelayError、zip、join、groupjoin、switchOnNext)
2016-08-08 15:14
561 查看
一、startWith
在数据序列的开头插入一条指定的项
运行结果:
二、merge
合并多个Observables的发射物,使用
运行结果:
三、mergeDelayError
运行结果:
四、zip
五、and/then/when
And/Then/When操作符组合的行为类似于
六、combineLatest
当两个Observables中的任何一个发射了一个数据时,通过一个指定的函数组合每个Observable发射的最新数据(一共两个数据),然后发射这个函数的结果。
Integer[]itemsOne = {1,2,3,4,5};
Integer[]itemsTwo = {6,7,8,9,10,11};
Observable<Integer> one = Observable.from(itemsOne);
Observable<Integer> two = Observable.from(itemsTwo);
Observable myObservable = Observable.combineLatest(one,two,new Func2<Integer,Integer,Integer>(){
@Override
public Integer call(Integer integer, Integer integer2) {
return integer*integer2;
}
});
Subscriber<Integer> mySubscriber =new Subscriber<Integer>() {
@Override
public void onNext(Integer integer) {
Log.e(TAG,"onNext................."+integer);
}
@Override
public void onCompleted() {
Log.e(TAG, "onCompleted.................");
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError.....................");
}
};
myObservable.subscribe(mySubscriber);
运行结果:
七、join
任何时候,只要在另一个Observable发射的数据定义的时间窗口内,这个Observable发射了一条数据,就结合两个Observable发射的数据。
//源Observable
Observable left = Observable.just("join1-", "join2-", "join3-", "join4-");
//目标Observable
Observable<Integer> right = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for (int i = 0; i < 5; i++) {
subscriber.onNext(i);
try {
Thread.sleep(1000);
} catch (Exception e) {
}
}
subscriber.onCompleted();
}
});
//这个函数觉得了源Observable发射出来数据的有效期,接收源发射来的数据,返回一个Observable
//func1第一个参数与源Observale的参数一致,当时间超过2s后源Observale将不再发射,join过程停止
Func1<String,Observable<Long>> leftDurationSelector = new Func1<String,Observable<Long>>(){
@Override
public Observable<Long> call(String s) {
return Observable.timer(2,TimeUnit.SECONDS);
}
};
//这个函数觉得了目标Observable发射出来数据的有效期,接收源发射来的数据,返回一个Observable
// func1第一个参数与目标Observale的参数一致,当时间超过6s后源Observale将不再发射,join过程停止
Func1<Integer,Observable<Long>> rightDurationSelector = new Func1<Integer,Observable<Long>>(){
@Override
public Observable<Long> call(Integer integer) {
return Observable.timer(6,TimeUnit.SECONDS);
}
};
//组合源Observale与目标Observable数据的函数
Func2<String,Integer,String> resultSelector = new Func2<String,Integer,String>(){
@Override
public String call(String s, Integer integer2) {
return s+integer2;
}
};
Subscriber<String> mySubscriber =new Subscriber<String>() {
@Override
public void onNext(String s) {
Log.e(TAG,"onNext................."+s);
}
@Override
public void onCompleted() {
Log.e(TAG, "onCompleted.................");
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError.....................");
}
};
left.join(right,leftDurationSelector,rightDurationSelector,resultSelector).subscribe(mySubscriber);
运行结果:
八、groupjoin
groupJoin基本和join相同,只是最后组合函数的参数不同。
//源Observable
Observable left = Observable.just("groupJoin1-", "groupJoin2-", "groupJoin3-", "groupJoin4-");
//目标Observable
Observable<Integer> right = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for (int i = 0; i < 5; i++) {
subscriber.onNext(i);
try {
Thread.sleep(1000);
} catch (Exception e) {
}
}
subscriber.onCompleted();
}
});
//这个函数觉得了源Observable发射出来数据的有效期,接收源发射来的数据,返回一个Observable
//func1第一个参数与源Observale的参数一致,每条有效期为3s,join过程停止
Func1<String,Observable<Long>> leftDurationSelector = new Func1<String,Observable<Long>>(){
@Override
public Observable<Long> call(String s) {
return Observable.timer(3, TimeUnit.SECONDS);
}
};
//这个函数觉得了目标Observable发射出来数据的有效期,接收源发射来的数据,返回一个Observable
// func1第一个参数与目标Observale的参数一致,每条有效期为6s,join过程停止
Func1<Integer,Observable<Long>> rightDurationSelector = new Func1<Integer,Observable<Long>>(){
@Override
public Observable<Long> call(Integer integer) {
return Observable.timer(6, TimeUnit.SECONDS);
}
};
// 组合源Observale与目标Observable数据的函数
Func2<String,Observable<Integer>,Observable<String>> resultSelector = new Func2<String,Observable<Integer>,Observable<String>>(){
@Override
public Observable<String> call(final String s, Observable<Integer> integerObservable) {
return integerObservable.map(new Func1<Integer, String>() {
@Override
public String call(Integer integer) {
return s+integer;
}
});
}
};
left
.groupJoin(right, leftDurationSelector, rightDurationSelector, resultSelector)
.subscribe(new Action1<Observable<String>>() {
@Override
public void call(Observable<String> stringObservable) {
stringObservable.subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.e(TAG,"call................."+s);
}
});
}
});
运行结果:
九、switchOnNext
将一个发射Observables的Observable转换成另一个Observable,后者发射这些Observables最近发射的数据
每隔2s返回一个Observable,这个Observable每隔1s发射一条数据,在第二个Observable产生之前有4s时间,产生了四条数据。当新的Observable产生时,之前的Observable就被丢弃了
Observable<Observable<String>> observable = Observable.timer(2, 2, TimeUnit.SECONDS)
.map(new Func1<Long, Observable<String>>() {
@Override
public Observable<String> call(Long aLong) {
return Observable.timer(1,1,TimeUnit.SECONDS)
.map(new Func1<Long, String>() {
@Override
public String call(Long aLong) {
return aLong+"";
}
}).take(4);
}
}).take(4);
Subscriber<String> mySubscriber =new Subscriber<String>() {
@Override
public void onNext(String s) {
Log.e(TAG,"onNext................."+s);
}
@Override
public void onCompleted() {
Log.e(TAG, "onCompleted.................");
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError.....................");
}
};
Observable.switchOnNext(observable).subscribe(mySubscriber);
运行结果:
在数据序列的开头插入一条指定的项
Integer[]items = {6,7,8,2,3,4,5}; Observable<Integer> myObservable = Observable.from(items) .startWith(1); Subscriber<Integer> mySubscriber =new Subscriber<Integer>() { @Override public void onNext(Integer integer) { Log.e(TAG,"onNext................."+integer); } @Override public void onCompleted() { Log.e(TAG, "onCompleted................."); } @Override public void onError(Throwable e) { Log.e(TAG, "onError....................."); } }; myObservable.subscribe(mySubscriber);
运行结果:
二、merge
合并多个Observables的发射物,使用
Merge操作符你可以将多个Observables的输出合并,就好像它们是一个单个的Observable一样。
Merge可能会让合并的Observables发射的数据交错(有一个类似的操作符
Concat不会让数据交错,它会按顺序一个接着一个发射多个Observables的发射物)。
Integer[]itemsOne = {1,2,3,4,5}; Integer[]itemsTwo = {6,7,8,9,10}; Observable<Integer> one = Observable.from(itemsOne); Observable<Integer> two = Observable.from(itemsTwo); Observable<Integer> myObservable = Observable.merge(one, two); Subscriber<Integer> mySubscriber =new Subscriber<Integer>() { @Override public void onNext(Integer integer) { Log.e(TAG,"onNext................."+integer); } @Override public void onCompleted() { Log.e(TAG, "onCompleted................."); } @Override public void onError(Throwable e) { Log.e(TAG, "onError....................."); } }; myObservable.subscribe(mySubscriber);
运行结果:
三、mergeDelayError
<code style="box-sizing: border-box; -webkit-tap-highlight-color: transparent; -webkit-font-smoothing: antialiased; font-family: Consolas, "Liberation Mono", Menlo, Courier, monospace; font-size: 0.85em; break-inside: avoid; direction: ltr; margin: 0px; padding: 0.2em; border: none; color: rgb(51, 51, 51); letter-spacing: 0.2px; orphans: 3; widows: 3; background-color: rgb(247, 247, 247);">MergeDelayError</code><span style="color: rgb(51, 51, 51); font-family: "Helvetica Neue", Helvetica, Arial, sans-serif; font-size: 16px; letter-spacing: 0.2px; line-height: 27.2px; orphans: 3; widows: 3;">的操作符会保留</span><code style="box-sizing: border-box; -webkit-tap-highlight-color: transparent; -webkit-font-smoothing: antialiased; font-family: Consolas, "Liberation Mono", Menlo, Courier, monospace; font-size: 0.85em; break-inside: avoid; direction: ltr; margin: 0px; padding: 0.2em; border: none; color: rgb(51, 51, 51); letter-spacing: 0.2px; orphans: 3; widows: 3; background-color: rgb(247, 247, 247);">onError</code><span style="color: rgb(51, 51, 51); font-family: "Helvetica Neue", Helvetica, Arial, sans-serif; font-size: 16px; letter-spacing: 0.2px; line-height: 27.2px; orphans: 3; widows: 3;">通知直到合并后的Observable所有的数据发射完成,在那时它才会把</span><code style="box-sizing: border-box; -webkit-tap-highlight-color: transparent; -webkit-font-smoothing: antialiased; font-family: Consolas, "Liberation Mono", Menlo, Courier, monospace; font-size: 0.85em; break-inside: avoid; direction: ltr; margin: 0px; padding: 0.2em; border: none; color: rgb(51, 51, 51); letter-spacing: 0.2px; orphans: 3; widows: 3; background-color: rgb(247, 247, 247);">onError</code><span style="color: rgb(51, 51, 51); font-family: "Helvetica Neue", Helvetica, Arial, sans-serif; font-size: 16px; letter-spacing: 0.2px; line-height: 27.2px; orphans: 3; widows: 3;">传递给观察者。</span>
Integer[]itemsTwo = {1,2,3,4,5}; Observable<Integer> one = Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { for (int i = 0; i < 5; i++) { if(i==3){ subscriber.onError(new Throwable("error")); }else { subscriber.onNext(i); } try { Thread.sleep(100*i); } catch (Exception e) { } } subscriber.onCompleted(); } }); Observable<Integer> two = Observable.from(itemsTwo); Observable<Integer> myObservable = Observable.mergeDelayError(one, two); Subscriber<Integer> mySubscriber =new Subscriber<Integer>() { @Override public void onNext(Integer integer) { Log.e(TAG,"onNext................."+integer); } @Override public void onCompleted() { Log.e(TAG, "onCompleted................."); } @Override public void onError(Throwable e) { Log.e(TAG, "onError....................."); } }; myObservable.subscribe(mySubscriber);
运行结果:
四、zip
Zip操作符返回一个Obversable,它使用这个函数按顺序结合两个或多个Observables发射的数据项,然后它发射这个函数返回的结果。它按照严格的顺序应用这个函数。它只发射与发射数据项最少的那个Observable一样多的数据。
Integer[]itemsOne = {1,2,3,4,5}; Integer[]itemsTwo = {6,7,8,9,10}; Observable<Integer> one = Observable.from(itemsOne); Observable<Integer> two = Observable.from(itemsTwo); Observable myObservable = Observable.zip(one, two, new Func2<Integer, Integer, Integer>() { @Override public Integer call(Integer integer1, Integer integer2) { return integer1 + integer2; } }); Subscriber<Integer> mySubscriber =new Subscriber<Integer>() { @Override public void onNext(Integer integer) { Log.e(TAG,"onNext................."+integer); } @Override public void onCompleted() { Log.e(TAG, "onCompleted................."); } @Override public void onError(Throwable e) { Log.e(TAG, "onError....................."); } }; myObservable.subscribe(mySubscriber);运行结果:
五、and/then/when
And/Then/When操作符组合的行为类似于
zip,但是它们使用一个中间数据结构。接受两个或多个Observable,一次一个将它们的发射物合并到
Pattern对象,然后操作那个
Pattern对象,变换为一个
Plan。随后将这些
Plan变换为Observable的发射物。
六、combineLatest
当两个Observables中的任何一个发射了一个数据时,通过一个指定的函数组合每个Observable发射的最新数据(一共两个数据),然后发射这个函数的结果。
CombineLatest操作符行为类似于
zip,但是只有当原始的Observable中的每一个都发射了一条数据时
zip才发射数据。
CombineLatest则在原始的Observable中任意一个发射了数据时发射一条数据。当原始Observables的任何一个发射了一条数据时,
CombineLatest使用一个函数结合它们最近发射的数据,然后发射这个函数的返回值。
Integer[]itemsOne = {1,2,3,4,5};
Integer[]itemsTwo = {6,7,8,9,10,11};
Observable<Integer> one = Observable.from(itemsOne);
Observable<Integer> two = Observable.from(itemsTwo);
Observable myObservable = Observable.combineLatest(one,two,new Func2<Integer,Integer,Integer>(){
@Override
public Integer call(Integer integer, Integer integer2) {
return integer*integer2;
}
});
Subscriber<Integer> mySubscriber =new Subscriber<Integer>() {
@Override
public void onNext(Integer integer) {
Log.e(TAG,"onNext................."+integer);
}
@Override
public void onCompleted() {
Log.e(TAG, "onCompleted.................");
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError.....................");
}
};
myObservable.subscribe(mySubscriber);
运行结果:
七、join
任何时候,只要在另一个Observable发射的数据定义的时间窗口内,这个Observable发射了一条数据,就结合两个Observable发射的数据。
Join操作符结合两个Observable发射的数据,基于时间窗口(你定义的针对每条数据特定的原则)选择待集合的数据项。你将这些时间窗口实现为一些Observables,它们的生命周期从任何一条Observable发射的每一条数据开始。当这个定义时间窗口的Observable发射了一条数据或者完成时,与这条数据关联的窗口也会关闭。只要这条数据的窗口是打开的,它将继续结合其它Observable发射的任何数据项。你定义一个用于结合数据的函数。
//源Observable
Observable left = Observable.just("join1-", "join2-", "join3-", "join4-");
//目标Observable
Observable<Integer> right = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for (int i = 0; i < 5; i++) {
subscriber.onNext(i);
try {
Thread.sleep(1000);
} catch (Exception e) {
}
}
subscriber.onCompleted();
}
});
//这个函数觉得了源Observable发射出来数据的有效期,接收源发射来的数据,返回一个Observable
//func1第一个参数与源Observale的参数一致,当时间超过2s后源Observale将不再发射,join过程停止
Func1<String,Observable<Long>> leftDurationSelector = new Func1<String,Observable<Long>>(){
@Override
public Observable<Long> call(String s) {
return Observable.timer(2,TimeUnit.SECONDS);
}
};
//这个函数觉得了目标Observable发射出来数据的有效期,接收源发射来的数据,返回一个Observable
// func1第一个参数与目标Observale的参数一致,当时间超过6s后源Observale将不再发射,join过程停止
Func1<Integer,Observable<Long>> rightDurationSelector = new Func1<Integer,Observable<Long>>(){
@Override
public Observable<Long> call(Integer integer) {
return Observable.timer(6,TimeUnit.SECONDS);
}
};
//组合源Observale与目标Observable数据的函数
Func2<String,Integer,String> resultSelector = new Func2<String,Integer,String>(){
@Override
public String call(String s, Integer integer2) {
return s+integer2;
}
};
Subscriber<String> mySubscriber =new Subscriber<String>() {
@Override
public void onNext(String s) {
Log.e(TAG,"onNext................."+s);
}
@Override
public void onCompleted() {
Log.e(TAG, "onCompleted.................");
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError.....................");
}
};
left.join(right,leftDurationSelector,rightDurationSelector,resultSelector).subscribe(mySubscriber);
运行结果:
八、groupjoin
groupJoin基本和join相同,只是最后组合函数的参数不同。
//源Observable
Observable left = Observable.just("groupJoin1-", "groupJoin2-", "groupJoin3-", "groupJoin4-");
//目标Observable
Observable<Integer> right = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for (int i = 0; i < 5; i++) {
subscriber.onNext(i);
try {
Thread.sleep(1000);
} catch (Exception e) {
}
}
subscriber.onCompleted();
}
});
//这个函数觉得了源Observable发射出来数据的有效期,接收源发射来的数据,返回一个Observable
//func1第一个参数与源Observale的参数一致,每条有效期为3s,join过程停止
Func1<String,Observable<Long>> leftDurationSelector = new Func1<String,Observable<Long>>(){
@Override
public Observable<Long> call(String s) {
return Observable.timer(3, TimeUnit.SECONDS);
}
};
//这个函数觉得了目标Observable发射出来数据的有效期,接收源发射来的数据,返回一个Observable
// func1第一个参数与目标Observale的参数一致,每条有效期为6s,join过程停止
Func1<Integer,Observable<Long>> rightDurationSelector = new Func1<Integer,Observable<Long>>(){
@Override
public Observable<Long> call(Integer integer) {
return Observable.timer(6, TimeUnit.SECONDS);
}
};
// 组合源Observale与目标Observable数据的函数
Func2<String,Observable<Integer>,Observable<String>> resultSelector = new Func2<String,Observable<Integer>,Observable<String>>(){
@Override
public Observable<String> call(final String s, Observable<Integer> integerObservable) {
return integerObservable.map(new Func1<Integer, String>() {
@Override
public String call(Integer integer) {
return s+integer;
}
});
}
};
left
.groupJoin(right, leftDurationSelector, rightDurationSelector, resultSelector)
.subscribe(new Action1<Observable<String>>() {
@Override
public void call(Observable<String> stringObservable) {
stringObservable.subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.e(TAG,"call................."+s);
}
});
}
});
运行结果:
九、switchOnNext
将一个发射Observables的Observable转换成另一个Observable,后者发射这些Observables最近发射的数据
每隔2s返回一个Observable,这个Observable每隔1s发射一条数据,在第二个Observable产生之前有4s时间,产生了四条数据。当新的Observable产生时,之前的Observable就被丢弃了
Observable<Observable<String>> observable = Observable.timer(2, 2, TimeUnit.SECONDS)
.map(new Func1<Long, Observable<String>>() {
@Override
public Observable<String> call(Long aLong) {
return Observable.timer(1,1,TimeUnit.SECONDS)
.map(new Func1<Long, String>() {
@Override
public String call(Long aLong) {
return aLong+"";
}
}).take(4);
}
}).take(4);
Subscriber<String> mySubscriber =new Subscriber<String>() {
@Override
public void onNext(String s) {
Log.e(TAG,"onNext................."+s);
}
@Override
public void onCompleted() {
Log.e(TAG, "onCompleted.................");
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError.....................");
}
};
Observable.switchOnNext(observable).subscribe(mySubscriber);
运行结果:
相关文章推荐
- 7.4 Combining 结合操作 - And/StartWith/Join/Merge/Switch/Zip
- RX操作之辅助操作符二(doonunsubscribe、doOnCompleted、doOnError、doOnTerminate、finallyDo、delay、delaySubscription)
- Android函数响应式编程——必学的RxJava组合操作符startWith、merge、concat、zip、combineLastest
- RX操作符之过滤操作二(firstOrDefault、single、elementAt、sample、throttleFirst、throwttleLast、throttleWithTimeout)
- unable to start debugging on the web server. An authentication error occurred while communicating with the web server.
- RX操作符之辅助操作一(materialize、dematerialize、timestamp、serialize、replay、observeOn、subscribeon、dooneach)
- Rxjava结合操作符—merge、 Join
- On Error Resume Next 小解
- on error resume next
- asp中的on error resume next
- On Error Resume Next:VBscript错误处理
- 关于on error resume next
- oracle 中的start with connect by 递归操作
- On error Resume Next的特点
- 5、步步为营VS 2008 + .NET 3.5(5) - LINQ查询操作符之Select、Where、OrderBy、OrderByDescending、GroupBy、Join、GroupJoin及其对应的查询语法
- on error resume next
- Error: Unable to Start Debugging on the Web Server(from MSDN)
- ASP.Net DebugError解决方案[转]:Unable to start debugging on the web server.Debugging failes because integrated Windows authentication is not enabled.
- On Error Resume Next:VBscript错误处理
- 步步为营VS 2008 + .NET 3.5(5) - LINQ查询操作符之Select、Where、OrderBy、OrderByDescending、GroupBy、Join、GroupJoin及其对应的查询语法