RxJava 变换、 组合 、 合并操作符
2018-01-23 17:35
302 查看
/** * @author :houde * 时间:2018/1/23 * Des:RxJava 变换操作符 */ public class RxOperateActivity extends AppCompatActivity { private final String TAG = "RxOperateActivity"; Observable<Integer> observable1 = Observable.just(1,2,3,4); Observable<String> observable2 = Observable.just("A","B","C"); private Observer<String> stringObserver = new Observer<String>() { @Override public void onSubscribe(Disposable d) { Log.e(TAG,"开始采用subscribe连接"); } @Override public void onNext(String s) { Log.e(TAG,s); } @Override public void onError(Throwable e) { Log.e(TAG,e.getMessage()); } @Override public void onComplete() { Log.e(TAG,"对Complete事件作出响应"); } }; private Observer<Integer> intObserver = new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { Log.e(TAG,"开始采用subscribe连接"); } @Override public void onNext(Integer integer) { Log.e(TAG,"事件 = " + integer); } @Override public void onError(Throwable e) { Log.e(TAG,e.getMessage()); } @Override public void onComplete() { Log.e(TAG,"对Complete事件作出响应"); } } ; @Override protected void onCreate(@Nullable Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_image); //转换操作符 /** * 作用 * 对被观察者发送的每1个事件都通过指定的函数处理,从而变换成另外一种事件 * 即:将被观察者发送的事件转换为任意的类型事件。 * 应用场景 * 数据类型转换 * 具体使用 * 下面以将 使用Map()将事件的参数从整型变换成字符串类型为例子说明 */ map(); /** * 作用: * 将被观察者发送的事件序列进行拆分&单独转换,再合并成一个新的事件序列,最后再进行发送 * * 原理 * 1.为事件序列中每个事件都创建一个 Observable 对象; * 2.将对每个 原始事件 转换后的 新事件 都放入到对应 Observable对象; * 3.将新建的每个Observable 都合并到一个 新建的、总的Observable 对象; * 4.新建的、总的Observable 对象 将 新合并的事件序列 发送给观察者(Observer) * 应用场景 * 无序的将被观察者发送的整个事件序列进行变换 */ flatMap(); /** * 作用:类似FlatMap()操作符 * 与FlatMap()的 区别在于:拆分 & 重新合并生成的事件序列 的顺序 = 被观察者旧序列生产的顺序 * 应用场景 * 有序的将被观察者发送的整个事件序列进行变换 */ concatMap(); /** * 作用 * 定期从 被观察者(Observable)需要发送的事件中获取一定数量的事件&放到缓存区中, * 最终发送 * * 应用场景 * 缓存被观察者发送的事件 */ buffer(); //组合和并操作符 /** * 作用 * 组合多个被观察者一起发送数据,合并后 按发送顺序串行执行 * 二者区别: * 组合被观察者的数量,即concat()组合被观察者数量≤4个, * 而concatArray()则可>4个 */ concat(); concatArray(); /** * 作用 * 组合多个被观察者一起发送数据,合并后 按时间线并行执行 * * 二者区别: * 组合被观察者的数量,即merge()组合被观察者数量≤4个, * 而mergeArray()则可>4个 * * 区别上述concat()操作符: * 同样是组合多个被观察者一起发送数据,但concat()操作符合并后是按发送顺序串行执行 */ merge(); mergeArray(); /** * 背景: * 使用merge和concat操作符时, * 冲突: * 若其中一个被观察者发出onError事件,则会终止其他被观察者继续发送事件 * 解决方案: * 若希望onError事件推迟到其他被观察者发送完事件之后再触发 * 即需要使用对应的mergeDelayError()或concatDelayError操作符 * */ concatDelayError(); mergeDelayError(); //事件的合并 /** * 作用 * 合并多个被观察者(Observable)发送的事件, * 生成一个新的事件序列(即组合过后的事件序列),并最终发送 * 特别注意: * 事件组合方式 = 严格按照原先事件序列 进行对位合并 * 最终合并的事件数量 = 多个被观察者(Observable)中数量最少的数量 * 特别注意: * 尽管被观察者2的事件D没有事件与其合并,但还是会继续发送 * 若在被观察者1 & 被观察者2的事件序列最后发送onComplete()事件, * 则被观察者2的事件D也不会发送,测试结果如下 * 定义: * 属于Rxjava中的组合 * 作用: * 1.合并多个被观察者(Observable)发送的事件 * 2.生成一个新的事件序列(即合并之后的序列),并最终发送 * 原理: * 1.事件组合方式 = 严格按照原先事件序列进行对位合并 * 2.最终合并的事件数量 = 多个被观察者(Observable)中数量最少的数量 * * 应用场景: * 1.当展示的信息需要从多个地方获取(即 信息 = 信息1 + 信息2)& 统一结合后再展示 * 2.如:合并网络请求的发送 & 统一展示结果 */ zip(); /** * 作用 * 当两个Observables中的任何一个发送了数据后,将先发送了数据的Observables * 的最新(最后)一个数据与另外一个Observable发送的每个数据结合,最终基于该 * 函数的结果发送数据 * 与Zip()的区别: * Zip() = 按个数合并, * 即1对1合并;CombineLatest() = 按时间合并,即在同一个时间点上合并 * * combineLatestDelayError() * 作用类似于concatDelayError() / mergeDelayError() ,即错误处理,此处不作过多描述 */ combineLatest(); /** * 作用 * 把被观察者需要发送的事件聚合成1个事件 & 发送 * * 聚合的逻辑根据需求撰写,但本质都是前2个数据聚合,然后与后1个数据继续进行聚合,依次类推 */ reduce(); /** *作用 * 将被观察者Observable发送的数据事件收集到一个数据结构里 */ collect(); //发送事件前追加发送事件 /** * 作用 * 在一个被观察者发送事件前,追加发送一些数据/一个新的被观察者 */ startWith(); startWithArray(); //统计发送事件数量 /** * 作用 * 统计被观察者发送事件的数量 */ count(); } private void count() { observable1.count().subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { Log.e(TAG,"发送事件的次数 = " + aLong); } }); } private void startWithArray() { Observable.just(4,5,6,7) .startWith(0) .startWithArray(1,2,3) .subscribe(intObserver); } private void startWith() { Observable.just(1,2,3,4) .startWith(0) .subscribe(intObserver); } private void collect() { observable1.collect( // 1. 创建数据结构(容器),用于收集被观察者发送的数据 new Callable<ArrayList<Integer>>() { @Override public ArrayList<Integer> call() throws Exception { return new ArrayList<>(); } // 2. 对发送的数据进行收集 }, new BiConsumer<ArrayList<Integer>, Integer>() { @Override public void accept(ArrayList<Integer> list, Integer integer) throws Exception { // 参数说明:list = 容器,integer = 后者数据 list.add(integer); // 对发送的数据进行收集 } }).subscribe(new Consumer<ArrayList<Integer>>() { @Override public void accept(ArrayList<Integer> list) throws Exception { Log.e(TAG, "本次发送的数据是: " + list); } }); } private void reduce() { observable1.reduce(new BiFunction<Integer, Integer, Integer>() { @Override public Integer apply(Integer s1, Integer s2) throws Exception { Log.e(TAG, "本次计算的数据是: "+s1 +" 乘 "+ s2); return s1 * s2; } }).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.e(TAG, "最终计算的结果是: " + integer); } }); } private void combineLatest() { Log.e(TAG,"-------------------combineLatest-------------------"); Observable.combineLatest(observable1, observable2, new BiFunction<Integer, String, String>() { @Override public String apply(Integer integer, String s) throws Exception { return s + integer; } }).subscribe(stringObserver); } private void zip() { Log.e(TAG,"-------------------zip-------------------"); Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() { @Override public String apply(Integer integer, String s) throws Exception { return integer + s; } }).subscribe(stringObserver); } private void mergeDelayError() { Log.e(TAG,"-------------------mergeDelayError-------------------"); Observable.mergeArrayDelayError( Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); // 发送Error事件,因为使用了concatDelayError,所以第2个Observable将会发送事件,等发送完毕后,再发送错误事件 emitter.onError(new NullPointerException("这里发送了一个onError()")); emitter.onComplete(); } }), Observable.just(4, 5, 6)) .subscribe(intObserver); } private void concatDelayError() { Log.e(TAG,"-------------------concatDelayError-------------------"); Observable.concat( Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); // 发送Error事件,因为无使用concatDelayError,所以第2个Observable将不会发送事件 emitter.onError(new NullPointerException("这里发送了一个onError()")); emitter.onComplete(); } }), Observable.just(4, 5, 6)) .subscribe(intObserver); } private void mergeArray(){ Log.e(TAG,"-------------------mergeArray-------------------"); Observable.mergeArray(Observable.just(1,2,3), Observable.just(4,5,6), Observable.just(7,8,9), Observable.just(10,11,12), Observable.just(13,14,15), Observable.just(16,17,18) ).subscribe(intObserver); } private void merge(){ Log.e(TAG,"-------------------merge-------------------"); Observable.merge(Observable.just(1,2,3,4), Observable.just(5,6), Observable.just(7,8,9), Observable.just(10,11,12,13) ) .subscribe(intObserver); } private void concatArray(){ Log.e(TAG,"-------------------concatArray-------------------"); Observable.concatArray(Observable.just(1,2,3), Observable.just(4,5,6), Observable.just(9,10), Observable.just(11,12,13), Observable.just(14,15,16) ).subscribe(intObserver); } private void concat(){ // concat():组合多个被观察者(≤4个)一起发送数据 // 注:串行执行 Log.e(TAG,"-------------------concat-------------------"); Observable.concat(Observable.just(1,2,3), Observable.just(4,5,6), Observable.just(9,10) ).subscribe(intObserver); } private void buffer() { Log.e(TAG,"-------------------buffer-------------------"); Observable.just(1,2,3,4,5,6,7,8) // 设置缓存区大小 & 步长 // 缓存区大小 = 每次从被观察者中获取的事件数量 // 步长 = 每次获取新事件的数量 .buffer(3,1) .subscribe(new Observer<List<Integer>>() { @Override public void onSubscribe(Disposable d) { Log.e(TAG,"开始采用subscribe连接"); } @Override public void onNext(List<Integer> ints) { Log.e(TAG,"缓存区里的事件个数" + ints.size()); for(int i = 0 ,size = ints.size(); i < size;i++){ Log.e(TAG,"事件 = " + i); } } @Override public void onError(Throwable e) { Log.e(TAG,e.getMessage()); } @Override public void onComplete() { Log.e(TAG,"对Complete事件作出响应"); } }); } private void concatMap() { Log.e(TAG,"-------------------concatMap-------------------"); Observable.just(4,3,2,1) .concatMap(new Function<Integer, ObservableSource<String>>() { @Override public ObservableSource<String> apply(Integer integer) throws Exception { return Observable.fromIterable(getEvents(integer)); } }) .subscribe(stringObserver); } private void flatMap() { Log.e(TAG,"-------------------flatMap-------------------"); Observable.just(1,2,3,4) .flatMap(new Function<Integer, ObservableSource<String>>() { @Override public ObservableSource<String> apply(Integer integer) throws Exception { return Observable.fromIterable(getEvents(integer)); } }).subscribe(stringObserver); } @NonNull private List<String> getEvents(Integer integer) { List<String> event = new ArrayList<>(3); for(int i = 0 ; i < 3 ; i++){ event.add("我是事件 " + integer + "拆分后的子事件" + i); } return event; } private void map() { Log.e(TAG,"-------------------map-------------------"); Observable.just(1,2,3,4).map(new Function<Integer, String>() { @Override public String apply(Integer integer) throws Exception { return "使用 Map变换操作符 将事件" + integer +"的参数从 整型"+integer + " 变换成 字符串类型" + integer; } }).subscribe(stringObserver); } }
相关文章推荐
- Android RxJava操作符详解 系列:组合 / 合并操作符
- Android RxJava操作符详解 系列:组合 / 合并操作符
- RxJava变换操作符:.concatMap( )与.flatMap( )的比较
- RxJava系列之二 变换类操作符详解1
- RxJava的学习之变换操作符—buffer
- RxJava变换操作符:.concatMap( )与.flatMap( )的比较
- Android RxJava操作符详解系列: 变换操作符
- RxJava变换操作符:.concatMap( )与.flatMap( )的比较
- Android RxJava操作符详解系列: 变换操作符
- RxJava之组合操作符
- RxJava系列5(组合操作符)
- [Android开发] RxJava2之路四 - 操作符简介与变换操作符例子Demo
- RxJava 系列之变换操作符map(2)
- RxJava 合并组合两个(或多个)Observable数据源
- RxJava使用详解--组合操作符
- RxJava操作符(03-变换操作)
- RxJava操作符(二) __变换操作
- RxJava操作符总结之变换
- RxJava操作符(03-变换操作)
- RxJava系列5(组合操作符)