您的位置:首页 > 编程语言 > Java开发

Rxjava的基础用法和源码解析(三)

2020-04-23 09:15 931 查看

这篇博客的用法主要有 : zip,amb,combineLatest ,concatEager

zip

[code]public static <T1, T2, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, final Func2<? super T1, ? super T2, ? extends R> zipFunction)
... 支持N个参数的最终合并,这里暂时只写出两个的
public static <R> Observable<R> zip(Iterable<? extends Observable<?>> ws, FuncN<? extends R> zipFunction)
public static <R> Observable<R> zip(Observable<? extends Observable<?>> ws, final FuncN<? extends R> zipFunction)

首先是

[code] public static <T1, T2, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, final Func2<? super T1, ? super T2, ? extends R> zipFunction) {
return just(new Observable<?>[] { o1, o2 }).lift(new OperatorZip<R>(zipFunction));
}

这个最终也是需要调用lift的转化方法,lift方法真好用;注意这里传的Observable[],和from不同,just后续会直接会拿Observable[]去转化,而不会把这个数组拆开;看下具体方法

[code]public OperatorZip(Func2 f) {
this.zipFunction = Functions.fromFunc(f);
}

public static <T0, T1, R> FuncN<R> fromFunc(final Func2<? super T0, ? super T1, ? extends R> f) {
return new FuncN<R>() {
@SuppressWarnings("unchecked")
@Override
public R call(Object... args) {
if (args.length != 2) {
throw new RuntimeException("Func2 expecting 2 arguments.");
}
return f.call((T0) args[0], (T1) args[1]);
}
};
}

发现这里会把zipFunction计算出的最终结果返回,如果是同一主线程还比较好理解,但如果多个异步,这块是怎么保证所有结果的同步呢;我们看下这个类的详细

[code]public final class OperatorZip<R> implements Operator<R, Observable<?>[]> {
@Override
public Subscriber<? super Observable[]> call(final Subscriber<? super R> child) {
final Zip<R> zipper = new Zip<R>(child, zipFunction);
final ZipProducer<R> producer = new ZipProducer<R>(zipper);
final ZipSubscriber subscriber = new ZipSubscriber(child, zipper, producer);

child.add(subscriber);
child.setProducer(producer);

return subscriber;
}
}

其中Zip这个类会存放我们传进来的Subscriber,变量名字为child,这个后面要用到

同样,我们挑重点看,关心ZipSubscriber的方法;同样我们只挑出onNext的执行方法

[code] public void onNext(Observable[] observables) {
if (observables == null || observables.length == 0) {
child.onCompleted();
} else {
started = true;
zipper.start(observables, producer);
}
}

public void start(@SuppressWarnings("rawtypes") Observable[] os, AtomicLong requested) {
final Object[] subscribers = new Object[os.length];
for (int i = 0; i < os.length; i++) {
InnerSubscriber io = new InnerSubscriber();
subscribers[i] = io;
childSubscription.add(io);
}
this.requested = requested;
this.subscribers = subscribers; // full memory barrier: release all above
for (int i = 0; i < os.length; i++) {
os[i].unsafeSubscribe((InnerSubscriber) subscribers[i]);
}
}

这里会把数组里面的每个Observable都拆出来,并依次注册InnerSubscriber,也是就每块处理仍然都是独立的

[code]final class InnerSubscriber extends Subscriber{
...
public void onNext(Object t) {
try {
items.onNext(t);
} catch (MissingBackpressureException e) {
onError(e);
}
tick();
}
}

items是创建的RxRingBuffer对象

[code]public class RxRingBuffer implements Subscription{
public void onNext(Object o) throws MissingBackpressureException {
...
synchronized (this) {
Queue<Object> q = queue;
if (q != null) {
mbe = !q.offer(on.next(o));
} else {
iae = true;
}
}
...
}
}

这个会创建一个Queue队列去存放获得的数据,后面会通过peek方法判断这个queue里面是否有值

注意到这个tick方法,每个InnerSubscriber的onNext都会执行到这里;我们挑出重点看

[code]        ...
final int length = subscribers.length;
...
while (true) {
final Object[] vs = new Object[length];
boolean allHaveValues = true;
for (int i = 0; i < length; i++) {
RxRingBuffer buffer = ((InnerSubscriber) subscribers[i]).items;
Object n = buffer.peek();

if (n == null) {
allHaveValues = false;
continue;
}

if (buffer.isCompleted(n)) {
child.onCompleted();
childSubscription.unsubscribe();
return;
} else {
vs[i] = buffer.getValue(n);
}

...
if (requested.get() > 0 && allHaveValues) {
try {
child.onNext(zipFunction.call(vs));
requested.decrementAndGet();
emitted++;
} catch (Throwable e) {
Exceptions.throwOrReport(e, child, vs);
return;
}

...
}

这里有个while循环,当所有的Observable都正常回调值,也就是vs这个数组被填满后,再把这个数组返给zipFunction处理,最后把处理后的值返给我们定义的Subscriber;所以即使我们发出多个网络请求,最后也能正确处理

例子

[code] Log.e(TAG, "zip start " );
Observable<Integer> observable1 = Observable.just(1).map(new Func1<Integer, Integer>() {
@Override
public Integer call(Integer integer) {
try {
Thread.sleep(1000);
}catch (Exception e){}
return integer;
}
}).subscribeOn(Schedulers.io());
Observable<Boolean> observable2 = Observable.just(true).map(new Func1<Boolean, Boolean>() {
@Override
public Boolean call(Boolean aBoolean) {
try {
Thread.sleep(2000);
}catch (Exception e){}
return aBoolean;
}
}).subscribeOn(Schedulers.io());
Observable.zip(observable1, observable2, new Func2<Integer, Boolean, String>() {
@Override
public String call(Integer o, Boolean o2) {
return o + ":" + o2;
}
}).subscribe(new Action1<String>() {
@Override
public void call(String o) {
Log.e(TAG, "result = " + o);
}
});

这里有两个Observable,sleep不同时间,结果上会以时间最长的那个为准

[code]01-30 15:38:03.135 E/Main: zip start
01-30 15:38:05.156 E/Main: result = 1:true
[code]public static <R> Observable<R> zip(Iterable<? extends Observable<?>> ws, FuncN<? extends R> zipFunction)

和之前的一样,这个传入的是一个集合;但必须是同类型的Observable集合

[code] Observable<Integer> observable1 = Observable.just(1);
Observable<Integer> observable2 = Observable.just(2);
List<Observable<Integer>> list = Arrays.asList(observable1, observable2);
Observable.zip(list, new FuncN<String>() {
@Override
public String call(Object... args) {
int result = 0;
for (Object obj : args) {
result += ((Integer) obj).intValue();
}
return "result=" + result;
}
}).subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.e(TAG, s);
}
});

输出结果

[code]E/Main: result=3
[code]public static <R> Observable<R> zip(Observable<? extends Observable<?>> ws, final FuncN<? extends R> zipFunction) {
return ws.toList().map(new Func1<List<? extends Observable<?>>, Observable<?>[]>() {

@Override
public Observable<?>[] call(List<? extends Observable<?>> o) {
return o.toArray(new Observable<?>[o.size()]);
}

}).lift(new OperatorZip<R>(zipFunction));
}

这个方法会把传入Observable转成集合再转成数组,之后的流程和上面描述的基本一致

amb

[code]public static <T> Observable<T> amb(Observable<? extends T> o1, Observable<? extends T> o2)
...... 超过两个及以上的方法暂不写出,因为流程是一样的
public static <T> Observable<T> amb(Iterable<? extends Observable<? extends T>> sources)

还是以最简单的两个为例

[code]public static <T> Observable<T> amb(Observable<? extends T> o1, Observable<? extends T> o2) {
return create(OnSubscribeAmb.amb(o1, o2));   }

public static <T> OnSubscribe<T> amb(Observable<? extends T> o1, Observable<? extends T> o2) {
List<Observable<? extends T>> sources = new ArrayList<Observable<? extends T>>();
sources.add(o1);
sources.add(o2);
return amb(sources);
}
public static <T> OnSubscribe<T> amb(final Iterable<? extends Observable<? extends T>> sources) {
return new OnSubscribeAmb<T>(sources);
}

我们发现最终还是会走到上面的第二个方法,而且值得注意的是,这些泛型都必须保持一致

我们挑重点看

[code]public final class OnSubscribeAmb<T> implements OnSubscribe<T>{
@Override
public void call(final Subscriber<? super T> subscriber) {
final Selection<T> selection = new Selection<T>();
final AtomicReference<AmbSubscriber<T>> choice = selection.choice;
......
for (Observable<? extends T> source : sources) {
if (subscriber.isUnsubscribed()) {
break;
}
AmbSubscriber<T> ambSubscriber = new AmbSubscriber<T>(0, subscriber, selection);
selection.ambSubscribers.add(ambSubscriber);
AmbSubscriber<T> c;
if ((c = choice.get()) != null) {
// Already chose one, the rest can be skipped and we can clean up
selection.unsubscribeOthers(c);
return;
}
source.unsafeSubscribe(ambSubscriber);
}
}
}

所有的Observable都会注册一个AmbSubscriber,而这个里面传入了我们最终输出的subscriber;也是这个内部会处理我们的输出方法,这里的choice后面也会用到,仍然挑重点看

[code]private static final class AmbSubscriber<T> extends Subscriber<T> {
......
@Override
public void onNext(T t) {
if (!isSelected()) {
return;
}
subscriber.onNext(t);
}
@Override
public void onCompleted() {
if (!isSelected()) {
return;
}
subscriber.onCompleted();
}
@Override
public void onError(Throwable e) {
if (!isSelected()) {
return;
}
subscriber.onError(e);
}

private boolean isSelected() {
if (chosen) {
return true;
}
if (selection.choice.get() == this) {
chosen = true;
return true;
} else {
if (selection.choice.compareAndSet(null, this)) {
selection.unsubscribeOthers(this);
chosen = true;
return true;
} else {
selection.unsubscribeLosers();
return false;
}
}
}
}

也就是说,只要接收到一次数据,就会给choice赋值,之后只有这一个Subscriber会正常返回true,也就是正常分发;之后的全部返回false,并且全部解除注册;

unsubscribeLosers,顾名思义,失败的全部解除;

amb其实就是一种竞争的队列,第一个到达的会正常返回,之后的全部销毁

例子

[code]Log.e(TAG, "amb start ");
Observable observable1 = Observable.just(1).subscribeOn(Schedulers.io()).map(new Func1<Integer, Integer>() {
@Override
public Integer call(Integer o) {
try {
Thread.sleep(2000);
} catch (Exception e) {
}
return o;
}
});
Observable observable2 = Observable.just(2).subscribeOn(Schedulers.io()).map(new Func1<Integer, Integer>() {
@Override
public Integer call(Integer o) {
try {
Thread.sleep(1000);
} catch (Exception e) {
}
return o;
}
});
Observable.amb(observable1, observable2).subscribe(new Action1() {
@Override
public void call(Object o) {
Log.e(TAG, "result= " + o);
}
});

第一个sleep时间比第二个要长,第二个要先到达,第一个会被取消掉;输出结果

[code]01-30 16:23:38.681 5595-5595 E/Main: amb start
01-30 16:23:39.705 5595-5616 E/Main: result= 2

combineLatest

[code]public static <T1, T2, R> Observable<R> combineLatest(Observable<? extends T1> o1, Observable<? extends T2> o2, Func2<? super T1, ? super T2, ? extends R> combineFunction)
......
public static <T, R> Observable<R> combineLatest(List<? extends Observable<? extends T>> sources, FuncN<? extends R> combineFunction)
public static <T, R> Observable<R> combineLatest(Iterable<? extends Observable<? extends T>> sources, FuncN<? extends R> combineFunction)

以两个的为例

[code]public static <T1, T2, R> Observable<R> combineLatest(Observable<? extends T1> o1, Observable<? extends T2> o2, Func2<? super T1, ? super T2, ? extends R> combineFunction) {
return combineLatest(Arrays.asList(o1, o2), Functions.fromFunc(combineFunction));
}
public static <T, R> Observable<R> combineLatest(List<? extends Observable<? extends T>> sources, FuncN<? extends R> combineFunction) {
return create(new OnSubscribeCombineLatest<T, R>(sources, combineFunction));
}

这个方法最终会组成list调用下面的list的方法

[code]public final class OnSubscribeCombineLatest<T, R> implements OnSubscribe<R> {
public OnSubscribeCombineLatest(Iterable<? extends Observable<? extends T>> sourcesIterable,
FuncN<? extends R> combiner) {
this(null, sourcesIterable, combiner, RxRingBuffer.SIZE, false);
}
public void call(Subscriber<? super R> s) {
Observable<? extends T>[] sources = this.sources;
int count = 0;
......
if (sources == null) {
if (sourcesIterable instanceof List) {
List list = (List)sourcesIterable;
sources = (Observable[])list.toArray(new Observable[list.size()]);
count = sources.length;
} else {
......
}
}
} else {
count = sources.length;
}
if (count == 0) {
s.onCompleted();
return;
}
LatestCoordinator<T, R> lc = new LatestCoordinator<T, R>(s, combiner, count, bufferSize, delayError);
lc.subscribe(sources);
}
}

传入的Observable会转成一个数组source,计算出数组的大小count,然后这里会把subscrbier和combiner转化方法都保存到LastestCoordinator里处理,再看下这个具体类

[code]static final class LatestCoordinator<T, R> extends AtomicInteger implements Producer, Subscription {
static final Object MISSING = new Object();
public LatestCoordinator(Subscriber<? super R> actual,
FuncN<? extends R> combiner,
int count, int bufferSize, boolean delayError) {
this.actual = actual;
this.combiner = combiner;
this.count = count;
this.bufferSize = bufferSize;
this.delayError = delayError;
this.latest = new Object[count];
// latest集合会全部以MISSION填充,其实都是一样的
Arrays.fill(latest, MISSING);
this.subscribers = new CombinerSubscriber[count];
this.queue = new SpscLinkedArrayQueue<Object>(bufferSize);
this.requested = new AtomicLong();
this.error = new AtomicReference<Throwable>();
}
public void subscribe(Observable<? extends T>[] sources) {
Subscriber<T>[] as = subscribers;
int len = as.length;
for (int i = 0; i < len; i++) {
as[i] = new CombinerSubscriber<T, R>(this, i);
}
lazySet(0); // release array contents
actual.add(this);
actual.setProducer(this);
for (int i = 0; i < len; i++) {
if (cancelled) {
return;
}
sources[i].subscribe(as[i]);
}
}
}

这里会创建两个和Observable数同样大小的数组 latest和subscribers;

其中latest会存放每个Observable回调的结果,存放的内容和每个Observable是一一对应的,也就是每个Obs只对应一个,如果有新的结果,则会替换掉之前的;

subscribers会把当前即LatestCoordinator和此次要创建的subscriber索引i传入,后面sources的注册的subscriber一一对应;就是每个Observable都会对应一个唯一的CombinerSubscriber,索引值就是上面的 i

[code]static final class CombinerSubscriber<T, R> extends Subscriber<T> {
@Override
public void onNext(T t) {
if (done)  return;
parent.combine(nl.next(t), index);
}
@Override
public void onError(Throwable t) {
......
parent.onError(t);
done = true;
parent.combine(null, index);
}
@Override
public void onCompleted() {
if (done) {
return;
}
done = true;
parent.combine(null, index);
}

void combine(Object value, int index) {
CombinerSubscriber<T, R> combinerSubscriber = subscribers[index];
int activeCount;
int completedCount;
int sourceCount;
boolean empty;
boolean allSourcesFinished;
synchronized (this) {
//数目等于传入的Observable对象数
sourceCount = latest.length;
Object o = latest[index];
activeCount = active;
if (o == MISSING) {
active = ++activeCount;
//最开始这个一直是相等的,除非其中一个走了onError
//否则这个数最后叠加会和sourceCount相等
}
completedCount = complete;
if (value == null) {
complete = ++completedCount;
//完成数,正常走complete的value都是空的(Error也是空的)
} else {
latest[index] = combinerSubscriber.nl.getValue(value);
//每走一次onNext都会赋一次值,也就是会把之前的替换掉
}
allSourcesFinished = activeCount == sourceCount;
//所有的Observable都至少回调了一次onNext,也就是latest填满了
empty = completedCount == sourceCount
|| (value == null && o == MISSING);
//全部结束(包括异常) 或者 其中出现了一次异常结果
//这个不太好理解,我们反推,否命题就是 : 事件还在继续分发而且没有发生过一次异常

if (!empty) {
if (value != null && allSourcesFinished) {
//latest被填充满,同时此次回调的值不为空
//也就是latest集合中的最后一个被填充后,后续再来值都会走这个方法
queue.offer(combinerSubscriber, latest.clone());
} else
if (value == null && error.get() != null) {
done = true;
}
} else {
done = true;
}
}
if (!allSourcesFinished && value != null) {
combinerSubscriber.requestMore(1);
return;
}
drain();
}
}

那么重点就在drain方法了,摘出重点

[code]void drain() {
if (getAndIncrement() != 0) {
return;
}
final Queue<Object> q = queue;
final Subscriber<? super R> a = actual;
int missed = 1;
for (;;) {
......
while (requestAmount != 0L) {
CombinerSubscriber<T, R> cs = (CombinerSubscriber<T, R>)q.peek();
boolean empty = cs == null;
......
if (empty) {
break;
}
q.poll();
Object[] array = (Object[])q.poll();
if (array == null) {
cancelled = true;
cancel(q);
a.onError(new IllegalStateException("Broken queue?! Sender received but not the array."));
return;
}
R v;
try {
v = combiner.call(array);
} catch (Throwable ex) {
cancelled = true;
cancel(q);
a.onError(ex);
return;
}
a.onNext(v);
......
}
......
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}

当queue集合不为空的时候,摘出此时拷贝的latest副本,把这个副本的集合用我们定义的combiner方法处理后再传给我们将要处理的subscriber;

所以对于多个的分发,Observable内子分发个数不一定要保持一样,后续计算结果也并不需要同时抵达,这个是和zip的很大区别,zip方法要求分发数必须相同,否则后续的结果接受不到

例子

[code] Log.e(TAG, "combineLastest start");
Observable observable1 = Observable.just(1,2).subscribeOn(Schedulers.io()).map(new Func1() {
@Override
public Object call(Object o) {
try {
Thread.sleep(2000);
} catch (Exception e) {
}
return o;
}
});
Observable observable2 = Observable.just(true, false).subscribeOn(Schedulers.io()).map(new Func1() {
@Override
public Object call(Object o) {
try {
Thread.sleep(1000);
} catch (Exception e) {
}
return o;
}
});
Observable.combineLatest(observable1, observable2, new Func2<Integer, Boolean, String>() {
@Override
public String call(Integer o, Boolean o2) {
return o + " : " + o2;
}
}).subscribe(new Action1() {
@Override
public void call(Object o) {
Log.e(TAG, "result= " + o);
}
});

作区分我做了sleep时间,实际输出结果是

[code]01-30 22:52:44.958 E/Main: combineLastest start
01-30 22:52:46.978 E/Main: result= 1 : true
01-30 22:52:46.982 E/Main: result= 1 : false
01-30 22:52:48.979 E/Main: result= 2 : false

我们发现接收到了三次,第一次要求是同时抵达;后续会按照上次的结果计算,第二次是false先抵达,然后取第一个obs1上次的1, 第三次则是2抵达,取obs2上次的false ; 这个的使用场景和我们登陆输入账号密码有点类似

那我们如果改成 zip呢,输出结果是

[code]01-30 22:57:11.954 E/Main: result= 1 : true
01-30 22:57:13.956 E/Main: result= 2 : false

只有两次,结果索引是匹配的,我们再把第一个obs删掉一个数据,改成Observable.just(1),那么输出结果是

[code]01-30 22:58:46.174 E/Main: result= 1 : true

结果就只剩一个了,因为第二个索引个数不匹配

后面两个方法和之前类似,变成了集合而已,而且也是泛型需要统一

concatEager

[code]public static <T> Observable<T> concatEager(Observable<? extends T> o1, Observable<? extends T> o2)
......
public static <T> Observable<T> concatEager(Iterable<? extends Observable<? extends T>> sources)
public static <T> Observable<T> concatEager(Iterable<? extends Observable<? extends T>> sources, int capacityHint)

仍然以两个的为例

[code]public final class OperatorEagerConcatMap<T, R> implements Operator<R, T> {
@Override
public Subscriber<? super T> call(Subscriber<? super R> t) {
EagerOuterSubscriber<T, R> outer = new EagerOuterSubscriber<T, R>(mapper, bufferSize, t);
outer.init();
return outer;
}
}

static final class EagerOuterSubscriber<T, R> extends Subscriber<T> {
public EagerOuterSubscriber(Func1<? super T, ? extends Observable<? extends R>> mapper, int bufferSize,
Subscriber<? super R> actual) {
this.mapper = mapper;
this.bufferSize = bufferSize;
this.actual = actual;
this.subscribers = new LinkedList<EagerInnerSubscriber<R>>();
this.wip = new AtomicInteger();
}
public void onNext(T t) {
Observable<? extends R> observable;

try {
observable = mapper.call(t);
} catch (Throwable e) {
Exceptions.throwOrReport(e, actual, t);
return;
}
EagerInnerSubscriber<R> inner = new EagerInnerSubscriber<R>(this, bufferSize);
if (cancelled) {
return;
}
synchronized (subscribers) {
if (cancelled) {
return;
}
subscribers.add(inner);
}
if (cancelled) {
return;
}
observable.unsafeSubscribe(inner);
drain();
}
}

这里创建了一个LinkList存放后面注册的EagerInnerSubscriber,按照顺序添加,这个onNext里面会依次回调添加进的Observable,并顺序注册上面的InnerSubscriber

[code]static final class EagerInnerSubscriber<T> extends Subscriber<T> {
final EagerOuterSubscriber<?, T> parent;
final Queue<Object> queue;
@Override
public void onNext(T t) {
//每次获取到数据都会把这个数据添加到这个queue队列中
queue.offer(nl.next(t));
parent.drain();
}
@Override
public void onCompleted() {
done = true;
parent.drain();
}
}

数据会存放到这里的queue队列里

[code]void drain() {
if (wip.getAndIncrement() != 0) {
return;
}
int missed = 1;
......
final Subscriber<? super R> actualSubscriber = this.actual;
for (;;) {
......
EagerInnerSubscriber<R> innerSubscriber;
boolean outerDone = done;
synchronized (subscribers) {
//按照注册的顺序取出第一个
innerSubscriber = subscribers.peek();
}
boolean empty = innerSubscriber == null;
......
if (!empty) {
......
Queue<Object> innerQueue = innerSubscriber.queue;
boolean innerDone = false;
for (;;) {
outerDone = innerSubscriber.done;
//判断这个subscriber是否已经分发完成
//回调onComplete会把这个done设成ture
Object v = innerQueue.peek();
//取出这个innersubscriber中第一个数据
empty = v == null;
.......
if (outerDone) {
Throwable innerError = innerSubscriber.error;
if (innerError != null) {
cleanup();
actualSubscriber.onError(innerError);
return;
} else
if (empty) {
synchronized (subscribers) {
subscribers.poll();
//如果这个innerSubscriber已经回调onCompleted
//同时里面的queue数据已经被全部取出,就把这个对应的sub弹出
//这样下次取的就是按顺序的第二个subscriber
}
innerSubscriber.unsubscribe();
innerDone = true;
break;
}
}
if (empty) {
break;
}
innerQueue.poll();
//如果第一个数有值,则弹出queue的第一个数据,并把这个数据返给我们定义的subscriber
try {
actualSubscriber.onNext(nl.getValue(v));
} catch (Throwable ex) {
Exceptions.throwOrReport(ex, actualSubscriber, v);
return;
}
......
}
......
if (innerDone) {
//这个Observable已经完成,遍历下一个
continue;
}
}
missed = wip.addAndGet(-missed);
if (missed == 0) {
return;
}
}
}

每次获得结果都会根据需求判断是否要循环遍历,如果正好是第一个注册的则输出,如果非首个注册的,则缓存数据,等待第一个完成再输出这个缓存数据

这个实际输出结果和concat类似,不同的是concat是按照Observable顺序依次执行,等第一个有结果后再去执行第二个;

这个方法是可以同时执行,每次取到结果时候进行判断,保持结果依次发出

例子

[code]Observable observable1 = Observable.just(1).subscribeOn(Schedulers.io()).map(new Func1() {
@Override
public Object call(Object o) {
try {
Thread.sleep(2000);
} catch (Exception e) {
}
Log.e(TAG,"obs1获得了结果");
return o;
}
});
Observable observable2 = Observable.just(2).subscribeOn(Schedulers.io()).map(new Func1() {
@Override
public Object call(Object o) {
try {
Thread.sleep(1000);
} catch (Exception e) {
}
Log.e(TAG,"obs2获得了结果");
return o;
}
});
Observable.concatEager(observable1, observable2).subscribe(new Action1() {
@Override
public void call(Object o) {
Log.e(TAG, "result= " + o);
}
});

输出结果是

[code]01-31 00:19:23.597  E/Main: obs2获得了结果
01-31 00:19:24.586  E/Main: obs1获得了结果
01-31 00:19:24.586  E/Main: result= 1
01-31 00:19:24.586  E/Main: result= 2

同样,如果我们换成concat方法,输出则变成了

[code]01-31 00:19:08.050  E/Main: obs1获得了结果
01-31 00:19:08.050  E/Main: result= 1
01-31 00:19:09.054  E/Main: obs2获得了结果
01-31 00:19:09.054  E/Main: result= 2

类似于我们的同步和异步概念

  • 点赞
  • 收藏
  • 分享
  • 文章举报
迷途の知归 发布了20 篇原创文章 · 获赞 1 · 访问量 3008 私信 关注
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: