
[RxJava Study Notes] subscribeOn Source Code Analysis

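2016-08-25 15:48
After studying the article 给 Android 开发者的 RxJava 详解 ("RxJava Explained for Android Developers"), we know that the subscribeOn method specifies where the event-producing code runs.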

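The original wording is:

“subscribeOn(): specifies the thread on which subscribe() takes place, that is, the thread on which Observable.OnSubscribe is activated. This is also called the event-producing thread.”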

Here we quote the example from "RxJava Explained for Android Developers":

Observable.just(1, 2, 3, 4)
    .subscribeOn(Schedulers.io()) // subscribe() happens on the IO thread
    .subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer number) {
            Log.d(tag, "number:" + number);
        }
    });
We use this example to analyze how subscribeOn is implemented.
As before, we substitute the code layer by layer to see how the example actually executes.

The first statement, Observable.just(1, 2, 3, 4), goes through:

public static <T> Observable<T> just(T t1, T t2, T t3, T t4) {
    return from((T[])new Object[] { t1, t2, t3, t4 });
}

public static <T> Observable<T> from(T[] array) {
    int n = array.length;
    if (n == 0) {
        return empty();
    } else
    if (n == 1) {
        return just(array[0]);
    }
    return create(new OnSubscribeFromArray<T>(array));
}
and can therefore be treated as equivalent to:

array = {1, 2, 3, 4};
onSubscribe1 = new OnSubscribeFromArray<T>(array);
observable1.onSubscribe = onSubscribe1;
The whole example is then equivalent to:

// Observable.just(1, 2, 3, 4)
array = {1, 2, 3, 4};
onSubscribe1 = new OnSubscribeFromArray<T>(array);
observable1.onSubscribe = onSubscribe1;

subscribe1 = new Action1<Integer>() {
    @Override
    public void call(Integer number) {
        Log.d(tag, "number:" + number);
    }
};

observable2 = observable1.subscribeOn(Schedulers.io());
observable2.subscribe(subscribe1);

Next, we focus on the last two statements:

observable2 = observable1.subscribeOn(Schedulers.io());
observable2.subscribe(subscribe1);
The main logic of the subscribe method is as follows:

static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {

    // new Subscriber so onStart it
    subscriber.onStart();

    // The code below is exactly the same an unsafeSubscribe but not used because it would
    // add a significant depth to already huge call stacks.
    try {
        // allow the hook to intercept and/or decorate
        hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
        return hook.onSubscribeReturn(subscriber);
    } catch (Throwable e) {
        // error handling omitted in this excerpt
    }
    return Subscriptions.unsubscribed();
}
Substituting gives:
observable2 = observable1.subscribeOn(Schedulers.io());
subscribe1.onStart();
(observable2.onSubscribe).call(subscribe1);
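
To make the two substituted calls concrete, here is a minimal sketch that uses only the JDK. MiniSubscriber, MiniOnSubscribe and SubscribeSketch are hypothetical, simplified stand-ins for RxJava's types and are not part of the library. It only shows the order of calls: onStart() first, then onSubscribe.call(subscriber), both on the calling thread.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins for RxJava's Subscriber and OnSubscribe.
class SubscribeSketch {
    interface MiniSubscriber<T> {
        void onStart();
        void onNext(T value);
    }

    interface MiniOnSubscribe<T> {
        void call(MiniSubscriber<T> subscriber);
    }

    // Mirrors the substituted lines: onStart() first, then onSubscribe.call(subscriber).
    static <T> void subscribe(MiniOnSubscribe<T> onSubscribe, MiniSubscriber<T> subscriber) {
        subscriber.onStart();
        onSubscribe.call(subscriber);
    }

    // Runs the example and returns the recorded call order.
    static List<String> demo() {
        final List<String> log = new ArrayList<>();
        // Plays the role of OnSubscribeFromArray for {1, 2, 3, 4}.
        MiniOnSubscribe<Integer> source = s -> {
            for (int i = 1; i <= 4; i++) {
                s.onNext(i);
            }
        };
        subscribe(source, new MiniSubscriber<Integer>() {
            @Override
            public void onStart() {
                log.add("onStart");
            }

            @Override
            public void onNext(Integer number) {
                log.add("number:" + number);
            }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Without subscribeOn, everything in this sketch runs synchronously on the thread that calls subscribe, which is the behavior the rest of the analysis sets out to change.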
Next we substitute the subscribeOn method, whose source is:

public final Observable<T> subscribeOn(Scheduler scheduler) {
    if (this instanceof ScalarSynchronousObservable) {
        return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
    }
    return create(new OperatorSubscribeOn<T>(this, scheduler));
}
which gives:
observable2 = create(new OperatorSubscribeOn<T>(observable1, Schedulers.io()));
subscribe1.onStart();
(observable2.onSubscribe).call(subscribe1);

and then:

onSubscribe2 = new OperatorSubscribeOn<T>(observable1, Schedulers.io());
observable2 = create(onSubscribe2);
subscribe1.onStart();
onSubscribe2.call(subscribe1);

The relevant source code is:

// OperatorSubscribeOn
public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler) {
    this.scheduler = scheduler;
    this.source = source;
}

public void call(final Subscriber<? super T> subscriber) {
    final Worker inner = scheduler.createWorker();
    subscriber.add(inner);

    inner.schedule(new Action0() {
        @Override
        public void call() {
            final Thread t = Thread.currentThread();

            Subscriber<T> s = new Subscriber<T>(subscriber) {
                @Override
                public void onNext(T t) {
                    subscriber.onNext(t);
                }

                @Override
                public void onError(Throwable e) {
                    try {
                        subscriber.onError(e);
                    } finally {
                        inner.unsubscribe();
                    }
                }

                @Override
                public void onCompleted() {
                    try {
                        subscriber.onCompleted();
                    } finally {
                        inner.unsubscribe();
                    }
                }

                @Override
                public void setProducer(final Producer p) {
                    subscriber.setProducer(new Producer() {
                        @Override
                        public void request(final long n) {
                            if (t == Thread.currentThread()) {
                                p.request(n);
                            } else {
                                inner.schedule(new Action0() {
                                    @Override
                                    public void call() {
                                        p.request(n);
                                    }
                                });
                            }
                        }
                    });
                }
            };

            source.unsafeSubscribe(s);
        }
    });
}
The code is therefore equivalent to the following, where subscriber1 is the subscribe1 from above and source is observable1:
subscriber1.onStart();
final Worker inner = scheduler1.createWorker();
subscriber1.add(inner);
innerAction1 = new Action0() {
    @Override
    public void call() {
        final Thread t = Thread.currentThread();

        Subscriber<T> s1 = new Subscriber<T>(subscriber1) {
            @Override
            public void onNext(T t) {
                subscriber1.onNext(t);
            }

            @Override
            public void onError(Throwable e) {
                try {
                    subscriber1.onError(e);
                } finally {
                    inner.unsubscribe();
                }
            }

            @Override
            public void onCompleted() {
                try {
                    subscriber1.onCompleted();
                } finally {
                    inner.unsubscribe();
                }
            }

            @Override
            public void setProducer(final Producer p) {
                subscriber1.setProducer(new Producer() {
                    @Override
                    public void request(final long n) {
                        if (t == Thread.currentThread()) {
                            p.request(n);
                        } else {
                            inner.schedule(new Action0() {
                                @Override
                                public void call() {
                                    p.request(n);
                                }
                            });
                        }
                    }
                });
            }
        };

        source.unsafeSubscribe(s1);
    }
};
inner.schedule(innerAction1);
where scheduler1 = Schedulers.io().
This simplifies to:

subscriber1.onStart();
final Worker inner = scheduler1.createWorker();
subscriber1.add(inner);
inner.schedule(innerAction1);
Next, we trace two methods reached through Schedulers.io(): createWorker and schedule.

First, look at what Schedulers.io() evaluates to:

Schedulers.io()
==>
ioScheduler = RxJavaSchedulersHook.createIoScheduler();
==>
ioScheduler = createIoScheduler(new RxThreadFactory("RxIoScheduler-"));
==>
ioScheduler = new CachedThreadScheduler(threadFactory);
Then trace the return value of createWorker:

// CachedThreadScheduler
final AtomicReference<CachedWorkerPool> pool;

public Worker createWorker() {
    return new EventLoopWorker(pool.get());
}
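Here pool.get() on the AtomicReference returns the CachedWorkerPool; the EventLoopWorker then takes its threadWorker from CachedWorkerPool.get(), whose source is: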

// CachedWorkerPool
ThreadWorker get() {
    if (allWorkers.isUnsubscribed()) {
        return SHUTDOWN_THREADWORKER;
    }
    while (!expiringWorkerQueue.isEmpty()) {
        ThreadWorker threadWorker = expiringWorkerQueue.poll();
        if (threadWorker != null) {
            return threadWorker;
        }
    }

    // No cached worker found, so create a new one.
    ThreadWorker w = new ThreadWorker(threadFactory);
    allWorkers.add(w);
    return w;
}

// ThreadWorker
private static final class ThreadWorker extends NewThreadWorker
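
The reuse-or-create pattern in get() can be sketched with the JDK alone. This is only an illustration under assumptions: WorkerPoolSketch, MiniWorker and release() are hypothetical names, and the step that returns a worker to the queue is modeled loosely because the excerpt above does not show it.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of a cached worker pool; not RxJava's actual class.
class WorkerPoolSketch {
    // Stand-in for ThreadWorker; the id only makes reuse observable.
    static final class MiniWorker {
        final int id;

        MiniWorker(int id) {
            this.id = id;
        }
    }

    private final ConcurrentLinkedQueue<MiniWorker> idleWorkers = new ConcurrentLinkedQueue<>();
    private final AtomicInteger created = new AtomicInteger();

    // Same shape as get() above: prefer an idle worker, otherwise create a new one.
    MiniWorker get() {
        MiniWorker cached = idleWorkers.poll();
        if (cached != null) {
            return cached;
        }
        return new MiniWorker(created.incrementAndGet());
    }

    // Assumption: a worker that is no longer needed goes back to the idle queue.
    void release(MiniWorker worker) {
        idleWorkers.offer(worker);
    }

    int createdCount() {
        return created.get();
    }

    public static void main(String[] args) {
        WorkerPoolSketch pool = new WorkerPoolSketch();
        MiniWorker first = pool.get();
        pool.release(first);
        MiniWorker second = pool.get();
        System.out.println("reused: " + (first == second) + ", created: " + pool.createdCount());
    }
}
```

The point of the pattern is that a new thread-backed worker is created only when the idle queue is empty, which is why the scheduler is called "cached".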
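At this point we know that executing
final Worker inner = scheduler1.createWorker();
yields an EventLoopWorker object. Next, look at EventLoopWorker's schedule method (the one-argument inner.schedule(action) used above delegates to this overload with a delay of 0):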

// EventLoopWorker
public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
    if (innerSubscription.isUnsubscribed()) {
        // don't schedule, we are unsubscribed
        return Subscriptions.unsubscribed();
    }

    ScheduledAction s = threadWorker.scheduleActual(new Action0() {
        @Override
        public void call() {
            if (isUnsubscribed()) {
                return;
            }
            action.call();
        }
    }, delayTime, unit);
    innerSubscription.add(s);
    s.addParent(innerSubscription);
    return s;
}

With this, the code:

subscriber1.onStart();
final Worker inner = scheduler1.createWorker();
subscriber1.add(inner);
inner.schedule(innerAction1);
is equivalent to:
subscriber1.onStart();

ScheduledAction s = threadWorker.scheduleActual(new Action0() {
    @Override
    public void call() {
        if (isUnsubscribed()) {
            return;
        }
        innerAction1.call();
    }
}, delayTime, unit);
innerSubscription.add(s);
s.addParent(innerSubscription);

subscriber1.add(s);
Replacing innerAction1.call(), the code is equivalent to:
subscriber1.onStart();

ScheduledAction s = threadWorker.scheduleActual(new Action0() {
    @Override
    public void call() {
        if (isUnsubscribed()) {
            return;
        }
        observable1.unsafeSubscribe(s1);
    }
}, delayTime, unit);
innerSubscription.add(s);
s.addParent(innerSubscription);

subscriber1.add(s);
Expanding the unsafeSubscribe method, the code is equivalent to:
subscriber1.onStart();

ScheduledAction s = threadWorker.scheduleActual(new Action0() {
    @Override
    public void call() {
        if (isUnsubscribed()) {
            return;
        }
        s1.onStart();
        onSubscribe1.call(s1);
    }
}, delayTime, unit);
innerSubscription.add(s);
s.addParent(innerSubscription);

subscriber1.add(s);
This simplifies to:

subscriber1.onStart();

scheduledAction0 = new Action0() {
    @Override
    public void call() {
        if (isUnsubscribed()) {
            return;
        }
        s1.onStart();
        onSubscribe1.call(s1);
    }
};
ScheduledAction s = threadWorker.scheduleActual(scheduledAction0, delayTime, unit);
innerSubscription.add(s);
s.addParent(innerSubscription);

subscriber1.add(s);

Next, look at the source of threadWorker.scheduleActual:
// NewThreadWorker
public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) {
    Action0 decoratedAction = schedulersHook.onSchedule(action);
    ScheduledAction run = new ScheduledAction(decoratedAction);
    Future<?> f;
    if (delayTime <= 0) {
        f = executor.submit(run);
    } else {
        f = executor.schedule(run, delayTime, unit);
    }
    run.add(f);

    return run;
}
The executor field is assigned in the constructor. The NewThreadWorker constructor is:
// NewThreadWorker
public NewThreadWorker(ThreadFactory threadFactory) {
    ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, threadFactory);
    // Java 7+: cancelled future tasks can be removed from the executor thus avoiding memory leak
    boolean cancelSupported = tryEnableCancelPolicy(exec);
    if (!cancelSupported && exec instanceof ScheduledThreadPoolExecutor) {
        registerExecutor((ScheduledThreadPoolExecutor)exec);
    }
    schedulersHook = RxJavaPlugins.getInstance().getSchedulersHook();
    executor = exec;
}
where:
// Executors
public static ScheduledExecutorService newScheduledThreadPool(
        int corePoolSize, ThreadFactory threadFactory) {
    return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
That is, it creates a thread pool whose core pool size is 1.
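
The effect of a core pool size of 1 can be checked with a small JDK-only sketch. The class name SingleThreadPoolSketch and the thread-name prefix "SketchIoScheduler-" are assumptions made for illustration; the factory below only imitates what a naming thread factory such as RxThreadFactory would do.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

class SingleThreadPoolSketch {
    // Submits two tasks and returns the names of the threads that ran them.
    static String[] runTwoTasks() {
        final AtomicInteger counter = new AtomicInteger();
        // Hypothetical naming factory: each new thread gets the next number.
        ThreadFactory factory = r -> {
            Thread t = new Thread(r, "SketchIoScheduler-" + counter.incrementAndGet());
            t.setDaemon(true);
            return t;
        };
        ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);
        Callable<String> whoAmI = () -> Thread.currentThread().getName();
        try {
            Future<String> first = exec.submit(whoAmI);
            Future<String> second = exec.submit(whoAmI);
            return new String[] { first.get(), second.get() };
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            exec.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] names = runTwoTasks();
        System.out.println(names[0] + " / " + names[1]);
    }
}
```

Both tasks report the same thread name because the pool never grows beyond its single core thread, so work submitted to one worker runs sequentially on one thread.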
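In NewThreadWorker's scheduleActual, executor.submit plays a role comparable to Thread.start: the task is handed to the pool's thread, which at some later moment runs the run method of the ScheduledAction. The source of ScheduledAction's run method is: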
// ScheduledAction
final class ScheduledAction extends AtomicReference<Thread> implements Runnable, Subscription

public void run() {
    try {
        lazySet(Thread.currentThread());
        action.call();
    } catch (Throwable e) {
        // nothing to do but print a System error as this is fatal and there is nowhere else to throw this
        IllegalStateException ie = null;
        if (e instanceof OnErrorNotImplementedException) {
            ie = new IllegalStateException("Exception thrown on Scheduler.Worker thread. Add `onError` handling.", e);
        } else {
            ie = new IllegalStateException("Fatal Exception thrown on Scheduler.Worker thread.", e);
        }
        RxJavaPlugins.getInstance().getErrorHandler().handleError(ie);
        Thread thread = Thread.currentThread();
        thread.getUncaughtExceptionHandler().uncaughtException(thread, ie);
    } finally {
        unsubscribe();
    }
}
Here action.call() ultimately refers to scheduledAction0.call(). From the earlier substitutions we know:
scheduledAction0 = new Action0() {
    @Override
    public void call() {
        if (isUnsubscribed()) {
            return;
        }
        s1.onStart();
        onSubscribe1.call(s1);
    }
};
From this we can see that Observable.subscribeOn() moves the execution of Observable.onSubscribe.call(subscriber) onto the specified thread.
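
The conclusion can be reproduced in miniature with the JDK alone. The sketch below is not RxJava's implementation: SubscribeOnSketch and MiniOnSubscribe are hypothetical names, the subscriber is reduced to a Consumer, and a single-thread ExecutorService named "sketch-io" is assumed as a stand-in for Schedulers.io(). It only shows the core idea traced above: subscribeOn returns a wrapper whose call() schedules source.call(...) on the given thread.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class SubscribeOnSketch {
    // Hypothetical stand-in for Observable.OnSubscribe, reduced to onNext delivery.
    interface MiniOnSubscribe<T> {
        void call(Consumer<T> onNext);
    }

    // Wraps source so that source.call(...) runs on the executor's thread.
    static <T> MiniOnSubscribe<T> subscribeOn(final MiniOnSubscribe<T> source,
                                              final ExecutorService executor) {
        return onNext -> executor.execute(() -> source.call(onNext));
    }

    // Returns the name of the thread that produced each of the four events.
    static List<String> demo() {
        final List<String> threads = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        // Assumed stand-in for Schedulers.io(): one daemon thread with a fixed name.
        ExecutorService io = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "sketch-io");
            t.setDaemon(true);
            return t;
        });
        MiniOnSubscribe<Integer> source = onNext -> {
            for (int i = 1; i <= 4; i++) {
                onNext.accept(i);
            }
            done.countDown();
        };
        subscribeOn(source, io).call(n -> threads.add(Thread.currentThread().getName()));
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            io.shutdown();
        }
        return threads;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The caller's thread only submits the task; the loop that emits 1 to 4 runs on "sketch-io", which mirrors how onSubscribe1.call(s1) ends up inside the scheduled action.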