您的位置:首页 > 编程语言 > Java开发

RxJava 简单订阅-发布Demo

2017-06-20 11:59 405 查看

职责介绍

Observable

RxJava中存在最广泛的事件源,可以使用
just()
或者
from()
创建,创建出Observable对象以后,使用就可以使用
subscribe()
,添加订阅对象了.

Subject

RxJava中的事件源和观察者的结合体,可以使用onNext()进行监听和发射对象,使用
subscribe()
添加订阅对象.

初始化subject和Observable

// subject具有观察和发射事件的功能.
private static BehaviorSubject<Integer> sb1;
// Observable发射完事件后,就没有用处了,如果需要监听,可以使用Subject.
private static Observable<String> ob2;
private static Observable<Double> ob3;
private static CompositeSubscription mSubscription;

private static void setUp() {
sb1 = BehaviorSubject.create();
ob2 = Observable.from(Arrays.asList("a", "b"));
ob3 = Observable.from(Arrays.asList(.1D, .2D, .3D));
}


添加订阅

mSubscription = new CompositeSubscription();
mSubscription.add(sb1.observeOn(Schedulers.immediate()).subscribe(SubscriptionTest::subInt));
mSubscription.add(ob2.observeOn(Schedulers.immediate()).subscribe(SubscriptionTest::subString));
mSubscription.add(ob3.observeOn(Schedulers.immediate()).subscribe(SubscriptionTest::subDouble));

// 这里使用Timer不断的让subject发布数据
ScheduledExecutorService e = Executors.newScheduledThreadPool(0);
e.scheduleAtFixedRate(() -> sb1.onNext(++counter), 1, 1, TimeUnit.SECONDS);

// 检测条件状态
while (true) {
if (counter >= 15) {
e.shutdown();
System.exit(0);
} else if (counter >= 10) {
mSubscription.unsubscribe();
}
}


输出结果如下:

a
b
0.1
0.2
0.3
1
2
3
4
5
6
7
8
9
10


完整代码


public class SubscriptionTest {

// subject具有观察和发射事件的功能.
private static BehaviorSubject<Integer> sb1;
// Observable发射完事件后,就没有用处了,如果需要监听,可以使用Subject.
private static Observable<String> ob2;
private static Observable<Double> ob3;
private static CompositeSubscription mSubscription;

private static volatile int counter = 0;

private static void setUp() {
sb1 = BehaviorSubject.create();
ob2 = Observable.from(Arrays.asList("a", "b"));
ob3 = Observable.from(Arrays.asList(.1D, .2D, .3D));
}

public static void main(String[] args) {
setUp();
mSubscription = new CompositeSubscription(); mSubscription.add(sb1.observeOn(Schedulers.immediate()).subscribe(SubscriptionTest::subInt)); mSubscription.add(ob2.observeOn(Schedulers.immediate()).subscribe(SubscriptionTest::subString)); mSubscription.add(ob3.observeOn(Schedulers.immediate()).subscribe(SubscriptionTest::subDouble)); // 这里使用Timer不断的让subject发布数据 ScheduledExecutorService e = Executors.newScheduledThreadPool(0); e.scheduleAtFixedRate(() -> sb1.onNext(++counter), 1, 1, TimeUnit.SECONDS); // 检测条件状态 while (true) { if (counter >= 15) { e.shutdown(); System.exit(0); } else if (counter >= 10) { mSubscription.unsubscribe(); } }
}

private static void subInt(Integer num) {
System.out.println(num);
}

private static void subString(String s) {
System.out.println(s);
}

private static void subDouble(double d) {
System.out.println(d);
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: