您的位置:首页 > 编程语言 > Java开发

RxJava源码深度解析-就这一篇就够了

2017-11-04 23:12 489 查看

RxJava内四个基本概念

Observable 可观察者,即被观察者

Observer 观察者

subscribe 订阅

事件

RxJava1.2.1源码深度解析

示例代码

Observable.just("nihao")
.filter(new Func1<String, Boolean>() {
@Override
public Boolean call(String s) {
Log.d(TAG, "filter=" + s);

return true;
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted=");
}

@Override
public void onError(Throwable e) {
Log.d(TAG, "Throwable=");

}

@Override
public void onNext(String integer) {
Log.d(TAG, "onNext=" + integer);

}
});


通过Observable调用just()、filter()、subscribeOn()、observeOn()

方法后都会创建一个新的Observable对象。

注意:如果Observable的subscribe(订阅)方法不调用,它以上的代码就不会执行。所以先看subscribe的方法

若想了解RxJava如何线程切换请参考 RxJava线程切换源码分析 文章

subscribe方法

public final Subscription subscribe(Subscriber<? super T> subscriber) {
return Observable.subscribe(subscriber, this);
}


subscriber:我们在代码中new的对象

this:通过调用observeOn()返回的对象

Observable.subscribe方法

static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
//判断我们自己创建的Subscriber对象是否为空
if (subscriber == null) {
throw new IllegalArgumentException("subscriber can not be null");
}

if (observable.onSubscribe == null) {
throw new IllegalStateException("onSubscribe function can not be null.");
}
//再所有任务开始前,可以做一些初始化操作
subscriber.onStart();

//永远都会进入,把Subscriber对象转换成安全的SafeSubscriber对象
if (!(subscriber instanceof SafeSubscriber)) {
subscriber = new SafeSubscriber<T>(subscriber);
}

try {
/**
* onObservableStart()方法就是传进去什么再传出来什么
* 调用observable.onSubscribe对象的call方法并把Subscriber对象传进去
*/
RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
return RxJavaHooks.onObservableReturn(subscriber);
} catch (Throwable e) {
.....省略.....
}
}


这里observable对象是通过observeOn()创建的,那observable.onSubscribe是什么呢?因为onSubscribe后面会遇到很多,为了区分方便,我们把这个onSubscribe起个别名叫onSubscribe1,我们去看一下?

observeOn方法

public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, RxRingBuffer.SIZE);
}


scheduler是LooperScheduler对象

RxRingBuffer.SIZE是容量,在JAVA中默认是128,在Android中默认是16

observeOn构造

public final Observable<T> observeOn(Scheduler scheduler, int bufferSize) {
return observeOn(scheduler, false, bufferSize);
}


public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
//这里的this是通过调用subscribeOn()创建的Observable对象,是ScalarSynchronousObservable的父类,所以是false不走
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObse
4000
rvable<T>)this).scalarScheduleOn(scheduler);
}
return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize));
}


把LooperScheduler对象封装到OperatorObserveOn对象中并调用lift方法

lift方法

public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
return create(new OnSubscribeLift<T, R>(onSubscribe, operator));
}


operator:OperatorObserveOn对象

注意

onSubscribe是通过调用subscribeOn()方法赋值的。我们把这个onSubscribe起个别名叫onSubscribe2,具体它是什么,先等等?

这里将onSubscribe2对象和operator对象封装到OnSubscribeLift对象中并调用create()方法

create方法

public static <T> Observable<T> create(OnSubscribe<T> f) {
//RxJavaHooks.onCreate(f)方法就是传进去什么再返回什么
return new Observable<T>(RxJavaHooks.onCreate(f));
}


protected Observable(OnSubscribe<T> f) {
this.onSubscribe = f;
}


通过Observable构造函数把OnSubscribeLift赋值给onSubscribe,那上边onSubscribe1就是OnSubscribeLift对象,然后调用OnSubscribeLift对象的call方法并把我们自己创建的Subscriber对象传进去。

OnSubscribeLift的call方法

先看下OnSubscribeLift的构造

public OnSubscribeLift(OnSubscribe<T> parent, Operator<? extends R, ? super T> operator) {
this.parent = parent;
this.operator = operator;
}


parent:上面提到的别名onSubscribe2

operator:OperatorObserveOn

OnSubscribeLift的call方法

public void call(Subscriber<? super R> o) {
try {
//onObservableLift方法就是传进去什么就返回什么
//这里就是调用OperatorObserveOn的call方法
Subscriber<? super T> st = RxJavaHooks.onObservableLift(operator).call(o);
try {
st.onStart();
//这里是调用别名onSubscribe2的call方法,稍后再分析?
parent.call(st);
} catch (Throwable e) {
.....省略.....
}
} catch (Throwable e) {
.....省略.....
}
}


OperatorObserveOn的call方法

先看下OperatorObserveOn的构造

public OperatorObserveOn(Scheduler scheduler, boolean delayError, int bufferSize) {
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = (bufferSize > 0) ? bufferSize : RxRingBuffer.SIZE;
}


scheduler:LooperScheduler对象

delayError:false

bufferSize:16

OperatorObserveOn的call方法

public Subscriber<? super T> call(Subscriber<? super T> child) {
//
if (scheduler instanceof ImmediateScheduler) {
return child;
} else if (scheduler instanceof TrampolineScheduler) {
return child;
} else {
ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize);
parent.init();
return parent;
}
}


scheduler:LooperScheduler对象

child:我们自己创建的Subscriber对象,经过封装现在是SafeSubscriber对象

因为scheduler是LooperScheduler对象,所以会走else即创建ObserveOnSubscriber对象

ObserveOnSubscriber的构造函数

public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child, boolean delayError, int bufferSize) {
//child是SafeSubscriber对象
this.child = child;
//调用了LooperScheduler的createWorker()方法返回HandlerWorker对象
this.recursiveScheduler = scheduler.createWorker();
//delayError=false
this.delayError = delayError;
//容量大小calculatedSize=16
int calculatedSize = (bufferSize > 0) ? bufferSize : RxRingBuffer.SIZE;
//当容量达到75%时,对队列进行扩容
this.limit = calculatedSize - (calculatedSize >> 2);
//存放任务的队列
if (UnsafeAccess.isUnsafeAvailable()) {
//进入
queue = new SpscArrayQueue<Object>(calculatedSize);
} else {
queue = new SpscAtomicArrayQueue<Object>(calculatedSize);
}
//初始化,开始接受异步操作
request(calculatedSize);
}


request方法

protected final void request(long n) {
if (n < 0) {
throw new IllegalArgumentException("number requested cannot be negative: " + n);
}
Producer producerToRequestFrom;
synchronized (this) {
if (producer != null) {
producerToRequestFrom = producer;
} else {
//走进来
addToRequested(n);
//返回
return;
}
}
// after releasing lock (we should not make requests holding a lock)
producerToRequestFrom.request(n);
}


private void addToRequested(long n) {
if (requested == NOT_SET) {//进来
//对requested赋值16
requested = n;
} else {
final long total = requested + n;
// check if overflow occurred
if (total < 0) {
requested = Long.MAX_VALUE;
} else {
requested = total;
}
}
}


ObserveOnSubscriber.init()方法

oid init() {
//child是SafeSubscriber对象
Subscriber<? super T> localChild = child;
//调用SafeSubscriber的setProducer方法,因为没有所以调用Subscriber的
//调用完成后,肯定也会回调Producer的request方法
localChild.setProducer(new Producer() {

@Override
public void request(long n) {
if (n > 0L) {
BackpressureUtils.getAndAddRequest(requested, n);
schedule();
}
}
});
localChild.add(recursiveScheduler);
localChild.add(this);
}


Subscriber的setProducer方法

public void setProducer(Producer p) {
long toRequest;
boolean passToSubscriber = false;
synchronized (this) {
//这里requested是16
toRequest = requested;
producer = p;
if (subscriber != null) {
//不会进去
if (toRequest == NOT_SET) {
// we pass through to the next producer as nothing has been requested
passToSubscriber = true;
}
}
}
//释放锁后执行
if (passToSubscriber) {
subscriber.setProducer(producer);
} else {
if (toRequest == NOT_SET) {
producer.request(Long.MAX_VALUE);
} else {
//进入到这里,回调回去
producer.request(toRequest);
}
}
}


通过producer.request(toRequest)回调到ObserveOnSubscriber的init方法内(toRequest=16),调用schedule()方法。

protected void schedule() {
if (counter.getAndIncrement() == 0) {
recursiveScheduler.schedule(this);
}
}


recursiveScheduler:LooperScheduler内的HandlerWorker对象

this:ObserveOnSubscriber对象

public Subscription schedule(final Action0 action) {

return schedule(action, 0, TimeUnit.MILLISECONDS);

}

public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
if (unsubscribed) {
return Subscriptions.unsubscribed();
}
action = hook.onSchedule(action);
ScheduledAction scheduledAction = new ScheduledAction(action, handler);
Message message = Message.obtain(handler, scheduledAction);
message.obj = this;
handler.sendMessageDelayed(message, unit.toMillis(delayTime));
if (unsubscribed) {
handler.removeCallbacks(scheduledAction);
return Subscriptions.unsubscribed();
}
return scheduledAction;
}


class ScheduledAction implements Runnable, Subscription {
public void run() {
action.call();
}
}


这里就是把ObserveOnSubscriber对象封装到ScheduledAction任务中,通过Handler发送到主线程执行,并回调ObserveOnSubscriber的call方法

ObserveOnSubscriber的call方法
148f1

public void call() {
long missed = 1L;
long currentEmission = emitted;
//消息队列SpscArrayQueue
final Queue<Object> q = this.queue;
//child是SafeSubscriber
final Subscriber<? super T> localChild = this.child;
for (;;) {//开始for循环遍历消息队列内的消息
long requestAmount = requested.get();

while (requestAmount != currentEmission) {
boolean done = finished;
Object v = q.poll();//获取队列中的消息
boolean empty = v == null;
//检查是否需要结束
if (checkTerminated(done, empty, localChild, q)) {
return;
}
//从队列中取到的消息是否为空
//第一次启动的时候是空的直接返回
//当调用了just等一些方法后,再次调用call方法时,这是队列中已经有消息了就不为空了
if (empty) {
break;
}
回调到我们写的Subscriber对象中
localChild.onNext(NotificationLite.<T>getValue(v));
.....省略.....
}
.....省略.....
}
}


OnSubscribeLift的call方法中的parent.call(st)方法

public void call(Subscriber<? super R> o) {
try {
//onObservableLift方法就是传进去什么就返回什么
//这里就是调用OperatorObserveOn的call方法
Subscriber<? super T> st = RxJavaHooks.onObservableLift(operator).call(o);
try {
st.onStart();
//这里是调用别名onSubscribe2的call方法,稍后再分析?
parent.call(st);
} catch (Throwable e) {
.....省略.....
}
} catch (Throwable e) {
.....省略.....
}
}


st:ObserveOnSubscriber对象

parent:上面提到的别名onSubscribe2,而onSubscribe2是通过subscribeOn()来赋值的,然后调用了onSubscribe2的call方法并把ObserveOnSubscriber对象传进去

subscribeOn()方法

public final Observable<T> subscribeOn(Scheduler scheduler) {
//scheduler是CachedThreadScheduler,不会进入
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
return create(new OperatorSubscribeOn<T>(this, scheduler));
}


this:通过调用filter方法返回的Observable对象

scheduler:CachedThreadScheduler对象

这里将this和scheduler封装到OperatorSubscribeOn对象中

再往下看:

public static <T> Observable<T> create(OnSubscribe<T> f) {
//RxJavaHooks.onCreate(f)传进什么就返回什么
//这里f是OperatorSubscribeOn对象
return new Observable<T>(RxJavaHooks.onCreate(f));
}


protected Observable(OnSubscribe<T> f) {
this.onSubscribe = f;
}


可以看到上边我们提到的onSubscribe2就是OperatorSubscribeOn对象

OperatorSubscribeOn对象

先看下OperatorSubscribeOn的构造方法

public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler) {
this.scheduler = scheduler;
this.source = source;
}


scheduler:CachedThreadScheduler对象

source:通过调用filter方法返回的Observable对象

OperatorSubscribeOn的call方法

public void call(final Subscriber<? super T> subscriber) {
//返回CachedThreadScheduler对象中EventLoopWorker对象
final Worker inner = scheduler.createWorker();
subscriber.add(inner);
//调用CachedThreadScheduler对象中的schedule方法,肯定也会回调call方法,这里起别名为call1
inner.schedule(new Action0() {
@Override
public void call() {
.....省略.....
}
});
}


scheduler:CachedThreadScheduler对象

inner:EventLoopWorker对象

调用EventLoopWorker对象中的schedule方法,肯定也会回调call方法,这里起别名为call1

CachedThreadScheduler的createWorker方法

public Worker createWorker() {
return new EventLoopWorker(pool.get());
}


pool:CachedWorkerPool对象

CachedWorkerPool.get()方法

ThreadWorker get() {
if (allWorkers.isUnsubscribed()) {
return SHUTDOWN_THREADWORKER;
}
while (!expiringWorkerQueue.isEmpty()) {
//缓存任务的队列不为空时就会从队列中取出一个ThreadWorker对象
ThreadWorker threadWorker = expiringWorkerQueue.poll();
if (threadWorker != null) {
return threadWorker;
}
}

//如果缓存中没有就创建一个
ThreadWorker w = new ThreadWorker(threadFactory);
allWorkers.add(w);
return w;
}


//将ThreadWorker方法缓存队列中
void release(ThreadWorker threadWorker) {
threadWorker.setExpirationTime(now() + keepAliveTime);
//将ThreadWorker方法缓存队列中
expiringWorkerQueue.offer(threadWorker);
}


ThreadWorker对象是什么

class ThreadWorker extends NewThreadWorker {
ThreadWorker(ThreadFactory threadFactory) {
super(threadFactory);//super很重要
this.expirationTime = 0L;
}
}


public NewThreadWorker(ThreadFactory threadFactory) {
//创建了一个定时执行任务的ScheduledThreadPool线程池,核心线程数是1
ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, threadFactory);
boolean cancelSupported = tryEnableCancelPolicy(exec);
if (!cancelSupported && exec instanceof ScheduledThreadPoolExecutor) {
registerExecutor((ScheduledThreadPoolExecutor)exec);
}
executor = exec;
}


也就是说ConcurrentLinkedQueue队列中存放的是没有过期的ThreadWorker对象,而ThreadWorker对象内有创建了定时执行任务的ScheduledThreadPool线程池,用于将需要执行的任务在非主线程中去执行,起到了线程切换的作用。

接着往下看

CachedThreadScheduler的schedule()方法

public Subscription schedule(Action0 action) {
return schedule(action, 0, null);
}


public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
.....省略.....
//调用ThreadWorker的scheduleActual方法,肯定也会回调call方法,这里起别名为call2
ScheduledAction s = threadWorker.scheduleActual(new Action0() {
@Override
public void call() {
if (isUnsubscribed()) {
return;
}
//这里回调call1方法
action.call();
}
}, delayTime, unit);
innerSubscription.add(s);
s.addParent(innerSubscription);
return s;
}


在执行threadWorker.scheduleActual()方法前所有操作都是在主线程进行的,这个方法后所有的回调方法都是在非主线程中调用的。这里线程的切换都是ThreadWorker的功劳。

public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) {
Action0 decoratedAction = RxJavaHooks.onScheduledAction(action);
//把action封装到ScheduledAction任务中
ScheduledAction run = new ScheduledAction(decoratedAction);
Future<?> f;
//delayTime=0,unit为空
if (delayTime <= 0) {//进入if
f = executor.submit(run);
} else {
f = executor.schedule(run, delayTime, unit);
}
run.add(f);
return run;
}


ScheduledAction类

public ScheduledAction(Action0 action) {
this.action = action;
this.cancel = new SubscriptionList();
}


action:上面创建的对象,用于回调call方法

当执行了executor.submit(run)这个任务将调用ScheduledAction的run方法

public void run() {
//回调call方法即别名call2方法
action.call();
}


run方法执行后会回调别名为call2的方法,在call方法中会回调别名为call1的方法

再粘贴下别名call1的方法

inner.schedule(new Action0() {
@Override
public void call() {
.....省略.....
//上面的方法都是需要被回调的
//source是通过调用filter方法返回的Observable对象并把这里创建的Subscriber对象传递进去
source.unsafeSubscribe(s);
}
});


Observable的unsafeSubscribe方法

public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
try {
//没什么用
subscriber.onStart();
// allow the hook to intercept and/or decorate
//这里就是调用onSubscribe对象的call方法并把subscriber对象传进去
RxJavaHooks.onObservableStart(this, onSubscribe).call(subscriber);
return RxJavaHooks.onObservableReturn(subscriber);
} catch (Throwable e) {
.....省略.....
}
}


this:通过调用filter方法返回的Observable对象

onSubscribe:通过调用filter方法进行赋值的,这里给这个onSubscribe起别名为onSubscribe3。它是什么呢,稍等?

filter()方法

传进去一个Func1对象并实现call方法

public final Observable<T> filter(Func1<? super T, Boolean> predicate) {
return create(new OnSubscribeFilter<T>(this, predicate));
}


this:通过调用just()方法创建的对象

predicate:我们通过代码创建的Func1对象

把this和predicate封装到OnSubscribeFilter对象中,再看create方法

public static <T> Observable<T> create(OnSubscribe<T> f) {
//传进去什么就返回什么
return new Observable<T>(RxJavaHooks.onCreate(f));
}


protected Observable(OnSubscribe<T> f) {
//就是把OnSubscribeFilter赋值给onSubscribe
this.onSubscribe = f;
}


上面提到的别名onSubscribe3就是OnSubscribeFilter对象,并调用了它的call方法

OnSubscribeFilter对象

先看下构造函数

public OnSubscribeFilter(Observable<T> source, Func1<? super T, Boolean> predicate) {
this.source = source;
this.predicate = predicate;
}


source:通过调用just()方法创建的Observable对象

predicate:我们通过代码创建的Func1对象

OnSubscribeFilter的call方法

public void call(final Subscriber<? super T> child) {
FilterSubscriber<T> parent = new FilterSubscriber<T>(child, predicate);
child.add(parent);
source.unsafeSubscribe(parent);
}


child:OperatorSubscribeOn对象中的call方法内通过inner.schedule(…)创建的Subscriber对象

source:通过调用just()方法创建的Observable对象

predicate:我们通过代码创建的Func1对象

FilterSubscriber对象

先看一下构造函数

public FilterSubscriber(Subscriber<? super T> actual, Func1<? super T, Boolean> predicate) {
this.actual = actual;
this.predicate = predicate;
request(0);
}


actual:OperatorSubscribeOn对象中的call方法内通过inner.schedule(…)创建的Subscriber对象

predicate:我们通过代码创建的Func1对象

FilterSubscriber构造中的request(0)方法作用就是默认设置requested值是16,这次请求把requested值设置为0

OnSubscribeFilter的call方法内source.unsafeSubscribe(parent)

public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
try {
subscriber.onStart();
RxJavaHooks.onObservableStart(this, onSubscribe).call(subscriber);
return RxJavaHooks.onObservableReturn(subscriber);
} catch (Throwable e) {
.....省略.....
}
}


onSubscribe:通过调用just方法进行赋值的,这里给这个onSubscribe起别名为onSubscribe4。它是什么呢,稍等请往下看?

subscriber:FilterSubscriber对象,在FilterSubscriber对象内封装了通过OperatorSubscribeOn对象中的call方法内调用inner.schedule(…)创建的Subscriber对象和我们通过代码创建的Func1对象

在这里就是调用了别名为onSubscribe4的call方法

Observable.just(“nihao”)方法

public static <T> Observable<T> just(final T value) {
return ScalarSynchronousObservable.create(value);
}


public static <T> ScalarSynchronousObservable<T> create(T t) {
return new ScalarSynchronousObservable<T>(t);
}


protected ScalarSynchronousObservable(final T t) {
//onCreate方法是传进入什么就返回什么
super(RxJavaHooks.onCreate(new JustOnSubscribe<T>(t)));
this.t = t;
}


super(RxJavaHooks.onCreate(new JustOnSubscribe(t)))方法

protected Observable(OnSubscribe<T> f) {
//赋值给onSubscribe
this.onSubscribe = f;
}


just方法到此就是创建了ScalarSynchronousObservable对象并把JustOnSubscribe对象赋值给onSubscribe,所以上面提到的别名onSubscribe4就是这里的JustOnSubscribe对象。

JustOnSubscribe对象

先看下构造函数

JustOnSubscribe(T value) {
this.value = value;
}


把我们自己写的值传进去

JustOnSubscribe的call方法

public void call(Subscriber<? super T> s) {
s.setProducer(createProducer(s, value));
}


s:OnSubscribeFilter对象中的FilterSubscriber对象

value:我们自己传入的值

createProducer(s, value)方法

static <T> Producer createProducer(Subscriber<? super T> s, T v) {
if (STRONG_MODE) {
return new SingleProducer<T>(s, v);
}
return new WeakSingleProducer<T>(s, v);
}


s:OnSubscribeFilter对象中的FilterSubscriber对象

v:我们自己传入的值

创建WeakSingleProducer对象

WeakSingleProducer对象

public WeakSingleProducer(Subscriber<? super T> actual, T value) {
this.actual = actual;
this.value = value;
}


actual:OnSubscribeFilter对象中的FilterSubscriber对象

value:我们自己传入的值

JustOnSubscribe的call方法内s.setProducer(createProducer(s, value))这个方法是调用OnSubscribeFilter类中的FilterSubscriber对象的setProducer方法,并把WeakSingleProducer对象传进入。

FilterSubscriber对象的setProducer方法

public void setProducer(Producer p) {
super.setProducer(p);
actual.setProducer(p);
}


p:WeakSingleProducer对象

actual:FilterSubscriber对象,在FilterSubscriber对象内封装了通过OperatorSubscribeOn对象中的call方法内调用inner.schedule(…)创建的Subscriber对象

super.setProducer(p)方法:就是调用producer.request(toRequest),在FilterSubscriber的构造方法中请求了request(0)方法,所以toRequest此时是0

WeakSingleProducer的request方法

public void request(long n) {//此时n是0
if (once) {
return;
}
if (n < 0L) {
throw new IllegalStateException("n >= required but it was " + n);
}
if (n == 0L) {//当这里直接返回了
return;
}
.....省略.....
}


actual.setProducer(p)方法

这里actual是FilterSubscriber对象,在FilterSubscriber对象内封装了通过OperatorSubscribeOn对象中的call方法内调用inner.schedule(…)创建的Subscriber对象

Subscriber<T> s = new Subscriber<T>(subscriber) {
.....省略.....

@Override
public void setProducer(final Producer p) {//调用此方法
subscriber.setProducer(new Producer() {
@Override
public void request(final long n) {//肯定会回调request方法
//判断是那个线程
if (t == Thread.currentThread()) {//进入
p.request(n);//在ObserveOnSubscriber中赋值为16
} else {
//再次切换线程
inner.schedule(new Action0() {
@Override
public void call() {
p.request(n);
}
});
}
}
});
}
};


subscriber:ObserveOnSubscriber对象

p:WeakSingleProducer对象

inner:CachedThreadScheduler对象中EventLoopWorker对象

p.request(n)方法:是再次回调到WeakSingleProducer对象的request方法

public void request(long n) {//此时n为16
if (once) {
return;
}
if (n < 0L) {
throw new IllegalStateException("n >= required but it was " + n);
}
if (n == 0L) {
return;
}
once = true;
Subscriber<? super T> a = actual;//是FilterSubscriber对象
if (a.isUnsubscribed()) {
return;
}
T v = value;
try {
a.onNext(v);//调用FilterSubscriber的onNext方法并把值传进去
} catch (Throwable e) {
Exceptions.throwOrReport(e, a, v);
return;
}

if (a.isUnsubscribed()) {
return;
}
a.onCompleted();//调用FilterSubscriber的onCompleted方法结束
}


从这里开始,正式处理我们传入的数据

通过a.onNext(v)方法,调用FilterSubscriber的onNext方法并把值传进去

通过a.onCompleted()方法,调用FilterSubscriber的onCompleted方法结束, 此时事件发送到FilterSubscriber对象onNext(v)中

FilterSubscriber对象onNext(v)

public void onNext(T t) {
boolean result;

try {
//调用我们自己实现的call方法并返回boolean值
result = predicate.call(t);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
unsubscribe();
//异常直接报错返回
onError(OnErrorThrowable.addValueAsLastCause(ex, t));
return;
}
if (result) {//返回true进入
actual.onNext(t);
} else {
//如果返回false则只是把requested赋值为1,调用链停止,直接走onCompleted方法
request(1);
}
}


public void onCompleted() {
if (done) {//如果是错误的就不走了
return;
}
//继续调用上层的onCompleted方法
actual.onCompleted();
}


predicate:我们通过代码创建的Func1对象

actual:OperatorSubscribeOn对象中的call方法内通过inner.schedule(…)创建的Subscriber对象

这里通过predicate.call(t)把值传给我们创建的Func1对象,我们自己处理后返回一个boolean值

如果返回true则继续调用下一个对象的onNext(t)

如果返回false则调用链断开,直接执行onCompleted方法

actual.onNext(t)方法

Subscriber<T> s = new Subscriber<T>(subscriber) {
@Override
public void onNext(T t) {
//继续调用上层的onNext方法
subscriber.onNext(t);
}

@Override
public void onCompleted() {
try {
//继续调用上层的onCompleted方法
subscriber.onCompleted();
} finally {
//取消订阅
inner.unsubscribe();
}
}
.....省略.....
};


subscriber:ObserveOnSubscriber对象

这里继续调用ObserveOnSubscriber的onNext(t)方法并把值传递进入,通过onCompleted()继续调用上层的onCompleted方法

ObserveOnSubscriber的onNext方法

public void onNext(final T t) {
.....省略.....
//插入到队列
if (!queue.offer(NotificationLite.next(t))) {
onError(new MissingBackpressureException());
return;
}
schedule();
}


protected void schedule() {
if (counter.getAndIncrement() == 0) {
//recursiveScheduler是HandlerWorker对象
recursiveScheduler.schedule(this);
}
}


HandlerWorker的schedule方法

public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
if (unsubscribed) {
return Subscriptions.unsubscribed();
}
action = hook.onSchedule(action);
ScheduledAction scheduledAction = new ScheduledAction(action, handler);
Message message = Message.obtain(handler, scheduledAction);
message.obj = this; // Used as token for unsubscription operation.
handler.sendMessageDelayed(message, unit.toMillis(delayTime));
.....省略.....
return scheduledAction;
}


这里就是把我们传入的值封装到ObserveOnSubscriber对象中并放入SpscArrayQueue队列中,ObserveOnSubscriber对象又封装到ScheduledAction任务中,再把ScheduledAction任务通过Handler发送到主线程去执行,当主线程一执行就会回调到ObserveOnSubscriber的call方法,在call方法中又会从队列中拿到我们传进去的值,并回调到我们创建的Subscriber对象的onNext(t)方法中,在onNext(t)方法执行完后,就会执行onCompleted方法。

如果在filter中返回false则onNext(t)方法不会执行,直接执行onCompleted方法

如果在filter、map中出错则只会执行onError方法

总结

由链式调用顺序可知当调用just、filter、subscribeOn、observeOn方法时,各自分别又创建了Observable对象,但只有调用了subscribe(订阅)方法,整个流程才能运行,运行的顺序是先调用observeOn、subscribeOn、filter、just内的方法,然后再从just、filter、subscribeOn、observeOn内开始回调回来,只到回调到我们自己创建的Subscriber对象的三个方法内。

subscribe(Subscriber subscriber):把我们自己创建的Subscriber传递进去

observeOn(AndroidSchedulers.mainThread()):把我们自己创建的Subscriber对象封装到任务中,通过Handler发送到主线程,使Subscriber对象内的三个方法都能在主线程运行

subscribeOn(Schedulers.io()):在内部创建了定时执行任务的ScheduledThreadPool线程池,用于将subscribeOn以上的所有方法都执行在非主线程内

filter(Func1 func1):执行在非主线程的过滤器,可根据传进来的值判断是否继续执行

just(Object o):是在非主线程传进去值

注意

如果多次调用 observeOn() ,程序会多次切换线程,但subscribeOn()只能调用一次,当多次调用subscribeOn() 的时候,只有第一个 subscribeOn() 起作用。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息