您的位置:首页 > 编程语言 > Java开发

RxJava 变换、 组合 、 合并操作符

2018-01-23 17:35 302 查看
/**
* @author :houde
*         时间:2018/1/23
*         Des:RxJava 变换操作符
*/

public class RxOperateActivity extends AppCompatActivity {
private final String TAG = "RxOperateActivity";
Observable<Integer> observable1 = Observable.just(1,2,3,4);
Observable<String>  observable2 = Observable.just("A","B","C");
private Observer<String> stringObserver = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.e(TAG,"开始采用subscribe连接");
}

@Override
public void onNext(String s) {
Log.e(TAG,s);
}

@Override
public void onError(Throwable e) {
Log.e(TAG,e.getMessage());
}

@Override
public void onComplete() {
Log.e(TAG,"对Complete事件作出响应");
}
};
private Observer<Integer> intObserver = new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.e(TAG,"开始采用subscribe连接");
}

@Override
public void onNext(Integer integer) {
Log.e(TAG,"事件 = " + integer);

}

@Override
public void onError(Throwable e) {
Log.e(TAG,e.getMessage());
}

@Override
public void onComplete() {
Log.e(TAG,"对Complete事件作出响应");
}
} ;
@Override
protected void onCreate(@Nullable Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_image);
//转换操作符
/**
*  作用
*      对被观察者发送的每1个事件都通过指定的函数处理,从而变换成另外一种事件
*      即:将被观察者发送的事件转换为任意的类型事件。
*  应用场景
*       数据类型转换
*   具体使用
*       下面以将 使用Map()将事件的参数从整型变换成字符串类型为例子说明
*/
map();
/**
* 作用:
*  将被观察者发送的事件序列进行拆分&单独转换,再合并成一个新的事件序列,最后再进行发送
*
*  原理
*      1.为事件序列中每个事件都创建一个 Observable 对象;
*      2.将对每个 原始事件 转换后的 新事件 都放入到对应 Observable对象;
*      3.将新建的每个Observable 都合并到一个 新建的、总的Observable 对象;
*      4.新建的、总的Observable 对象 将 新合并的事件序列 发送给观察者(Observer)
*  应用场景
*      无序的将被观察者发送的整个事件序列进行变换
*/
flatMap();
/**
* 作用:类似FlatMap()操作符
* 与FlatMap()的 区别在于:拆分 & 重新合并生成的事件序列 的顺序 = 被观察者旧序列生产的顺序
* 应用场景
*      有序的将被观察者发送的整个事件序列进行变换
*/
concatMap();
/**
* 作用
*  定期从 被观察者(Observable)需要发送的事件中获取一定数量的事件&放到缓存区中,
*  最终发送
*
* 应用场景
*  缓存被观察者发送的事件
*/
buffer();
//组合和并操作符
/**
* 作用
*      组合多个被观察者一起发送数据,合并后 按发送顺序串行执行
*      二者区别:
*      组合被观察者的数量,即concat()组合被观察者数量≤4个,
*      而concatArray()则可>4个
*/
concat();
concatArray();
/**
* 作用
*      组合多个被观察者一起发送数据,合并后 按时间线并行执行
*
* 二者区别:
* 组合被观察者的数量,即merge()组合被观察者数量≤4个,
* 而mergeArray()则可>4个
*
* 区别上述concat()操作符:
* 同样是组合多个被观察者一起发送数据,但concat()操作符合并后是按发送顺序串行执行
*/
merge();
mergeArray();
/**
* 背景:
*      使用merge和concat操作符时,
* 冲突:
*      若其中一个被观察者发出onError事件,则会终止其他被观察者继续发送事件
* 解决方案:
*      若希望onError事件推迟到其他被观察者发送完事件之后再触发
*      即需要使用对应的mergeDelayError()或concatDelayError操作符
*
*/
concatDelayError();
mergeDelayError();
//事件的合并
/**
* 作用
*      合并多个被观察者(Observable)发送的事件,
*      生成一个新的事件序列(即组合过后的事件序列),并最终发送
* 特别注意:
*      事件组合方式 = 严格按照原先事件序列 进行对位合并
*      最终合并的事件数量 = 多个被观察者(Observable)中数量最少的数量
* 特别注意:
*      尽管被观察者2的事件D没有事件与其合并,但还是会继续发送
*      若在被观察者1 & 被观察者2的事件序列最后发送onComplete()事件,
*      则被观察者2的事件D也不会发送,测试结果如下
* 定义:
*      属于Rxjava中的组合
* 作用:
*      1.合并多个被观察者(Observable)发送的事件
*      2.生成一个新的事件序列(即合并之后的序列),并最终发送
* 原理:
*      1.事件组合方式 = 严格按照原先事件序列进行对位合并
*      2.最终合并的事件数量 = 多个被观察者(Observable)中数量最少的数量
*
* 应用场景:
*      1.当展示的信息需要从多个地方获取(即 信息 = 信息1 + 信息2)& 统一结合后再展示
*      2.如:合并网络请求的发送 & 统一展示结果
*/
zip();
/**
* 作用
*      当两个Observables中的任何一个发送了数据后,将先发送了数据的Observables
*      的最新(最后)一个数据与另外一个Observable发送的每个数据结合,最终基于该
*      函数的结果发送数据
* 与Zip()的区别:
*      Zip() = 按个数合并,
*      即1对1合并;CombineLatest() = 按时间合并,即在同一个时间点上合并
*
* combineLatestDelayError()
*      作用类似于concatDelayError() / mergeDelayError() ,即错误处理,此处不作过多描述
*/
combineLatest();
/**
* 作用
*     把被观察者需要发送的事件聚合成1个事件 & 发送
*
*  聚合的逻辑根据需求撰写,但本质都是前2个数据聚合,然后与后1个数据继续进行聚合,依次类推
*/
reduce();
/**
*作用
*    将被观察者Observable发送的数据事件收集到一个数据结构里
*/
collect();
//发送事件前追加发送事件
/**
* 作用
*     在一个被观察者发送事件前,追加发送一些数据/一个新的被观察者
*/
startWith();
startWithArray();
//统计发送事件数量
/**
* 作用
*      统计被观察者发送事件的数量
*/
count();

}

private void count() {
observable1.count().subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.e(TAG,"发送事件的次数 = " + aLong);
}
});
}

private void startWithArray() {
Observable.just(4,5,6,7)
.startWith(0)
.startWithArray(1,2,3)
.subscribe(intObserver);
}

private void startWith() {
Observable.just(1,2,3,4)
.startWith(0)
.subscribe(intObserver);
}

private void collect() {
observable1.collect(
// 1. 创建数据结构(容器),用于收集被观察者发送的数据
new Callable<ArrayList<Integer>>() {
@Override
public ArrayList<Integer> call() throws Exception {
return new ArrayList<>();
}
// 2. 对发送的数据进行收集
}, new BiConsumer<ArrayList<Integer>, Integer>() {
@Override
public void accept(ArrayList<Integer> list, Integer integer) throws Exception {
// 参数说明:list = 容器,integer = 后者数据
list.add(integer);
// 对发送的数据进行收集
}
}).subscribe(new Consumer<ArrayList<Integer>>() {
@Override
public void accept(ArrayList<Integer> list) throws Exception {
Log.e(TAG, "本次发送的数据是: " + list);
}
});
}

private void reduce() {
observable1.reduce(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer s1, Integer s2) throws Exception {
Log.e(TAG, "本次计算的数据是: "+s1 +" 乘 "+ s2);
return s1 * s2;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "最终计算的结果是: " + integer);
}
});
}

private void combineLatest() {
Log.e(TAG,"-------------------combineLatest-------------------");

Observable.combineLatest(observable1, observable2, new BiFunction<Integer, String, String>() {
@Override
public String apply(Integer integer, String s) throws Exception {
return s + integer;
}
}).subscribe(stringObserver);

}

private void zip() {
Log.e(TAG,"-------------------zip-------------------");

Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
@Override
public String apply(Integer integer, String s) throws Exception {
return integer + s;
}
}).subscribe(stringObserver);
}

private void mergeDelayError() {
Log.e(TAG,"-------------------mergeDelayError-------------------");
Observable.mergeArrayDelayError(
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {

emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
// 发送Error事件,因为使用了concatDelayError,所以第2个Observable将会发送事件,等发送完毕后,再发送错误事件
emitter.onError(new NullPointerException("这里发送了一个onError()"));
emitter.onComplete();
}
}),
Observable.just(4, 5, 6))
.subscribe(intObserver);
}

private void concatDelayError() {
Log.e(TAG,"-------------------concatDelayError-------------------");
Observable.concat(
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {

emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
// 发送Error事件,因为无使用concatDelayError,所以第2个Observable将不会发送事件
emitter.onError(new NullPointerException("这里发送了一个onError()"));
emitter.onComplete();
}
}),
Observable.just(4, 5, 6))
.subscribe(intObserver);

}

private void mergeArray(){
Log.e(TAG,"-------------------mergeArray-------------------");
Observable.mergeArray(Observable.just(1,2,3),
Observable.just(4,5,6),
Observable.just(7,8,9),
Observable.just(10,11,12),
Observable.just(13,14,15),
Observable.just(16,17,18)
).subscribe(intObserver);
}
private void merge(){
Log.e(TAG,"-------------------merge-------------------");

Observable.merge(Observable.just(1,2,3,4),
Observable.just(5,6),
Observable.just(7,8,9),
Observable.just(10,11,12,13)
)
.subscribe(intObserver);
}
private void concatArray(){
Log.e(TAG,"-------------------concatArray-------------------");
Observable.concatArray(Observable.just(1,2,3),
Observable.just(4,5,6),
Observable.just(9,10),
Observable.just(11,12,13),
Observable.just(14,15,16)
).subscribe(intObserver);
}
private void concat(){
// concat():组合多个被观察者(≤4个)一起发送数据
// 注:串行执行
Log.e(TAG,"-------------------concat-------------------");

Observable.concat(Observable.just(1,2,3),
Observable.just(4,5,6),
Observable.just(9,10)
).subscribe(intObserver);
}

private void buffer() {
Log.e(TAG,"-------------------buffer-------------------");

Observable.just(1,2,3,4,5,6,7,8)
// 设置缓存区大小 & 步长
// 缓存区大小 = 每次从被观察者中获取的事件数量
// 步长 = 每次获取新事件的数量
.buffer(3,1)
.subscribe(new Observer<List<Integer>>() {
@Override
public void onSubscribe(Disposable d) {
Log.e(TAG,"开始采用subscribe连接");
}

@Override
public void onNext(List<Integer> ints) {
Log.e(TAG,"缓存区里的事件个数" + ints.size());
for(int i = 0 ,size = ints.size(); i < size;i++){
Log.e(TAG,"事件 = " + i);
}
}

@Override
public void onError(Throwable e) {
Log.e(TAG,e.getMessage());
}

@Override
public void onComplete() {
Log.e(TAG,"对Complete事件作出响应");
}
});
}

private void concatMap() {
Log.e(TAG,"-------------------concatMap-------------------");
Observable.just(4,3,2,1)
.concatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
return Observable.fromIterable(getEvents(integer));
}
})
.subscribe(stringObserver);
}

private void flatMap() {
Log.e(TAG,"-------------------flatMap-------------------");
Observable.just(1,2,3,4)
.flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
return Observable.fromIterable(getEvents(integer));
}
}).subscribe(stringObserver);
}

@NonNull
private List<String> getEvents(Integer integer) {
List<String> event = new ArrayList<>(3);
for(int i =  0 ; i < 3 ; i++){
event.add("我是事件 " + integer + "拆分后的子事件" + i);
}
return event;
}

private void map() {
Log.e(TAG,"-------------------map-------------------");
Observable.just(1,2,3,4).map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return "使用 Map变换操作符 将事件" + integer +"的参数从 整型"+integer + " 变换成 字符串类型" + integer;
}
}).subscribe(stringObserver);
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: