RxJava(11-线程调度Scheduler)
2016-07-04 16:39
483 查看
转载请标明出处:
http://blog.csdn.net/xmxkf/article/details/51821940
本文出自:【openXu的博客】
目录:
使用示例
subscribeOn原理
多次subscribeOn的情况
observeOn原理
调度器的种类
各种操作符的默认调度器
源码下载
RxJava中 使用
从上面的输出结果中,我们大概知道了下面几点:
①. RxJava中已经封装了多种调度器,不同的调度器可以指定在不同的线程中执行和观察
②. create创建的Observable默认在当前线程(主线程)中执行任务流,并在当前线程观察
③. interval创建的Observable会在一个叫Computation的线程中执行任务流和观察任务流
④. 除了observeOn和subscribeOn ,使用其他创建或者变换操作符也有可能造成线程的切换
下面看看
我们看到他创建了一个新的
上面源码中注释已经写的很清楚了,
在
我们发现
通过上面的分析,我们知道
多次
我们看看
可以发现,observeOn操作符对它后面的操作产生影响,比如下面一段代码:
只要涉及到
在RxAndroid中新增了一个:
http://blog.csdn.net/xmxkf/article/details/51821940
本文出自:【openXu的博客】
目录:
使用示例
subscribeOn原理
多次subscribeOn的情况
observeOn原理
调度器的种类
各种操作符的默认调度器
源码下载
RxJava中 使用
observeOn和
subscribeOn操作符,你可以让
Observable在一个特定的调度器上执行,
observeOn指示一个
Observable在一个特定的调度器上调用观察者的
onNext,
onError和
onCompleted方法,
subscribeOn则指示
Observable将全部的处理过程(包括发射数据和通知)放在特定的调度器上执行。
1. 使用示例
先看看下面的例子,体验一下在RxJava中 如何使用线程的切换:private void logThread(Object obj, Thread thread){ Log.v(TAG, "onNext:"+obj+" -"+Thread.currentThread().getName()); } Observable.OnSubscribe onSub = new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { Log.v(TAG, "OnSubscribe -"+Thread.currentThread()); subscriber.onNext(1); subscriber.onCompleted(); } }; Log.v(TAG, "--------------①-------------"); Observable.create(onSub) .subscribe(integer->logThread(integer, Thread.currentThread())); Log.v(TAG, "--------------②-------------"); Observable.create(onSub) .subscribeOn(Schedulers.io()) .subscribe(integer->logThread(integer, Thread.currentThread())); Log.v(TAG, "--------------③-------------"); Observable.create(onSub) .subscribeOn(Schedulers.newThread()) .subscribe(integer->logThread(integer, Thread.currentThread())); Log.v(TAG, "--------------④-------------"); Observable.create(onSub) .subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(integer->logThread(integer, Thread.currentThread())); Log.v(TAG, "--------------⑤-------------"); Observable.create(onSub) .subscribeOn(Schedulers.newThread()) .observeOn(Schedulers.newThread()) .subscribe(integer->logThread(integer, Thread.currentThread())); Log.v(TAG, "--------------⑥-------------"); Observable.interval(100, TimeUnit.MILLISECONDS) .take(1) .subscribe(integer->logThread(integer, Thread.currentThread())); /* 输出: --------------①------------- OnSubscribe -Thread[main,5,main] onNext:1 -Thread[main,5,main] --------------②------------- OnSubscribe -Thread[RxIoScheduler-2,5,main] onNext:1 -Thread[RxIoScheduler-2,5,main] --------------③------------- OnSubscribe -Thread[RxNewThreadScheduler-1,5,main] onNext:1 -Thread[RxNewThreadScheduler-1,5,main] --------------④------------- OnSubscribe -Thread[RxNewThreadScheduler-2,5,main] onNext:1 -Thread[main,5,main] --------------⑤------------- OnSubscribe -Thread[RxNewThreadScheduler-4,5,main] onNext:1 -Thread[RxNewThreadScheduler-3,5,main] --------------⑥------------- onNext:0 -RxComputationScheduler-3 */
从上面的输出结果中,我们大概知道了下面几点:
①. RxJava中已经封装了多种调度器,不同的调度器可以指定在不同的线程中执行和观察
②. create创建的Observable默认在当前线程(主线程)中执行任务流,并在当前线程观察
③. interval创建的Observable会在一个叫Computation的线程中执行任务流和观察任务流
④. 除了observeOn和subscribeOn ,使用其他创建或者变换操作符也有可能造成线程的切换
2. subscribeOn()原理
subscribeOn()用来指定
Observable在哪个线程中执行事件流,也就是指定
Observable中
OnSubscribe(计划表)的
call方法在那个线程发射数据。下面通过源码分析
subscribeOn是怎样实现线程的切换的。
下面看看
subscribeOn方法:
public final Observable<T> subscribeOn(Scheduler scheduler) { if (this instanceof ScalarSynchronousObservable) { return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler); } return create(new OperatorSubscribeOn<T>(this, scheduler)); }
我们看到他创建了一个新的
Observable,并为新的
Observable创建了新的计划表
OperatorSubscribeOn对象,新的计划表保存了原始
Observable对象和调度器
scheduler。接着我们看看
OperatorSubscribeOn:
public final class OperatorSubscribeOn<T> implements Observable.OnSubscribe<T> { final Scheduler scheduler; //调度器 final Observable<T> source; //原始Observable //①.原始观察者订阅了新的Observable后,将执行此call方法 @Override public void call(final Subscriber<? super T> subscriber) { final Scheduler.Worker inner = scheduler.createWorker(); subscriber.add(inner); //②. call方法中使用传入的调度器创建的Worker对象的schedule方法切换线程 inner.schedule(new Action0() { @Override public void call() { final Thread t = Thread.currentThread(); //③ .创建了一个新的观察者 Subscriber<T> s = new Subscriber<T>(subscriber) { @Override public void onNext(T t) { //⑤. 新的观察者收到数据后直接发送给原始观察者 subscriber.onNext(t); } ... }; //④. 在切换的线程中,新的观察者订阅原始Observable,用来接收数据 source.unsafeSubscribe(s); } }); } }
上面源码中注释已经写的很清楚了,
OperatorSubscribeOn其实就是一个普通的任务表,用于为新的
Observable发射数据,只是不是真正的发射,它创建了一个新的观察者订阅原始
Observable,这样就可以接受原始
Observable发射的数据,然后直接发送给原始观察者。
在
call方法中通过
scheduler.createWorker().schedule()完成线程的切换,这里就牵扯到两个对象了,
Scheduler和
Worker,不要着急,一个个的看,先看
Scheduler,由于RxJava中有多种调度器,我们就看一个简单的
Schedulers.newThread(),其他调度器的思路是一样的,下面一步一步看源码:
public final class Schedulers { //各种调度器对象 private final Scheduler computationScheduler; private final Scheduler ioScheduler; private final Scheduler newThreadScheduler; //单例,Schedulers被加载的时候,上面的各种调度器对象已经初始化 private static final Schedulers INSTANCE = new Schedulers(); //构造方法 private Schedulers() { RxJavaSchedulersHook hook = RxJavaPlugins.getInstance().getSchedulersHook(); ... Scheduler nt = hook.getNewThreadScheduler(); if (nt != null) { newThreadScheduler = nt; } else { //①.创建newThreadScheduler对象 newThreadScheduler = RxJavaSchedulersHook.createNewThreadScheduler(); } } //②. 获取NewThreadScheduler对象 public static Scheduler newThread() { return INSTANCE.newThreadScheduler; } ... }
Schedulers中保存了多种调度器对象,在
Schedulers被加载的时候,他们就被初始化了,
Schedulers就像是一个调度器的管理器,接着跟踪
RxJavaSchedulersHook.createNewScheduler(),最终会找到一个叫
NewThreadScheduler的类:
public final class NewThreadScheduler extends Scheduler { private final ThreadFactory threadFactory; public NewThreadScheduler(ThreadFactory threadFactory) { this.threadFactory = threadFactory; } @Override public Worker createWorker() { return new NewThreadWorker(threadFactory); } }
NewThreadScheduler就是我们调用
subscribeOn(Schedulers.newThread() )传入的调度器对象,每个调度器对象都有一个
createWorker方法用于创建一个
Worker对象,而
NewThreadScheduler对应创建的
Worker是一个叫
NewThreadWorker的对象,在新产生的
OperatorSubscribeOn计划表中就是通过
NewThreadWorker.schedule(Action0)实现线程的切换,下面我们跟踪
schedule(Action0)方法:
public class NewThreadWorker extends Scheduler.Worker implements Subscription { private final ScheduledExecutorService executor; // public NewThreadWorker(ThreadFactory threadFactory) { //创建一个线程池 ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, threadFactory); executor = exec; } @Override public Subscription schedule(final Action0 action) { return schedule(action, 0, null); } @Override public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) { return scheduleActual(action, delayTime, unit); } //重要:worker.schedule()最终调用的是这个方法 public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) { //return action; Action0 decoratedAction = schedulersHook.onSchedule(action); //ScheduledAction就是一个Runnable对象,在run()方法中调用了Action0.call() ScheduledAction run = new ScheduledAction(decoratedAction); Future<?> f; if (delayTime <= 0) { f = executor.submit(run); //将Runnable对象放入线程池中 } else { f = executor.schedule(run, delayTime, unit); //延迟执行 } run.add(f); return run; } ... }
我们发现
OperatorSubscribeOn计划表中通过
NewThreadWorker.schedule(Action0),将
Action0放入到一个线程池中执行,这样就实现了线程的切换。
通过上面的分析,我们知道
subscribeOn是怎样将任务表放入线程池中执行的,感觉还是有点绕,看下图:
多次subscribeOn()的情况
我们发现,每次使用subscribeOn都会产生一个新的
Observable,并产生一个新的计划表
OnSubscribe,目标Subscriber最后订阅的将是最后一次
subscribeOn产生的新的
Observable。在每个新的
OnSubscribe的
call方法中都会有一个产生一个新的线程,在这个新线程中订阅上一级
Observable,并创建一个新的
Subscriber接受数据,最终原始
Observable将在第一个新线程中发射数据,然后传送给给下一个新的观察者,直到传送到目标观察者,所以多次调用
subscribeOn只有第一个起作用(这只是表面现象,其实每个
subscribeOn都切换了线程,只是最终目标
Observable是在第一个
subscribeOn产生的线程中发射数据的)。看下图:
多次
subscribeOn()只有第一个会起作用,后面的只是在第一个的基础上在外面套了一层壳,就像下面的伪代码,最后执行是在第一个新线程中执行:
... //第3个subscribeOn产生的新线程 new Thread(){ @Override public void run() { Subscriber s1 = new Subscriber(); //第2个subscribeOn产生的新线程 new Thread(){ @Override public void run() { Subscriber s2 = new Subscriber(); //第1个subscribeOn产生的新线程 new Thread(){ @Override public void run() { Subscriber<T> s3 = new Subscriber<T>(subscriber) { @Override public void onNext(T t) { subscriber.onNext(t); } ... }; //①. 最后一个新观察者订阅原始Observable 原始Observable.subscribe(s3); //②. 原始Observable将在此线程中发射数据 //③. 最后一个新的观察者s3接受数据 //④. s3收到数据后,直接发送给s2,s2收到数据后传给s1,...最后目标观察者收到数据 } }.start(); } }.start(); } }.start();
3. observeOn原理
observeOn调用的是
lift操作符,
lift操作符在上一篇博客中讲过。
lift操作符创建了一个代理的
Observable,用于接收原始
Observable发射的数据,然后在
Operator中对数据做一些处理后传递给目标
Subscriber。
observeOn一样创建了一个代理的
Observable,并创建一个代理观察者接受上一级
Observable的数据,代理观察者收到数据之后会开启一个线程,在新的线程中,调用下一级观察者的
onNext、
onCompete、
onError方法。
我们看看
observeOn操作符的源码:
public final class OperatorObserveOn<T> implements Observable.Operator<T, T> { private final Scheduler scheduler; //创建代理观察者,用于接收上一级Observable发射的数据 @Override public Subscriber<? super T> call(Subscriber<? super T> child) { if (scheduler instanceof ImmediateScheduler) { return child; } else if (scheduler instanceof TrampolineScheduler) { return child; } else { ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize); parent.init(); return parent; } } //代理观察者 private static final class ObserveOnSubscriber<T> extends Subscriber<T> implements Action0 { final Subscriber<? super T> child; final Scheduler.Worker recursiveScheduler; final NotificationLite<T> on; final Queue<Object> queue; //接受上一级Observable发射的数据 @Override public void onNext(final T t) { if (isUnsubscribed() || finished) { return; } if (!queue.offer(on.next(t))) { onError(new MissingBackpressureException()); return; } schedule(); } @Override public void onCompleted() { ... schedule(); } @Override public void onError(final Throwable e) { ... schedule(); } //开启新线程处理数据 protected void schedule() { if (counter.getAndIncrement() == 0) { recursiveScheduler.schedule(this); } } // only execute this from schedule() //在新线程中将数据发送给目标观察者 @Override public void call() { long missed = 1L; long currentEmission = emitted; final Queue<Object> q = this.queue; final Subscriber<? super T> localChild = this.child; final NotificationLite<T> localOn = this.on; for (;;) { while (requestAmount != currentEmission) { ... localChild.onNext(localOn.getValue(v)); } } } } }
可以发现,observeOn操作符对它后面的操作产生影响,比如下面一段代码:
Observable.just(100) .subscribeOn(Schedulers.computation()) //Computation线程中发射数据 .map(integer -> {return "map1-"+integer;}) //Computation线程中接受数据 .observeOn(Schedulers.io()) //②. 切换 .map(integer -> {return "map2-"+integer;}) //io线程中接受数据,由②决定 .observeOn(Schedulers.newThread()) //③. 切换 .map(integer -> {return "map3-"+integer;}) //newThread线程中接受数据,由③决定 .observeOn(AndroidSchedulers.mainThread()) //④. 切换 .delay(1000, TimeUnit.MILLISECONDS) //主线程中接受数据,由④决定 .subscribe(str -> logThread(str, Thread.currentThread())); //Computation线程中接受数据,由④决定 /* 说明:最后目标观察者将在Computation线程中接受数据,这取决于delay操作符, delay操作符是在Computation线程中执行的,执行完后就会将数据发送给目标观察者。 而他上面的observeOn将决定于delay产生的代理观察者在主线程中接受数据 */ /* 输出: onNext:map3-map2-map1-100 -RxComputationScheduler-3 */
只要涉及到
lift操作符,其实就是生成了一套代理的
Subscriber(观察者)、
Observable(被观察者)和
OnSubscribe(计划表)。
Observable最典型的特征就是链式调用,我们暂且将每一步操作称为一级。代理的
OnSubscribe中的
call方法就是让代理
Subscriber订阅上一级
Observable,直到订阅到原始
Observable发射数据,代理
Subscriber收到数据后,可能对数据做一些操作也有可能切换线程,然后将数据传送给下一级
Subscriber,直到目标观察者接收到数据,目标观察者在那个线程接受数据取决于上一个
Subscriber在哪一个线程调用目标观察者的方法。示意图如下:
4. 调度器的种类
RxJava中可用的调度器有下面几种:调度器类型 | 效果 |
---|---|
Schedulers.computation( ) | 用于计算任务,如事件循环或和回调处理,不要用于IO操作(IO操作请使用Schedulers.io());默认线程数等于处理器的数量 |
Schedulers.from(executor) | 使用指定的Executor作为调度器 |
Schedulers.immediate( ) | 在当前线程立即开始执行任务 |
Schedulers.io( ) | 用于IO密集型任务,如异步阻塞IO操作,这个调度器的线程池会根据需要增长;对于普通的计算任务,请使用Schedulers.computation();Schedulers.io( )默认是一个CachedThreadScheduler,很像一个有线程缓存的新线程调度器 |
Schedulers.newThread( ) | 为每个任务创建一个新线程 |
Schedulers.trampoline( ) | 当其它排队的任务完成后,在当前线程排队开始执行 |
调度器类型 | 效果 |
---|---|
AndroidSchedulers.mainThread( ) | 主线程,UI线程,可以用于更新界面 |
5. 各种操作符的默认调度器
在之前学习各种操作符的时候,都会介绍xx操作符默认在xxx调度器上执行,当时可能不太注意这是什么意思,下面总结了一些操作符默认的调度器:操作符 | 调度器 |
---|---|
buffer(timespan) | computation |
buffer(timespan, count) | computation |
buffer(timespan, timeshift) | computation |
debounce(timeout, unit) | computation |
delay(delay, unit) | computation |
delaySubscription(delay, unit) | computation |
interval | computation |
repeat | trampoline |
replay(time, unit) | computation |
replay(buffersize, time, unit) | computation |
replay(selector, time, unit) | computation |
replay(selector, buffersize, time, unit) | computation |
retrytrampolinesample(period, unit) | computation |
skip(time, unit) | computation |
skipLast(time, unit) | computation |
take(time, unit) | computation |
takeLast(time, unit) | computation |
takeLast(count, time, unit) | computation |
takeLastBuffer(time, unit) | computation |
takeLastBuffer(count, time, unit) | computation |
throttleFirst | computation |
throttleLast | computation |
throttleWithTimeout | computation |
timeInterval | immediate |
timeout(timeoutSelector) | immediate |
timeout(firstTimeoutSelector, timeoutSelector) | immediate |
timeout(timeoutSelector, other) | immediate |
timeout(timeout, timeUnit) | computation |
timeout(firstTimeoutSelector, timeoutSelector, other) | immediate |
timeout(timeout, timeUnit, other) | computation |
timer | computation |
timestamp | immediate |
window(timespan) | computation |
window(timespan, count) | computation |
window(timespan, timeshift) | computation |
源码下载:
https://github.com/openXu/RxJavaTest相关文章推荐
- RxJava(11-线程调度Scheduler)
- Eclipse变量插件问题
- Eclipse Class Decompiler——Java反编译插件
- java动态代理(JDK和cglib)
- Java @override报错的解决方法
- 善用Eclipse的代码模板功能
- Java使用MyEclipse构建webService简单案例
- iReport3.7.1 java打印报表(二)
- spring的第一天
- Eclipse中java文件头注释格式设置
- [改善Java代码]枚举和注解结合使用威力更大
- Eclipse Java注释模板设置详解
- Ubuntu下通过ppa安装jdk
- Spring MVC 系统异常处理方式及性能对比
- 高效编写Java代码的几条建议
- Eclipse中Java开发中版权声明及编码风格的注释
- Struts2的OGNL标签详解
- Eclipse常用技巧总结:热键,自定义模版及其他
- java中的CAS
- Java模拟Windows的Event