您的位置:首页 > 编程语言 > Java开发

Rxjava的基础用法和源码解析(二)

2020-04-23 09:15 951 查看

这篇博客的用法主要有 : lift , map , merge , concat

lift

[code]public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator)

这个方法算是rxjava中比较核心的方法,后面的map,flatmap等都会用到这个模块,看一下基本源码

[code] public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
return new Observable<R>(new OnSubscribe<R>() {
@Override
public void call(Subscriber<? super R> o) {
try {
//实际输出大概是这样
Subscriber<? super T> st = operator.call(o);
try {
st.onStart();
onSubscribe.call(st);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
st.onError(e);
}
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
o.onError(e);
}
}
});

这个其实就是一个类型转化的入口

我们最初传入的泛型是T,然后我们想要转化成泛型R进行输出,所以我们最终subscriber的泛型输出也是R,就是传入这里的o ,但为什么operator.call(o) 要返回一个T的subscriber呢? 这是因为我们要和最初传入的泛型保持一致,不然无法接受到参数,operator.call方法会创建一个新的subscriber,然后内部转化后回调o的具体方法,看似不变其实内部已经消化掉了;我们看一个例子

[code] Observable.just(1).lift(new Observable.Operator<String, Integer>() {
@Override
public Subscriber<? super Integer> call(final Subscriber<? super String> subscriber) {
return new Subscriber<Integer>() {
@Override
public void onStart() {
Log.e(TAG,"onStart 1");
}
@Override
public void onCompleted() {
Log.e(TAG,"onCompleted 1");
subscriber.onCompleted();
}
@Override
public void onError(Throwable e) {
Log.e(TAG,"onError 1");
subscriber.onError(e);
}
@Override
public void onNext(Integer integer) {
Log.e(TAG,"onNext 1");
subscriber.onNext("No. "+integer);
}
};
}
}).subscribe(new Subscriber<String>() {
@Override
public void onStart() {
Log.e(TAG,"onStart 2");
}
@Override
public void onCompleted() {
Log.e(TAG,"onCompleted 2");
}
@Override
public void onError(Throwable e) {
Log.e(TAG,"onError 2");
}
@Override
public void onNext(String s) {
Log.e(TAG,"onNext 2 ->>> result = "+s);
}
});

这里我们把1转化成String后最终输出,结果是

[code]01-29 13:47:11.551 E/Main: onStart 2
01-29 13:47:11.551 E/Main: onStart 1
01-29 13:47:11.551 E/Main: onNext 1
01-29 13:47:11.551 E/Main: onNext 2 ->>> result = No. 1
01-29 13:47:11.551 E/Main: onCompleted 1
01-29 13:47:11.551 E/Main: onCompleted 2

至于为什么后续的start比我们的转化start要靠前,因为subscribe的时候优先会执行注册的subscriber对象,转化的会在lift里面的OnSubscribe.call()方法里执行,所以会滞后;

[code]  private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
...
subscriber.onStart();
...
}

map

[code]public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
return lift(new OperatorMap<T, R>(func));
}

map最终会执行lift的转化模块,OperatorMap是新建的一个转化器,用法和lift差不多,对于单向转化更加方便

[code]public final class OperatorMap<T, R> implements Operator<R, T> {

final Func1<? super T, ? extends R> transformer;

public OperatorMap(Func1<? super T, ? extends R> transformer) {
this.transformer = transformer;
}

@Override
public Subscriber<? super T> call(final Subscriber<? super R> o) {
return new Subscriber<T>(o) {

@Override
public void onCompleted() {
o.onCompleted();
}

@Override
public void onError(Throwable e) {
o.onError(e);
}

@Override
public void onNext(T t) {
try {
o.onNext(transformer.call(t));
} catch (Throwable e) {
Exceptions.throwOrReport(e, this, t);
}
}

};
}

用法很简单,具体结果就写了

[code]        Observable.just(1).map(new Func1<Integer, String>() {
@Override
public String call(Integer integer) {
return "No. " + integer;
}
}).subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.e(TAG, "result = " + s);
}
});

merge

[code]public static <T> Observable<T> merge(Observable<? extends T>... t)
public static <T> Observable<T> merge(Observable<? extends T>[] sequences)
public static <T> Observable<T> merge(Observable<? extends T>[] sequences, int maxConcurrent)
public static <T> Observable<T> merge(Iterable<? extends Observable<? extends T>> sequences)
public static <T> Observable<T> merge(Iterable<? extends Observable<? extends T>> sequences, int maxConcurrent)
public static <T> Observable<T> merge(Observable<? extends Observable<? extends T>> source)
public static <T> Observable<T> merge(Observable<? extends Observable<? extends T>> source, int maxConcurrent)

我们分别看一下具体方法

[code]public static <T> Observable<T> merge(Observable<? extends T>... t)

最终会执行到

[code]public static <T> Observable<T> merge(Observable<? extends T>[] sequences) {
return merge(from(sequences));
}

public static <T> Observable<T> merge(Observable<? extends Observable<? extends T>> source) {
if (source.getClass() == ScalarSynchronousObservable.class) {
return ((ScalarSynchronousObservable<T>)source).scalarFlatMap((Func1)UtilityFunctions.identity());
}
return source.lift(OperatorMerge.<T>instance(false));
}

from就是我们常用的遍历方法,最后就是还是会调用lift的转化方法,传入要被转化的操作详细,我们挑出里面的call方法

[code]public MergeSubscriber(Subscriber<? super T> child, boolean delayErrors, int maxConcurrent) {
this.child = child;
...
}

@Override
public Subscriber<Observable<? extends T>> call(final Subscriber<? super T> child) {
MergeSubscriber<T> subscriber = new MergeSubscriber<T>(child, delayErrors, maxConcurrent);
MergeProducer<T> producer = new MergeProducer<T>(subscriber);
subscriber.producer = producer;

child.add(subscriber);
child.setProducer(producer);

return subscriber;
}

其中MergeSubscriber继承自 Subscriber<Observable<? extends T>>,并把我们最终需要发布的Subscriber<T>对象传入,也就是变成里面的成员变量child,那么我们只需要关心MergeSubscriber内部转化的onNext,onError和onComplete方法;我们挑onNext方法

[code]public void onNext(Observable<? extends T> t) {
if (t == null) {
return;
}
if (t instanceof ScalarSynchronousObservable) {
tryEmit(((ScalarSynchronousObservable<? extends T>)t).get());
} else {
InnerSubscriber<T> inner = new InnerSubscriber<T>(this, uniqueId++);
addInner(inner);
t.unsafeSubscribe(inner);
emit();
}
}

这里又做了一次封装,也是这个InnerSubscriber也是一个Subscriber,然后调用我们上面传入的MergeSubscriber的unsafeSubscribe注册这个inner对象;

因为分发到这里的是Observable,我们接收具体值需要对Oservable注册subscriber,所以InnerSubscriber应该是分发具体的值;
虽然这个onNext方法里面是顺序的,但对于每个注册的InnerSubscriber都是独立的,也就是说每个分发事件接收都是独立的,如果这里的Observable对象都指定不同线程的话,那接受事件就会有很大不确定性,也就是并不一定会按传入的顺序依次发出;
我们再看下这个InnerSubscribe的onNext方法

[code]public void onNext(T t) {
parent.tryEmit(this, t);
}

void tryEmit(InnerSubscriber<T> subscriber, T value) {
...
emitScalar(subscriber, value, r);
...
}

protected void emitScalar(InnerSubscriber<T> subscriber, T value, long r) {
...
child.onNext(value);
...
}

最后会调用我们定义的subscriber的onNext方法,传入拆解后具体的属性值

[code]Observable observable1 = Observable.just(1, 2).subscribeOn(Schedulers.computation()).map(new Func1<Integer, Integer>() {
@Override
public Integer call(Integer o) {
try {
Thread.sleep(1000);
} catch (Exception e) {
}
return o;
}
});
Observable observable2 = Observable.just(3, 4).subscribeOn(Schedulers.io()).map(new Func1<Integer, Integer>() {
@Override
public Integer call(Integer o) {
try {
Thread.sleep(800);
} catch (Exception e) {
}
return o;
}
});
Observable.merge(observable1, observable2).subscribe(new Action1() {
@Override
public void call(Object o) {
Log.e(TAG, "result = " + o + "  " + Thread.currentThread());
}
});

我这里指定两个不同线程,并进行sleep不同时长,输出结果是

[code]01-29 19:07:35.823  E/Main: result = 3  Thread[RxCachedThreadScheduler-1,5,main]
01-29 19:07:36.022  E/Main: result = 1  Thread[RxComputationThreadPool-1,5,main]
01-29 19:07:36.626  E/Main: result = 4  Thread[RxCachedThreadScheduler-1,5,main]
01-29 19:07:37.022  E/Main: result = 2  Thread[RxComputationThreadPool-1,5,main]

也就是说指定不同线程输出有可能会错误,那我不指定线程呢,就是把上面两个subscribeOn去掉,输出结果是

[code]01-29 19:10:18.627  E/Main: result = 1  Thread[main,5,main]
01-29 19:10:19.627  E/Main: result = 2  Thread[main,5,main]
01-29 19:10:20.428  E/Main: result = 3  Thread[main,5,main]
01-29 19:10:21.231  E/Main: result = 4  Thread[main,5,main]

输出结果结果就变成正序的了

[code]public static <T> Observable<T> merge(Observable<? extends T>[] sequences)
public static <T> Observable<T> merge(Observable<? extends T>[] sequences, int maxConcurrent)

这个数组上面提到了,merge传的对象最后会组装到一个数组里;maxConcurrent这个是可同时注册的最大subscriber数目,按照传入的Observable依次注册,如果超出这个个数,剩下的会在之前完成后再注册,可以类比成我们的线程池,超出的处于等待状f态,比如上面的例子可以改成

[code] Observable observable1 = Observable.just(1, 2).subscribeOn(Schedulers.io()).map(new Func1<Integer, Integer>() {
@Override
public Integer call(Integer o) {
try {
Thread.sleep(1000);
} catch (Exception e) { }
return o;
}
});
Observable observable2 = Observable.just(3, 4).subscribeOn(Schedulers.newThread());
Observable.merge(new Observable[]{observable1, observable2}, 1).subscribe(new Action1() {
@Override
public void call(Object o) {
Log.e(TAG, "result = " + o + "  " + Thread.currentThread());
}
});

我们限定同时可注册的最大个数是1,并且给第一个设置了sleep,输出结果是

[code]01-29 20:23:58.562  E/Main: result = 1  Thread[RxCachedThreadScheduler-1,5,main]
01-29 20:23:59.565  E/Main: result = 2  Thread[RxCachedThreadScheduler-1,5,main]
01-29 20:23:59.568  E/Main: result = 3  Thread[RxNewThreadScheduler-1,5,main]
01-29 20:23:59.568  E/Main: result = 4  Thread[RxNewThreadScheduler-1,5,main]

我们发现虽然是不同线程,但结果按照我们传入顺序依次输出,这就是因为我们限制了同时可注册的最大个数

[code]public static <T> Observable<T> merge(Iterable<? extends Observable<? extends T>> sequences)
public static <T> Observable<T> merge(Iterable<? extends Observable<? extends T>> sequences, int maxConcurrent)

这个方法和上面的类似,从数组变成了Iterable,就是变成了可以传我们常用的list,set等实现Collection的集合

[code]public static <T> Observable<T> merge(Observable<? extends Observable<? extends T>> source)
public static <T> Observable<T> merge(Observable<? extends Observable<? extends T>> source, int maxConcurrent)

我们上面的操作都会走到这个统一的方法内执行,这个很少用到,比如

[code]public static <T> Observable<T> merge(Observable<? extends T>[] sequences) {
return merge(from(sequences));
}

这个from方法就会把sequences的item转成Observable<? extends Observable<? extends T>>这种格式

concat

[code]public static <T> Observable<T> concat(Observable<? extends Observable<? extends T>> observables)
public static <T> Observable<T> concat(Observable<? extends T>... t)

以传两个参数的为例

[code]public static <T> Observable<T> concat(Observable<? extends T> t1, Observable<? extends T> t2) {
return concat(just(t1, t2));
}
public static <T> Observable<T> concat(Observable<? extends Observable<? extends T>> observables) {
return observables.lift(OperatorConcat.<T>instance());
}

public final class OperatorConcat<T> implements Operator<T, Observable<? extends T>> {
...
@Override
public Subscriber<? super Observable<? extends T>> call(final Subscriber<? super T> child) {
final SerializedSubscriber<T> s = new SerializedSubscriber<T>(child);
final SerialSubscription current = new SerialSubscription();
child.add(current);
ConcatSubscriber<T> cs = new ConcatSubscriber<T>(s, current);
ConcatProducer<T> cp = new ConcatProducer<T>(cs);
child.setProducer(cp);
return cs;
}
...
}

最终转化的是cs,所以操作也在cs内,最终会通过cs内的OnNext等方法发送给我们定义的child的注册,我们看下这个类

[code]static final class ConcatSubscriber<T> extends Subscriber<Observable<? extends T>> {
private final Subscriber<T> child;
final ConcurrentLinkedQueue<Object> queue;
final AtomicInteger wip = new AtomicInteger();
...

public void onNext(Observable<? extends T> t) {
queue.add(t);
if (wip.getAndIncrement() == 0) {
subscribeNext();
}
}
void subscribeNext() {
...
Object o = queue.poll();
if (o != null) {
Observable<? extends T> obs = o;
currentSubscriber = new ConcatInnerSubscriber<T>(this, child, arbiter);
current.set(currentSubscriber);
obs.unsafeSubscribe(currentSubscriber);
...
}
void completeInner() {
currentSubscriber = null;
if (wip.decrementAndGet() > 0) {
subscribeNext();
}
request(1);
}
...
}

static class ConcatInnerSubscriber<T> extends Subscriber<T> {
....
private final Subscriber<T> child;
private final AtomicBoolean once = new AtomicBoolean();

@Override
public void onNext(T t) {
...
child.onNext(t);
}
@Override
public void onCompleted() {
if (once.compareAndSet(false, true)) {
ConcatSubscriber<T> p = parent;
p.produced(produced);
p.completeInner();
}
}
}

我们从上往下看queue是一个线程安全的先进先出的队列,wip是一个线程安全的技术integer

每次OnNext回调都会把相应的Observable放到这个队列里,这个就是最开始传入的多个Observable对象;

然后会调用subscribeNext方法去注册一个分发的subscriber, 同时把wip的值+1;这样后续回调onNext虽然会往queue里添加新的Observable对象,但不会再次进入subscribeNext方法;

然后在subscribeNext方法里创建新的分发对象,并把数据发送给我们最终要回调的哪个subscribe,数据发送完成会会通知上级通过completeInner方法判断是否继续下一个Observable的分发;这样就保证了遍历按照顺序依次执行

[code]Observable observable1 = Observable.just(1, 2).subscribeOn(Schedulers.io()).map(new Func1<Integer, Integer>() {
@Override
public Integer call(Integer o) {
try {
Thread.sleep(1000);
} catch (Exception e) { }
return o;
}
});
Observable observable2 = Observable.just(3, 4).subscribeOn(Schedulers.newThread());
Observable.concat(observable1, observable2).subscribe(new Action1() {
@Override
public void call(Object o) {
Log.e(TAG, "result = " + o + "  " + Thread.currentThread());
}
});

输出结果是

[code]01-29 22:03:34.776 E/Main: result = 1  Thread[RxCachedThreadScheduler-1,5,main]
01-29 22:03:35.779 E/Main: result = 2  Thread[RxCachedThreadScheduler-1,5,main]
01-29 22:03:35.782 E/Main: result = 3  Thread[RxNewThreadScheduler-1,5,main]
01-29 22:03:35.782 E/Main: result = 4  Thread[RxNewThreadScheduler-1,5,main]

 

可以看出虽然是多个线程,但是结果仍然是按照顺序执行

  • 点赞
  • 收藏
  • 分享
  • 文章举报
迷途の知归 发布了20 篇原创文章 · 获赞 1 · 访问量 3009 私信 关注
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: