RxJava 教程第四部分:并发 之线程调度
2016-12-23 15:37
387 查看
由于 Rx 目标是用在异步系统上并且 Rx 支持多线程处理,所以很多 Rx 开发者认为默认情况下 Rx 就是多线程的。 其实实际情况不是这样的,
Rx 默认是单线程的。
除非你明确的指定线程,否则所有 onNext/onError/onCompleted 以及各个操作函数的调用都是在同一个线程中完成的。例如下面的示例:
结果:
上面在三个线程中分别调用 subject 的onNext 函数。和 Runnable 中的线程是同一个线程。不管用多少个操作函数串联调用,结果都是同一个线程。
在Rx 中你并不直接和 线程 打交道,而是通过 Scheduler 来处理多线程。
结果:
可以看到上面的代码是在同一个线程中执行,并且是按循序执行的。subscribe 执行完后(包括create 函数里面的 Lambda 表达式的代码)才继续执行后面的代码。
如果你把上面的注释掉的代码 .subscribeOn(Schedulers.newThread()) 启用,这结果是这样的:
这样 create 里面的 Lambda 表达式代码将会在 Schedulers.newThread() 返回的线程中执行。subscribe 不再是阻塞的了。后面的代码可以立即执行,而不用等待 subscribe 返回。
有些 Observable 内部会使用它们自己创建的线程。例如 Observable.interval 就是异步的。这种情况下,无需指定新的线程。
结果:
结果:
observeOn 只影响调用该函数以后的操作函数。你可以认为 observeOn 只是拦截了数据流并且对后续的操作有作用。例如:
结果:
可以看到在遇到 observeOn 之前,所有的操作发生在一个线程,之后在另外一个线程。这样可以在 Rx 数据流中不同地方设置不同的线程。
如果你知道数据流处理在那些情况需要很长时间,则可以通过这个操作来避免阻塞生产者线程。 比如在 Android 开发过程中的 UI 线程,如果在该线程中读取文件,可能会导致 UI 卡死(ANR)无响应,通过该函数可以指定读取文件在另外一个线程中执行。
结果:
Scheduler对象。Scheduler 是用来协调任务执行的。 RxJava 包含了一些常用的 Scheduler,你也可以自定义 Scheduler。 通过调用
Schedulers的工厂函数来获取标准的预定义的 Scheduler。
RxJava 内置的 Scheduler 有:
– immediate 同步执行
– trampoline 把任务放到当前线程的队列中,等当前任务执行完了,再继续执行队列中的任务
– newThread 对于每个任务创建一个新的线程去执行
– computation 计算线程,用于需要大量 CPU 计算的任务
– io 用于执行 io 操作的任务
– test 用于测试和调试
当前 computation 和 io 的实现是类似的,他们两个主要用来确保调用的场景,相当于文档说明,来表明你当前的任务是何种类型的。
大部分的 Rx 操作函数内部都使用了schedulers 。并且大部分的 Observable 操作函数也都有一个使用 Scheduler 参数的重载函数。通过重载函数可以指定该操作函数执行的线程。
Scheduler.Worker)。然后可以调度该任务:
上面的任务被分配到其指定的线程中了。
还可以重复执行任务,或者只执行一次,也可以推迟任务执行:
结果:
上面示例中可以看到,推迟执行是从调度开始的时候计算时间的。
结果:
第一个任务中调用了 unsubscribe,这样第二个任务被取消了。下面一个示例演示任务没有执行完,被取消的情况,会抛出一个 InterruptedException 异常:
结果:
schedule 返回的是一个 Subscription 对象,可以在该对象上调用取消操作,这样可以只取消这一个任务,而不是取消所有任务。
结果:
结果:
TrampolineScheduler 把任务安排到第一次执行任务的那个线程中执行。这样,第一次调用 schedule 的操作是阻塞的,直到队列执行完。后续的任务,会在这个线程中一个一个的执行,并且后续的调用不会阻塞。
定义一个打印线程信息的辅助函数:
示例:
结果:
Rx 默认是单线程的。
除非你明确的指定线程,否则所有 onNext/onError/onCompleted 以及各个操作函数的调用都是在同一个线程中完成的。例如下面的示例:
final BehaviorSubject<Integer> subject = BehaviorSubject.create(); subject.subscribe(i -> { System.out.println("Received " + i + " on " + Thread.currentThread().getId()); }); int[] i = {1}; // naughty side-effects for examples only ;) Runnable r = () -> { synchronized(i) { System.out.println("onNext(" + i[0] + ") on " + Thread.currentThread().getId()); subject.onNext(i[0]++); } }; r.run(); // Execute on main thread new Thread(r).start(); new Thread(r).start();
结果:
onNext(1) on 1 Received 1 on 1 onNext(2) on 11 Received 2 on 11 onNext(3) on 12 Received 3 on 12
上面在三个线程中分别调用 subject 的onNext 函数。和 Runnable 中的线程是同一个线程。不管用多少个操作函数串联调用,结果都是同一个线程。
subscribeOn 和 observeOn
subscribeOn 和 observeOn 分别用来控制 subscription 的调用线程和 接受事件通知(Observer 的 onNext/onError/onCompleted 函数)的线程。public final Observable<T> observeOn(Schedulerscheduler) public final Observable<T> subscribeOn(Schedulerscheduler)
在Rx 中你并不直接和 线程 打交道,而是通过 Scheduler 来处理多线程。
subscribeOn
subscribeOn 用来指定 Observable.create 中的代码在那个 Scheduler 中执行。即使你没有调用 create 函数,但是内部也有一个 create 实现。例如:System.out.println("Main: " + Thread.currentThread().getId()); Observable.create(o -> { System.out.println("Created on " + Thread.currentThread().getId()); o.onNext(1); o.onNext(2); o.onCompleted(); }) //.subscribeOn(Schedulers.newThread()) .subscribe(i -> { System.out.println("Received " + i + " on " + Thread.currentThread().getId()); }); System.out.println("Finished main: " + Thread.currentThread().getId());
结果:
Main: 1 Createdon 1 Received 1 on 1 Received 2 on 1 Finishedmain: 1
可以看到上面的代码是在同一个线程中执行,并且是按循序执行的。subscribe 执行完后(包括create 函数里面的 Lambda 表达式的代码)才继续执行后面的代码。
如果你把上面的注释掉的代码 .subscribeOn(Schedulers.newThread()) 启用,这结果是这样的:
Main: 1 Finishedmain: 1 Createdon 11 Received 1 on 11 Received 2 on 11
这样 create 里面的 Lambda 表达式代码将会在 Schedulers.newThread() 返回的线程中执行。subscribe 不再是阻塞的了。后面的代码可以立即执行,而不用等待 subscribe 返回。
有些 Observable 内部会使用它们自己创建的线程。例如 Observable.interval 就是异步的。这种情况下,无需指定新的线程。
System.out.println("Main: " + Thread.currentThread().getId()); Observable.interval(100, TimeUnit.MILLISECONDS) .subscribe(i -> { System.out.println("Received " + i + " on " + Thread.currentThread().getId()); }); System.out.println("Finished main: " + Thread.currentThread().getId());
结果:
Main: 1 Finishedmain: 1 Received 0 on 11 Received 1 on 11 Received 2 on 11
observeOn
observeOn 控制数据流的另外一端。你的 observer 如何收到事件。也就是在那个线程中回调 observer 的 onNext/onError/onCompleted 函数。Observable.create(o -> { System.out.println("Created on " + Thread.currentThread().getId()); o.onNext(1); o.onNext(2); o.onCompleted(); }) .observeOn(Schedulers.newThread()) .subscribe(i -> System.out.println("Received " + i + " on " + Thread.currentThread().getId()));
结果:
Createdon 1 Received 1 on 13 Received 2 on 13
observeOn 只影响调用该函数以后的操作函数。你可以认为 observeOn 只是拦截了数据流并且对后续的操作有作用。例如:
Observable.create(o -> { System.out.println("Created on " + Thread.currentThread().getId()); o.onNext(1); o.onNext(2); o.onCompleted(); }) .doOnNext(i -> System.out.println("Before " + i + " on " + Thread.currentThread().getId())) .observeOn(Schedulers.newThread()) .doOnNext(i -> System.out.println("After " + i + " on " + Thread.currentThread().getId())) .subscribe();
结果:
Createdon 1 Before 1 on 1 Before 2 on 1 After 1 on 13 After 2 on 13
可以看到在遇到 observeOn 之前,所有的操作发生在一个线程,之后在另外一个线程。这样可以在 Rx 数据流中不同地方设置不同的线程。
如果你知道数据流处理在那些情况需要很长时间,则可以通过这个操作来避免阻塞生产者线程。 比如在 Android 开发过程中的 UI 线程,如果在该线程中读取文件,可能会导致 UI 卡死(ANR)无响应,通过该函数可以指定读取文件在另外一个线程中执行。
unsubscribeOn
有些 Observable 会依赖一些资源,当该 Observable 完成后释放这些资源。如果释放资源比较耗时的话,可以通过 unsubscribeOn 来指定 释放资源代码执行的线程。Observable<Object> source = Observable.using( () -> { System.out.println("Subscribed on " + Thread.currentThread().getId()); return Arrays.asList(1,2); }, (ints) -> { System.out.println("Producing on " + Thread.currentThread().getId()); return Observable.from(ints); }, (ints) -> { System.out.println("Unubscribed on " + Thread.currentThread().getId()); } ); source .unsubscribeOn(Schedulers.newThread()) .subscribe(System.out::println);
结果:
Subscribedon 1 Producingon 1 1 2 Unubscribedon 11
Schedulers
observeOn 和 subscribeOn 的参数为一个Scheduler对象。Scheduler 是用来协调任务执行的。 RxJava 包含了一些常用的 Scheduler,你也可以自定义 Scheduler。 通过调用
Schedulers的工厂函数来获取标准的预定义的 Scheduler。
RxJava 内置的 Scheduler 有:
– immediate 同步执行
– trampoline 把任务放到当前线程的队列中,等当前任务执行完了,再继续执行队列中的任务
– newThread 对于每个任务创建一个新的线程去执行
– computation 计算线程,用于需要大量 CPU 计算的任务
– io 用于执行 io 操作的任务
– test 用于测试和调试
当前 computation 和 io 的实现是类似的,他们两个主要用来确保调用的场景,相当于文档说明,来表明你当前的任务是何种类型的。
大部分的 Rx 操作函数内部都使用了schedulers 。并且大部分的 Observable 操作函数也都有一个使用 Scheduler 参数的重载函数。通过重载函数可以指定该操作函数执行的线程。
scheduler 的高级特性
Rx scheduler 的使用场景并没有限定在 Rx 中,也可以在普通 Java 代码中使用。执行一个任务
Scheduler 有个 createWorker 函数,用来创建一个可以执行的任务(Scheduler.Worker)。然后可以调度该任务:
Scheduler.Workerworker = scheduler.createWorker(); worker.schedule( () -> System.out.println("Action"));
上面的任务被分配到其指定的线程中了。
还可以重复执行任务,或者只执行一次,也可以推迟任务执行:
Subscriptionschedule( Action0action, long delayTime, java.util.concurrent.TimeUnitunit) SubscriptionschedulePeriodically( Action0action, long initialDelay, long period, java.util.concurrent.TimeUnitunit)
Schedulerscheduler = Schedulers.newThread(); long start = System.currentTimeMillis(); Scheduler.Workerworker = scheduler.createWorker(); worker.schedule( () -> System.out.println(System.currentTimeMillis()-start), 5, TimeUnit.SECONDS); worker.schedule( () -> System.out.println(System.currentTimeMillis()-start), 5, TimeUnit.SECONDS);
结果:
5033 5035
上面示例中可以看到,推迟执行是从调度开始的时候计算时间的。
取消任务
Scheduler.Worker 继承至 Subscription。调用 unsubscribe 函数可以取消队列中的任务:Schedulerscheduler = Schedulers.newThread(); long start = System.currentTimeMillis(); Scheduler.Workerworker = scheduler.createWorker(); worker.schedule( () -> { System.out.println(System.currentTimeMillis()-start); worker.unsubscribe(); }, 5, TimeUnit.SECONDS); worker.schedule( () -> System.out.println(System.currentTimeMillis()-start), 5, TimeUnit.SECONDS);
结果:
5032
第一个任务中调用了 unsubscribe,这样第二个任务被取消了。下面一个示例演示任务没有执行完,被取消的情况,会抛出一个 InterruptedException 异常:
Schedulerscheduler = Schedulers.newThread(); long start = System.currentTimeMillis(); Scheduler.Workerworker = scheduler.createWorker(); worker.schedule(() -> { try { Thread.sleep(2000); System.out.println("Action completed"); } catch (InterruptedException e) { System.out.println("Action interrupted"); } }); Thread.sleep(500); worker.unsubscribe();
结果:
Actioninterrupted
schedule 返回的是一个 Subscription 对象,可以在该对象上调用取消操作,这样可以只取消这一个任务,而不是取消所有任务。
RxJava 中现有的 scheduler
ImmediateScheduler
ImmediateScheduler 并没有做任何线程调度。只是同步的执行任务。嵌套调用会导致任务被递归执行:Schedulerscheduler = Schedulers.immediate(); Scheduler.Workerworker = scheduler.createWorker(); worker.schedule(() -> { System.out.println("Start"); worker.schedule(() -> System.out.println("Inner")); System.out.println("End"); });
结果:
Start Inner End
TrampolineScheduler
TrampolineScheduler 也是同步执行,但是不嵌套任务。而是把后来的任务添加到任务队列中,等前面的任务执行完了 再执行后面的。Schedulerscheduler = Schedulers.trampoline(); Scheduler.Workerworker = scheduler.createWorker(); worker.schedule(() -> { System.out.println("Start"); worker.schedule(() -> System.out.println("Inner")); System.out.println("End"); });
结果:
Start End Inner
TrampolineScheduler 把任务安排到第一次执行任务的那个线程中执行。这样,第一次调用 schedule 的操作是阻塞的,直到队列执行完。后续的任务,会在这个线程中一个一个的执行,并且后续的调用不会阻塞。
NewThreadScheduler
NewThreadScheduler 给每个任务创建一个新的线程。定义一个打印线程信息的辅助函数:
public static void printThread(String message) { System.out.println(message + " on " + Thread.currentThread().getId()); }
示例:
printThread("Main"); Schedulerscheduler = Schedulers.newThread(); Scheduler.Workerworker = scheduler.createWorker(); worker.schedule(() -> { printThread("Start"); worker.schedule(() -> printThread("Inner")); printThread("End"); }); Thread.sleep(500); worker.schedule(() -> printThread("Again"));
结果:
Mainon 1 Starton 11 End on 11 Inneron 11 Againon 11
相关文章推荐
- javax.mail实现的邮件发送
- System.err: java.lang.IllegalArgumentException: You cannot start a load for a destroyed activity
- Java Swing之按钮点击选择文件与获取选中文件绝对路径
- Ceph RGW服务 使用s3 java sdk 分片文件上传API 报‘SignatureDoesNotMatch’ 异常的定位及规避方案
- 《Spring源码深度解析》阅读笔记2-容器的基本实现之Spring的结构组成
- java 中 equals() 相等的两个对象,hashcode() 一定相等
- eclipse使用--解决maven项目报错:找不到META-INF\MANIFEST.MF文件的问题
- log4j2.xml<Console name="Console" target="SYSTEM_OUT">
- 深入理解 Java G1 垃圾收集器--转
- Eclipse的Svn插件报错
- JVM 内部原理(七)— Java 字节码基础之二
- SpringMVC + ueditor + 七牛 实现富文本文件上传功能
- JVM 内部原理(六)— Java 字节码基础之一
- Java内存区域与内存溢出异常(二)
- JAVA 八皇后
- Java计算平方根算法
- Java --- 线程同步和异步的区别
- 【监听文件 多线程】使用java--WatchService监听文件 开启多线程copy文件
- JAVA数组学习之一:对一维数组进行逆序排列
- java.lang.IllegalArgumentException