RxJava 简单订阅-发布Demo
2017-06-20 11:59
405 查看
职责介绍
ObservableRxJava中存在最广泛的事件源,可以使用
just()或者
from()创建,创建出Observable对象以后,使用就可以使用
subscribe(),添加订阅对象了.
Subject
RxJava中的事件源和观察者的结合体,可以使用onNext()进行监听和发射对象,使用
subscribe()添加订阅对象.
初始化subject和Observable
// subject具有观察和发射事件的功能. private static BehaviorSubject<Integer> sb1; // Observable发射完事件后,就没有用处了,如果需要监听,可以使用Subject. private static Observable<String> ob2; private static Observable<Double> ob3; private static CompositeSubscription mSubscription; private static void setUp() { sb1 = BehaviorSubject.create(); ob2 = Observable.from(Arrays.asList("a", "b")); ob3 = Observable.from(Arrays.asList(.1D, .2D, .3D)); }
添加订阅
mSubscription = new CompositeSubscription(); mSubscription.add(sb1.observeOn(Schedulers.immediate()).subscribe(SubscriptionTest::subInt)); mSubscription.add(ob2.observeOn(Schedulers.immediate()).subscribe(SubscriptionTest::subString)); mSubscription.add(ob3.observeOn(Schedulers.immediate()).subscribe(SubscriptionTest::subDouble)); // 这里使用Timer不断的让subject发布数据 ScheduledExecutorService e = Executors.newScheduledThreadPool(0); e.scheduleAtFixedRate(() -> sb1.onNext(++counter), 1, 1, TimeUnit.SECONDS); // 检测条件状态 while (true) { if (counter >= 15) { e.shutdown(); System.exit(0); } else if (counter >= 10) { mSubscription.unsubscribe(); } }
输出结果如下:
a b 0.1 0.2 0.3 1 2 3 4 5 6 7 8 9 10
完整代码
public class SubscriptionTest {
// subject具有观察和发射事件的功能.
private static BehaviorSubject<Integer> sb1;
// Observable发射完事件后,就没有用处了,如果需要监听,可以使用Subject.
private static Observable<String> ob2;
private static Observable<Double> ob3;
private static CompositeSubscription mSubscription;
private static volatile int counter = 0;
private static void setUp() {
sb1 = BehaviorSubject.create();
ob2 = Observable.from(Arrays.asList("a", "b"));
ob3 = Observable.from(Arrays.asList(.1D, .2D, .3D));
}
public static void main(String[] args) {
setUp();
mSubscription = new CompositeSubscription(); mSubscription.add(sb1.observeOn(Schedulers.immediate()).subscribe(SubscriptionTest::subInt)); mSubscription.add(ob2.observeOn(Schedulers.immediate()).subscribe(SubscriptionTest::subString)); mSubscription.add(ob3.observeOn(Schedulers.immediate()).subscribe(SubscriptionTest::subDouble)); // 这里使用Timer不断的让subject发布数据 ScheduledExecutorService e = Executors.newScheduledThreadPool(0); e.scheduleAtFixedRate(() -> sb1.onNext(++counter), 1, 1, TimeUnit.SECONDS); // 检测条件状态 while (true) { if (counter >= 15) { e.shutdown(); System.exit(0); } else if (counter >= 10) { mSubscription.unsubscribe(); } }
}
private static void subInt(Integer num) {
System.out.println(num);
}
private static void subString(String s) {
System.out.println(s);
}
private static void subDouble(double d) {
System.out.println(d);
}
}
相关文章推荐
- python的redis,实用订阅发布简单实用代码
- 利用redis的订阅和发布来实现实时监控的一个DEMO(Python版本)
- Redis之订阅发布简单实例
- RxAndroid从零开始学习之一(RxJava的简单Demo)
- 简单的订阅发布模式
- 简单的WCF发布-订阅(Pub/Sub)服务
- 利用Thrift和zk简单实现服务治理框架中的订阅发布机制
- Redis中的简单事物以及消息订阅发布
- MQTT的学习研究(十三) IBM MQTTV3 简单发布订阅实例
- 这是用RxJava实现的一个简单demo
- MQTT的学习研究(十一) IBM MQTT 简单发布订阅实例
- 消息订阅发布系统Apache Kafka分布式集群环境搭建和简单测试
- 消息订阅发布系统Apache Kafka分布式集群环境搭建和简单测试
- JavaScript中发布/订阅模式的简单实例
- 学习笔记-js发布/订阅模式的简单实现
- 利用redis的订阅和发布来实现实时监控的一个DEMO(Python版本)
- MQTT的学习研究(十三) IBM MQTTV3 简单发布订阅实例
- JavaScript中发布/订阅模式的简单实例
- 消息订阅发布系统Apache Kafka分布式集群环境搭建和简单测试
- 观察者模式(订阅与发布模式),史上最简单的观察者和被观察者理解;