您的位置:首页 > 移动开发 > Android开发

Android RxJava第一弹之原理详解、使用详解、常用场景(基于Rxjava2.0)

2016-09-17 23:16 1006 查看
Android RxJava第一弹之原理详解、使用详解、常用场景(基于Rxjava2.0)

Android RxJava第二弹之RxJava封装库 RxJava+Animation RxJava+Glide

Android RxJava第三弹之RxJava2.0尝鲜

本人参考以下文章

给 Android 开发者的 RxJava 详解—扔物线

Rxjava 2.0 与 Rxjava 1.0有什么不同

注:原理讲解可能会用到rx1.0的概念,但是代码示例部分用rx2.0 来展示

引言

很多做android开发朋友对rxjava都有熟悉,github上也出现了很多的基于rxjava的开源库,比如 RxBus RxBinding RxPermission,如果我们了解了RxJava的原理,那么我们也可以很轻松的通过RxJava来封装我们自己的库。后面会有简单的例子来用RxJava来封装Animation。

好了,开始我们的正文

RxJava介绍和原理简析

RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.

It extends the observer pattern to support sequences of data/events and adds operators that allow you to compose sequences together declaratively while abstracting away concerns about things like low-level threading, synchronization, thread-safety and concurrent data structures.

RxJava是一个实现反应性扩展框架的Java虚拟机:用于通过使用观察序列构成异步和基于事件的程序库。

扩展了观察者模式,以支持数据/事件序列,并增加了操作符,他可以将将序列清晰的组合在一起的。这些序列组合可以是抽象出来的某些数据/事件,如低级别的线程,同步,线程安全和并发数据结构。

概括的的文字刚开始一般是看不懂的,简单来说RxJava就是一个实现异步操作的库。

扩展的观察者模式

官方的概括中提到了扩展的观察者模式,那么我们先从此入手

观察者模式

Java_观察者模式(Observable和Observer)

在Java中通过Observable类和Observer接口实现了观察者模式。一个Observer对象监视着一个Observable对象的变化,当Observable对象发生变化时,Observer得到通知,就可以进行相应的工作。

这里Observable(被观察者)对象的变化是采用注册(Register)或者称为订阅(Subscribe)的方式告诉Observer(观察者)。

RxJava的观察者模式

RxJava 有四个基本概念:
Observable
(可观察者,即被观察者)、
Observer
(观察者)、
subscribe
(订阅)、事件。
Observable
Observer
通过
subscribe()
方法实现订阅关系,从而
Observable
可以在需要的时候发出事件来通知
Observer


与传统观察者模式不同,
RxJava
的事件回调方法除了普通事件
onNext()
(相当于
onClick()
/
onEvent()
)之外,还定义了两个特殊的事件:
onCompleted()
onError()


onCompleted()
: 事件队列完结。
RxJava
不仅把每个事件单独处理,还会把它们看做一个队列。
RxJava
规定,当不会再有新的
onNext()
发出时,需要触发
onCompleted()
方法作为标志。

onError()
: 事件队列异常。在事件处理过程中出异常时,
onError()
会被触发,同时队列自动终止,不允许再有事件发出。

在一个正确运行的事件序列中,
onCompleted()
onError()
有且只有一个,并且是事件序列中的最后一个。需要注意的是,
onCompleted()
onError()
二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。

备:
RxJava2.0
还添加了一个新的回调方法:
onSubscribe()
,这是为了解决
RxJava1.0
backpressure
问题,后面会讲到

RxJava观察者模式的图如下



RxJava的基本实现

因为
RxJava2.0
引入了很多新的接口,我们在讲原理的时候,直接拿着2.0的代码来做示例,但是有些得放用2.0不太好理解,我们就用1.0的代码来理解原理吧

创建
Subscriber(2.0)
/
Observer(2.0)

Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onSubscribe(Subscription s) {
Logger.i("hello  onSubscribe");
}

@Override
public void onNext(String s) {
Logger.i("hello  onNext-->" + s);
}

@Override
public void onError(Throwable t) {
Logger.i("hello  onError");
}

@Override
public void onComplete() {
Logger.i("hello  onComplete");
}
};


Observer<String> observer = new Observer<String>() {

@Override
public void onSubscribe(Disposable d) {
Logger.i("hello  onSubscribe");
}

@Override
public void onNext(String value) {
Logger.i("hello  onNext-->" + value);
}

@Override
public void onError(Throwable e) {
Logger.i("hello  onError");
}

@Override
public void onComplete() {
Logger.i("hello  onComplete");
}
};


Subscriber 和 Observer的接口是分别独立的,Obsesrver用于订阅Observable,而Subscriber用于订阅Flowable

创建
Flowable(2.0)
/
Observable(2.0)

Observable
即被观察者,它决定什么时候触发事件以及触发怎样的事件。 RxJava 使用
create()
方法来创建一个
Observable
,并为它定义事件触发规则

Flowable<String> stringFlowable = Flowable.create(new FlowableOnSubscribe<String>() {
@Override
public void subscribe(FlowableEmitter<String> e) throws Exception {
Logger.i("---rxHelloFlowable---");
}
}, FlowableEmitter.BackpressureMode.BUFFER);


Observable<String> stringObservable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("Hello");
e.onNext("Inke");
e.onComplete();
}
});


可以看到,这里传入了一个
ObservableOnSubscribe
对象作为参数,它的作用相当于一个计划表,当
Observable
被订阅的时候,
ObservableOnSubscribe
subscribe()
方法会自动被调用,事件序列就会依照设定依次触发(对于上面的代码,就是观察者
Subscriber
将会被调用两次
onNext()
和一次
onCompleted())
。这样,由被观察者调用了观察者的回调方法,就实现了由被观察者向观察者的事件传递,即观察者模式。

RxJava提供快捷创建事件队列的方法

just()
将传入的参数依次发送出来

fromIterable()
将传入的Iterable 拆分成具体对象后,依次发送出来

fromArray()
还没研究明白

心细的朋友可以看到
Flowable
create()
的时候多了一个参数
BackpressureMode
,是用来处理backpressure的发射器

一共有以下几种模式

enum BackpressureMode {
/**
* OnNext events are written without any buffering or dropping.
* Downstream has to deal with any overflow.
* <p>Useful when one applies one of the custom-parameter onBackpressureXXX operators.
*/
NONE,
/**
* Signals a MissingBackpressureException in case the downstream can't keep up.
*/
ERROR,
/**
* Buffers <em>all</em> onNext values until the downstream consumes it.
*/
BUFFER,
/**
* Drops the most recent onNext value if the downstream can't keep up.
*/
DROP,
/**
* Keeps only the latest onNext value, overwriting any previous value if the
* downstream can't keep up.
*/
LATEST
}


个人感觉BUFFER较为安全,api解释为缓冲器存有onNext值,直到下游消费它。

因为
Observer
不支持 backpressure,所以后面的代码我们默认使用RxJava2.0的
Flowable
Subscriber
,但是为了便于理解,某些原理可能还会用RxJava1.0。

Subscribe (订阅)

创建了 Flowable和 Subscriber 之后,再用
subscribe()
方法将它们联结起来,整条链子就可以工作了。代码形式很简单:

stringFlowable.subscribe(subscriber);


有人可能会注意到,
subscribe()
这个方法有点怪:它看起来是『
observalbe
订阅了
observer
/
subscriber
』而不是『
observer
/
subscriber
订阅了
observalbe
』,这看起来就像『杂志订阅了读者』一样颠倒了对象关系。这让人读起来有点别扭,不过如果把 API 设计成
observer.subscribe(observable)
/
subscriber.subscribe(observable)
,虽然更加符合思维逻辑,但对流式 API 的设计就造成影响了,比较起来明显是得不偿失的。

@Override
public final void subscribe(Subscriber<? super T> s) {
ObjectHelper.requireNonNull(s, "s is null");
s = RxJavaPlugins.onSubscribe(this, s);
ObjectHelper.requireNonNull(s, "Plugin returned null Subscriber");
subscribeActual(s);

}


/**注意:这不是 subscribe()的源码,而是将源码中与性能、兼容性、扩性有关的代码剔除后的核心代码。
*如果需要看源码,可以去 RxJava 的 GitHub 仓库下载。
*/
public Disposable onSubscribe(Subscriber subscriber) {
subscriber.onSubscribe();
flowableOnSubscribe.subscribe();
return subscriber;
}


订阅过程做了三件事

调用
Subscriber.onSubscribe()
。是Rx2.0新添加的方法,第一个执行

调用
FlowableOnSubscribe
中的
subscribe()
。在这里,事件发送的逻辑开始运行。从这也可以看出,在 RxJava 中,
Flowable
并不是在创建的时候就立即开始发送事件,而是在它被订阅的时候,即当
subscribe()
方法执行的时候。

上面我们可以看到,通过
subscriber
来订阅返回的是void

在RxJava2.0 如果是直接订阅或传入消费者那么会产生一个新的类

那就是Disposable

/**

* Represents a disposable resource.

*/

代表一个一次性的资源。

代码如下

Disposable subscribe = stringFlowable.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {

}
});


订阅源码如下

public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete, Consumer<? super Subscription> onSubscribe) {
LambdaSubscriber<T> ls = new LambdaSubscriber<T>(onNext, onError, onComplete, onSubscribe);

subscribe(ls);

return ls;
}


不过最终走的还是上面的逻辑,不过多返回了一个Disposable,

用于
dispose()
;

线程控制

Scheduler

以下API来自RxJava1.0, 与RxJava2.0用法无区别

在RxJava 中,Scheduler ——调度器,相当于线程控制器,RxJava 通过它来指定每一段代码应该运行在什么样的线程。RxJava 已经内置了几个 Scheduler ,它们已经适合大多数的使用场景:

Schedulers.immediate()
: 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。

Schedulers.newThread()
: 总是启用新线程,并在新线程执行操作。

-
Schedulers.io(): I/O
操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和
new Thread()
差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比
new Thread()
更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。

Schedulers.computation()
: 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在
computation()
中,否则 I/O 操作的等待时间会浪费 CPU。

另外, Android 还有一个专用的
AndroidSchedulers.mainThread()
,它指定的操作将在 Android 主线程运行。

有了这几个 Scheduler ,就可以使用
subscribeOn()
observeOn()
两个方法来对线程进行控制了。 *
subscribeOn()
: 指定
subscribe()
所发生的线程,即
Observable.OnSubscribe
被激活时所处的线程。或者叫做事件产生的线程。 *
observeOn()
: 指定 Subscriber 所运行在的线程。或者叫做事件消费的线程。

下面是一个获取本地资源并显示在控件上的例子

private void rxSchedulerMap() {
Flowable<Bitmap> bitmapFlowable = Flowable.just(R.drawable.effect_icon001)
.subscribeOn(Schedulers.io())
.map(new Function<Integer, Bitmap>() {
@Override
public Bitmap apply(Integer integer) throws Exception {
Logger.i("这是在io线程做的bitmap绘制圆形");
return BitmapUtils.createCircleImage(BitmapFactory.decodeResource(MainActivity.this.getResources(), integer));
}
})
.observeOn(AndroidSchedulers.mainThread())
.doOnNext(new Consumer<Bitmap>() {
@Override
public void accept(Bitmap bitmap) throws Exception {
Logger.i("这是在main线程做的UI操作");
imageView.setImageBitmap(bitmap);
}
});
bitmapFlowable.subscribe();
}


想必大家已经看得很清楚了

获取drawable资源我用的io线程

通过
subscribeOn(Schedulers.io())
控制

转变成bitmap并绘制成圆形也是在io线程,可以通过
observeOn(Schedulers.io())
也可以顺着之前的流继续执行

最后显示在UI上是通过
observeOn(AndroidSchedulers.mainThread())


subscribeOn(Scheduler.io())
observeOn(AndroidSchedulers.mainThread())
的使用方式非常常见,它适用于多数的 『后台线程取数据,主线程显示』的程序策略。

转换和Scheduler的原理

大家参考扔物线大神的文章吧,我没必要再赘述一遍

变换 & 变换的原理:
lift()
&
compose
: 对 Observable 整体的变换


像一种代理机制,通过事件拦截和处理实现事件序列的变换

在 Observable 执行了 lift(Operator) 方法之后,会返回一个新的 Observable,这个新的 Observable 会像一个代理一样,负责接收原始的 Observable 发出的事件,并在处理后发送给 Subscriber



Scheduler 的 API & Scheduler 的原理 & 延伸:doOnSubscribe()







由图中可以看出,①和②两处受第一个
subscribeOn()
影响,运行在红色线程;③和④处受第一个
observeOn()
的影响,运行在绿色线程;⑤处受第二个
onserveOn()
影响,运行在紫色线程;而第二个
subscribeOn()
,由于在通知过程中线程就被第一个
subscribeOn()
截断,因此对整个流程并没有任何影响。这里也就回答了前面的问题:当使用了多个
subscribeOn()
的时候,只有第一个
subscribeOn()
起作用。

常用操作符

在上一个章节

我很还通过
just
直接快捷的生成了Flowable

我们还通过将drwable通过
map
操作符转换成了 bitmap进以下一流的操作

这些操作符使整个逻辑流程很美 很漂亮 很sexy~~~

比那些蜜汁缩进美了不是一点半点, 我们下面来个比较复杂的例子,大家对比一下(用的RxJava1.0,意思一下)

//-----------------------------------蜜汁缩进--嵌套循环--回调地狱 -----------------------------------------------------------

/**
* 实现的功能:获取assets文件夹下所有文件夹中的jpg图片,并且将所有的图片画到一个ImageView上,没有实际的用处,只是为了说明问题--- 谜之缩进--嵌套循环--回调地狱
* 不使用RxJava的写法-- 谜之缩进--回调地狱
*/
//思路:需要以下6个步骤完成
//1:遍历获取assets文件夹下所有的文件夹的名称
//2:遍历获取获取assets文件夹下某个文件夹中所有图片路径的集合
//3:过滤掉非JPG格式的图片
//4:获取某个路径下图片的bitmap
//5:将Bitmap绘制到画布上
//6:循环结束后更新UI,给ImageView设置最后绘制完成后的Bitmap,隐藏ProgressBar
private void miZhiSuoJinAndNestedLoopAndCallbackHell() {
new Thread(new Runnable() {
@Override
public void run() {

runOnUiThread(new Runnable() {
@Override
public void run() {
mProgressBar.setVisibility(View.VISIBLE);
}
});
//1:遍历获取assets文件夹下所有的文件夹的名称
ArrayList<String> assetsFolderNameList = ImageNameFactory.getAssetImageFolderName();

for (String folderName : assetsFolderNameList) {

//2:遍历获取获取assets文件夹下某个文件夹中所有图片路径的集合
ArrayList<String> imagePathList = ImageUtils.getAssetsImageNamePathList(getApplicationContext(), folderName);

for (final String imagePathName : imagePathList) {
//3:过滤掉非JPG格式的图片
if (imagePathName.endsWith(JPG)) {

//4:获取某个路径下图片的bitmap
final Bitmap bitmap = ImageUtils.getImageBitmapFromAssetsFolderThroughImagePathName(getApplicationContext(), imagePathName, Constant.IMAGE_WITH, Constant.IMAGE_HEIGHT);
runOnUiThread(new Runnable() {
@Override
public void run() {
//Logger.d(mCounter + ":" + imagePathName);

//5:将Bitmap绘制到画布上
createSingleImageFromMultipleImages(bitmap, mCounter);
mCounter++;

}
});
}
}
}

//6:循环结束后更新UI,给ImageView设置最后绘制完成后的Bitmap,隐藏ProgressBar
runOnUiThread(new Runnable() {
@Override
public void run() {
mImageView.setImageBitmap(mManyBitmapSuperposition);
mProgressBar.setVisibility(View.GONE);
}
});

}
}).start();
}


//-----------------------------------RxJava的实现--链式调用--十分简洁 -----------------------------------------------------------

private void rxJavaSolveMiZhiSuoJinAndNestedLoopAndCallbackHell() {
//1:被观察者:

//2:数据转换

//3:设置事件的产生发生在IO线程

//4:设置事件的消费发生在主线程

//5:观察者

//6:订阅:被观察者被观察者订阅
mGoToRecycleImageView = false;
Observable.from(ImageNameFactory.getAssetImageFolderName())
//assets下一个文件夹的名称,assets下一个文件夹中一张图片的路径
.flatMap(new Func1<String, Observable<String>>() {
@Override
public Observable<String> call(String folderName) {
return Observable.from(ImageUtils.getAssetsImageNamePathList(getApplicationContext(), folderName));
}
})
//过滤,筛选出jpg图片
.filter(new Func1<String, Boolean>() {
@Override
public Boolean call(String imagePathNameAll) {
return imagePathNameAll.endsWith(JPG);
}
})
//将图片路径转换为对应图片的Bitmap
.map(new Func1<String, Bitmap>() {
@Override
public Bitmap call(String imagePathName) {
return ImageUtils.getImageBitmapFromAssetsFolderThroughImagePathName(getApplicationContext(), imagePathName, Constant.IMAGE_WITH, Constant.IMAGE_HEIGHT);
}
})
.map(new Func1<Bitmap, Void>() {
@Override
public Void call(Bitmap bitmap) {
createSingleImageFromMultipleImages(bitmap, mCounter);
mCounter++;
return null;
}
})
.subscribeOn(Schedulers.io())//设置事件的产生发生在IO线程
.doOnSubscribe(new Action0() {
@Override
public void call() {
mProgressBar.setVisibility(View.VISIBLE);
}
})
.observeOn(AndroidSchedulers.mainThread())//设置事件的消费发生在主线程
.subscribe(new Subscriber<Void>() {
@Override
public void onCompleted() {
mImageView.setImageBitmap(mManyBitmapSuperposition);
mProgressBar.setVisibility(View.GONE);
}

@Override
public void onError(Throwable e) {
//Toast.makeText(MainActivity.this, ""+e.getMessage(), Toast.LENGTH_SHORT).show();
}

@Override
public void onNext(Void aVoid) {

}
});
}


操作符部分一览 (基于Rxjava1.0)

Combining Obsercables(Observable的组合操作符)

操作符功能
combineLatest两个Observable产生的结果合并成新Observable,任意Observable产生的结果和另一个Observable最后产生的结果按规则合并
joinlike combineLatest 能控制每个Observable产生结果的生命周期,在每个结果的生命周期内,与另一个Observable产生的结果按规则合并
groupJoinlike join 暂不知其他区别
==merge====按照两个Observable的提交结果的时间顺序,对Observable合并。时间按某Observable完成的最小时间==
mergeDelayError合并的某一个Observable中出现错误,把错误放到所有结果都合并完成之后,订阅者回调执行onError。而merge会马上停止合并
startWith源Observable提交结果之前,插入指定数据
switchOnNext把一组Observable转换成Observable。这组Observable中取最后一个Observable提交的结果给订阅者。
==zip====把两个Observable提交的结果按照顺序进行合并。==

Error Handing Operators(Observable的错误处理操作符)

操作符功能
onErrorReturn在Observable 发生错误或异常(即将回调onError)时,拦截错误并执行指定的逻辑,返回一个跟源Observable相同类型的结果,最后回调订阅者的onComplete方法
onErrorResumeNextlike onErrorReturn 不同的是返回一个Observable 例:return Observable.just(5,2,0);
onExceptionResumeNextlike onErrorResumeNext 不同的是只有在exception的时候触发
==retry====当Observable发生错误或异常,重新执行Observable的逻辑,如果经过n次重新执行后仍然出现错误或异常,则最后回调onError方法,若无错误或异常则按正常流程执行 例:observable.retry(2).subscribe();==
retryWhenlike retry 不同在于retryWhen是在源Observable出现错误或异常时,通过回调第二个Observable来判断是否重新尝试执行源Observable的逻辑;若第二个Observable没错误或异常,则会重新尝试执行源Observable的逻辑,否则就会直接回调执行订阅者的onError();

其他常用

操作符功能
map对源Observable数据的加工处理,返回其他类型 例:return 520+”string data”;
flatMaplike map 不同的是返回一个Observable 扩展:使用了merge操作符 例:return Observable.from(…);
concatMaplike concatMap 不同的是concatMap操作遵循元素的顺序 扩展:使用了concat操作符
compose唯一一个能从流中获取原生Observable的方法,因此,影响整个流的操作符(subscribeOn()和observeOn())需要用compose()。当你创建一个Observable流并且内联了一堆操作符以后,compose()会立即执行,所以compose转换的是整个流
compose与flagMap的区别flatMap()一定是低效率的,因为他每次调用onNext()之后都需要创建一个新的Observable,compose()是操作在整个流上的
concat按顺序依次连接两个或更多的 Observable
first从序列中取第一个先完成的项
takeFirstlike first 区别是first()如果没有释放有效的数据源,那么会throw NoSuchElementException;而takeFirst会complete没有 exception

常用场景

我们前面已经大致理解RxJava和他的基本使用了,虽然是冰山一角,但够我们入门了,现在我们来通过实际项目中常用的场景来进阶学习。

因为RxJava2.0 是16年八九月份刚更新的,没有时间来将1.0的代码替换过来,但是主要的使用方法还是没变的,所以下面的代码大部分是基于RxJava1.0,看客请见谅

RxJava实现三级缓存(RxJava 1.0)

参考文章和开源库

Loading data from multiple sources with RxJava

RxImageloader

使用Rxjava实现三级缓存(

创建三个缓存的Observable对象

Observable<Data> memory = ...;
Observable<Data> disk = ...;
Observable<Data> network = ...;


获取第一个源的数据

Observable<Data> source = Observable
.concat(memory, disk, network)
.first(new Func1<Data, Boolean>() {//如果对象为空、说明没有数据从下一层找
public Boolean call(Data data) {
return data!=null;
}
});


concat()订阅了所有需要的Observable。

通过first()会因为取到数据后会停止序列

也就是说,如果memory返回了一个结果,那么我们不会打扰disk 和 network

我们从网络获取到数据,记得存起来。

Observable<Data> networkWithSave = network.doOnNext(data -> {
saveToDisk(data);
cacheInMemory(data);
});

Observable<Data> diskWithCache = disk.doOnNext(data -> {
cacheInMemory(data);
});


RxJava实现心跳(RxJava 2.0)

private Disposable intervalInterval;//心跳
private void rxInterval() {
intervalInterval = Flowable.interval(1, TimeUnit.SECONDS)
.doOnNext(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Logger.i("rxInterval---" + aLong);
}
})
.observeOn(AndroidSchedulers.mainThread())
.doOnNext(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Logger.i("rxInterval---txt.setText---" + aLong);
txt.setText("----心跳---" + aLong);
}
})
.subscribeOn(Schedulers.io())
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {

}
});
}
/**
* 停止心跳
* @param v
*/
@Override
public void onClick(View v) {
switch (v.getId()) {
case R.id.btn:
if (intervalInterval != null) {
intervalInterval.dispose();
}
break;
}
}

@Override
protected void onDestroy() {
super.onDestroy();
if (intervalInterval != null) {
intervalInterval.dispose();
}
}


遍历集合

Flowable.just(new ArrayList<StringEntity>())
.doOnNext(new Consumer<ArrayList<StringEntity>>() {
@Override
public void accept(ArrayList<StringEntity> stringEntities) throws Exception {
for (int i = 0; i < 10; i++) {
stringEntities.add(new StringEntity("rxFromFilter--" + i, i));
}
}
})
.flatMap(new Function<ArrayList<StringEntity>, Publisher<?>>() {
@Override
public Publisher<?> apply(ArrayList<StringEntity> stringEntities) throws Exception {
return handleList(stringEntities);
}
})
.subscribe(new Subscriber<Object>() {
@Override
public void onSubscribe(Subscription s) {

}

@Override
public void onNext(Object o) {

}

@Override
public void onError(Throwable t) {

}

@Override
public void onComplete() {

}
});

/**
* 将list转换成Flowable
* @param list
* @return
*/
public Flowable<StringEntity> handleList(ArrayList<StringEntity> list) {
return Flowable.fromIterable(list)
.filter(new Predicate<StringEntity>() {
@Override
public boolean test(StringEntity entity) throws Exception {
return entity.position != 0;
}
})
.doOnNext(new Consumer<StringEntity>() {
@Override
public void accept(StringEntity entity) throws Exception {
Logger.i(entity.getItem());
}
});
}


并发任务(RxJava 1.0)

/**
* 两个耗时任务一起执行
*/
private static Observable<Intent> createLivePlayerRoomPageOrDonePageObservable(final Context context, final int roomId, final String url) {
//获取网络资源的Observable
Observable<RspLiveInfo> rspLiveInfoObservable = LiveNetManager.liveInfo(roomId, null, false);
//获取图片高斯模糊的Observable
Observable<GlideBitmapDrawable> glideBitmapDrawableObservable = RxGlide.afterGlideRequestListener(Global.getContext(), ImageWorker.buildBlurBitmapRequest(context, url));
return Observable.zip(rspLiveInfoObservable, glideBitmapDrawableObservable,
new Func2<RspLiveInfo, GlideBitmapDrawable, Intent>() {
@Override
public Intent call(RspLiveInfo rspLiveInfo, GlideBitmapDrawable glideBitmapDrawable) {
})
.observeOn(AndroidSchedulers.mainThread());
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
相关文章推荐