RxJava之Subject
2016-05-01 20:35
309 查看
Subject
RxJava提供四种不同的Subject:PublishSubject、BehaviorSubject、、ReplaySubject.、AsyncSubject。
通过上述代码可知,通过PublishSubject.create()方法创建一个PublishSubject对象,订阅了一个Observer对象,但并没有发送数据。此时,没有发送数据,观察者只能等待,没有线程阻塞,没有资源消耗。在调用publishSubject.onNext("Hello RxJava");时,才发送消息。 发送消息结束以后,publishSubject并没有结束,观察者等待消息再一次的发送。如果想关闭publishSubject,publishSubject需调用publishSubject.onCompleted方法关闭。此时,publishSubject再发送消息,观察者不能收到发送的消息。
Observable关闭对象时,可以调用doOnCompleted()方法关闭,但是此方法对Subject无效,不知道因为啥?
就像Observable和Observer的契约类,它可以是一个Observable同时也可以是一个Observer:它作为连接它们的一座桥梁。一个Subject可以订阅一个Observable,就像一个观察者,并且它可以发射新的数据,或者传递它接受到的数据,就像一个Observable。很明显,作为一个Observable,观察者们或者其它Subject都可以订阅它。
RxJava提供四种不同的Subject:PublishSubject、BehaviorSubject、、ReplaySubject.、AsyncSubject。
PublishSubject
<span style="font-size:14px;"> PublishSubject<String> publishSubject = PublishSubject.create(); Subscription subscription = publishSubject.subscribe(new Observer<String>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(String s) { Log.i("123", "PublishSubject: " + s); } });</span>
<span style="font-size:14px;"> publishSubject.onNext("Hello RxJava");</span>
通过上述代码可知,通过PublishSubject.create()方法创建一个PublishSubject对象,订阅了一个Observer对象,但并没有发送数据。此时,没有发送数据,观察者只能等待,没有线程阻塞,没有资源消耗。在调用publishSubject.onNext("Hello RxJava");时,才发送消息。 发送消息结束以后,publishSubject并没有结束,观察者等待消息再一次的发送。如果想关闭publishSubject,publishSubject需调用publishSubject.onCompleted方法关闭。此时,publishSubject再发送消息,观察者不能收到发送的消息。
Observable关闭对象时,可以调用doOnCompleted()方法关闭,但是此方法对Subject无效,不知道因为啥?
BehaviorSubject
BehaviorSubject会首先向他的订阅者发送截至订阅前最新的一个数据对象(或初始值),然后正常发送订阅后的数据流。<span style="font-size:14px;"> BehaviorSubject behaviorSubject = BehaviorSubject.create("First"); Subscription subscription = behaviorSubject.subscribe(new Observer<String>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(String s) { cout++; Log.i("123", s + " | " + cout); } });</span>
<span style="font-size:14px;"> behaviorSubject.onNext("Hello RxJava");</span>根据上述代码,在创建BehaviorSubject后,每当Observes订阅它时,它向观察者发送消息即"First",观察者接收到消息并处理“First”消息后,再处于等待状态,等待被观察者下次发送消息。
ReplaySubject
ReplaySubject会缓存它所订阅的所有数据,向任意一个订阅它的观察者重发。<span style="font-size:14px;"> ReplaySubject replaySubject = ReplaySubject.create(); replaySubject.onNext(1); replaySubject.onNext(2); replaySubject.onNext(3); replaySubject.onNext(4); replaySubject.onCompleted(); Observer<Integer> observer_1 = new Observer<Integer>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(Integer integer) { Log.i("123", "observer_1" + " - " + integer); } }; Observer<Integer> observer_2 = new Observer<Integer>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(Integer integer) { Log.i("123", "observer_2" + " - " + integer); } }; replaySubject.subscribe(observer_1); replaySubject.subscribe(observer_2);</span>分析上述代码,replaySubject创建后,订阅它预发送的消息(1,2,3,4)并执行onCompleted。此时,replaySubject将订阅的消息缓存。若有观察者订阅其时,replaySubject自动将缓存的消息重发。
AsyncSubject
当Observable完成时AsyncSubject只会发布最后一个数据给已经订阅的每一个观察者。<span style="font-size:14px;"> AsyncSubject asyncSubject = AsyncSubject.create(); Observer<Integer> observer = new Observer<Integer>() { public void onCompleted() { } public void onError(Throwable e) { } public void onNext(Integer integer) { Log.i("123", "AsyncSubject - observer:" + integer); } }; asyncSubject.subscribe(observer); asyncSubject.onNext(1); asyncSubject.onNext(2); asyncSubject.onNext(3); asyncSubject.onNext(4); asyncSubject.onCompleted();</span>
<span style="font-size:14px;"> // 打印的信息 **.rxjavademo I/123: AsyncSubject - observer:4AsyncSubject - observer:4</span>从打印的消息可以看到,观察者除了收到最后的消息以外,其他的消息并没有接收到。
参考资料
RxJava Essentials
相关文章推荐
- struts学习-HelloWorld
- Java 利用iText生成pdf并导出
- Java的多态性
- java之Map转List
- Java学习-12.通过异常处理错误
- Java学习-11.持有对象
- javasavacript 递归优化,递归变while
- Struts之OGNL
- Java 随机生成中文姓名,手机号,邮编,住址
- java二维数组
- 深入理解Java内存模型——final
- java如何获取其它用户登录的真是IP地址
- RxJava-简介及Observable创建
- 20145225《Java程序设计》 第9周学习总结
- 20145223 《Java程序程序设计》实验报告4
- java数组2
- 20145235 《Java程序设计》第9周学习总结
- 20145215《Java程序设计》第9周学习总结
- 深入理解Java虚拟机(四)-垃圾收集算法
- 图论最短路径算法-Floyd算法-JAVA代码实现