您的位置:首页 > 编程语言 > Java开发

RxJava 学习笔记(五) --- Creating 创建操作符

2016-07-05 16:11 726 查看
Create 使用一个函数从头创建一个Observable

Just 将一个或多个对象转换成发射这个或这些对象的一个Observable

From 将一个Iterable 一个Future 或者一个数组转换成一个Observable

Defer 只有当订阅者订阅才创建Observable为每个订阅创建一个新的Observable

Timer 创建一个在给定的延时之后发射单个数据的Observable

Interval 创建一个按照给定的时间间隔发射整数序列的Observable

Range 创建一个发射指定范围的整数序列的Observable

Repeat 创建一个重复发射指定数据或数据序列的Observable

RepeatWhen 有条件的重新订阅和发射原来的Observable

1. Create —> 使用一个函数从头创建一个Observable

你可以使用
Create
操作符从头开始创建一个
Observable
,给这个操作符传递一个接受观察者作为参数的函数,编写这个函数让它的行为表现为一个
Observable
–恰当的调用观察者的
onNext
onError
onCompleted
方法。

一个形式正确的有限
Observable
必须尝试调用观察者的
onCompleted
正好一次或者它的onError正好一次,而且此后不能再调用观察者的任何其它方法。



RxJava
将这个操作符实现为
create
方法。

建议你在传递给
create
方法的函数中检查观察者的
isUnsubscribed
状态,以便在没有观察者的时候,让你的
Observable
停止发射数据或者做昂贵的运算。

示例代码:

Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> observer) {
try {
if (!observer.isUnsubscribed()) {
for (int i = 1; i < 5; i++) {
observer.onNext(i);
}
observer.onCompleted();
}
} catch (Exception e) {
observer.onError(e);
}
}
} ).subscribe(new Subscriber<Integer>() {
@Override
public void onNext(Integer item) {
System.out.println("Next: " + item);
}

@Override
public void onError(Throwable error) {
System.err.println("Error: " + error.getMessage());
}

@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
});


输出:

Next: 1
Next: 2
Next: 3
Next: 4
Sequence complete.


create
方法默认不在任何特定的调度器上执行。

* Javadoc:create(OnSubscribe)

*

2. Just —> 将一个或多个对象转换成发射这个或这些对象的一个Observable

创建一个发射指定值的
Observable




Just
将单个数据转换为发射那个数据的
Observable


Just
类似于
From
,但是
From
会将数组或
Iterable
的数据取出然后逐个发射,而
Just
只是简单的原样发射,将数组或
Iterable
当做单个数据。

注意:如果你传递
null
Just
,它会返回一个发射
null
值的
Observable
。不要误认为它会返回一个空
Observable
(完全不发射任何数据的
Observable
),如果需要空
Observable
你应该使用
Empty
操作符。

RxJava
将这个操作符实现为
just
函数,它接受一至九个参数,返回一个按参数列表顺序发射这些数据的
Observable


示例代码:

Observable.just(1, 2, 3)
.subscribe(new Subscriber<Integer>() {
@Override
public void onNext(Integer item) {
System.out.println("Next: " + item);
}

@Override
public void onError(Throwable error) {
System.err.println("Error: " + error.getMessage());
}

@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
});


输出

Next: 1
Next: 2
Next: 3
Sequence complete.


Javadoc: just(item)

3. From —> 将一个Iterable, 一个Future, 或者一个数组转换成一个Observable

将其它种类的对象和数据类型转换为
Observable




当你使用
Observable
时,如果你要处理的数据都可以转换成展现为
Observables
,而不是需要混合使用
Observables
和其它类型的数据,会非常方便。这让你在数据流的整个生命周期中,可以使用一组统一的操作符来管理它们。

例如,
Iterable
可以看成是同步的
Observable
Future
,可以看成是总是只发射单个数据的
Observable
。通过显式地将那些数据转换为
Observables
,你可以像使用
Observable
一样与它们交互。

因此,大部分
ReactiveX
实现都提供了将语言特定的对象和数据结构转换为
Observables
的方法。

RxJava
中,
from
操作符可以转换
Future
Iterable
和数组。对于
Iterable
和数组,产生的
Observable
会发射
Iterable
或数组的每一项数据。

示例代码

Integer[] items = { 0, 1, 2, 3, 4, 5 };
Observable myObservable = Observable.from(items);

myObservable.subscribe(
new Action1<Integer>() {
@Override
public void call(Integer item) {
System.out.println(item);
}
},
new Action1<Throwable>() {
@Override
public void call(Throwable error) {
System.out.println("Error encountered: " + error.getMessage());
}
},
new Action0() {
@Override
public void call() {
System.out.println("Sequence complete");
}
}
);


输出

0
1
2
3
4
5
Sequence complete


对于
Future
,它会发射
Future.get()
方法返回的单个数据。
from
方法有一个可接受两个可选参数的版本,分别指定超时时长和时间单位。如果过了指定的时长
Future
还没有返回一个值,这个
Observable
会发射错误通知并终止。

from
默认不在任何特定的调度器上执行。然而你可以将
Scheduler
作为可选的第二个参数传递给
Observable
,它会在那个调度器上管理这个
Future


Javadoc: from(array)

Javadoc: from(Iterable)

Javadoc: from(Future)

Javadoc: from(Future,Scheduler)

Javadoc: from(Future,timeout, timeUnit)

*

4. Defer —> 只有当订阅者订阅才创建Observable;为每个订阅创建一个新的Observable

直到有观察者订阅时才创建
Observable
,并且为每个观察者创建一个新的
Observable




Defer
操作符会一直等待直到有观察者订阅它,然后它使用
Observable
工厂方法生成一个
Observable
。它对每个观察者都这样做,因此尽管每个订阅者都以为自己订阅的是同一个
Observable
,事实上每个订阅者获取的是它们自己的单独的数据序列。

在某些情况下,等待直到最后一分钟(就是知道订阅发生时)才生成
Observable
可以确保
Observable
包含最新的数据。

示例代码

str = "你是不是傻";

Observable<String> just_Observable = Observable.just(str);

str = "你猜";
Observable<String> defer_Observable = Observable.defer(new Func0<Observable<String>>() {
@Override
public Observable<String> call() {
return Observable.just(str);
}
});

str = "哈哈";

just_Observable.subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.i("sss",s);
}
});

defer_Observable.subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.i("sss",s);
}
});


输出

07-04 15:04:40.582 1699-1699/? I/sss: 你是不是傻
07-04 15:04:40.582 1699-1699/? I/sss: 哈哈


RxJava
将这个操作符实现为
defer
方法。这个操作符接受一个你选择的
Observable
工厂函数作为单个参数。这个函数没有参数,返回一个
Observable


defer
方法默认不在任何特定的调度器上执行。

* Javadoc: defer(Func0)

5. Timer —> 创建一个在给定的延时之后发射单个数据的Observable

创建一个
Observable
,它在一个给定的延迟后发射一个特殊的值。



Timer
操作符创建一个在给定的时间段之后返回一个特殊值的
Observable


RxJava
将这个操作符实现为
timer
函数。

timer
返回一个
Observable
,它在延迟一段给定的时间后发射一个简单的数字
0


timer
操作符默认在
computation
调度器上执行。有一个变体可以通过可选参数指定
Scheduler


示例代码

Observable.timer(3,TimeUnit.SECONDS)
.subscribe(new Observer<Long>() {

@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(Long number) {
Log.d("RXJAVA", "I say " + number);
}
});


输出

它将`3`秒后发射`0`,然后就完成了


每隔一段时间就产生一个数字,没有结束符,也就是是可以产生无限个连续的数字(
但是这个已经不建议使用了,废弃掉了,因为一个名叫interval的操作符有同样的功能。
)



示例代码

Observable.timer(3,3,TimeUnit.SECONDS)
.subscribe(new Observer<Long>() {

@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(Long number) {
Log.d("RXJAVA", "I say " + number);
}
});


输出

每隔`3`秒产生一个数字

I say 0
I say 1
I say 2
I say 3
...


Javadoc:timer(long,TimeUnit)

Javadoc:timer(long,TimeUnit,Scheduler)

6. Interval —> 创建一个按照给定的时间间隔发射整数序列的Observable

Interval
操作符返回一个
Observable
,它按固定的时间间隔发射一个无限递增的整数序列。



它接受一个表示时间间隔的参数和一个表示时间单位的参数

Javadoc:interval(long,TimeUnit)

Javadoc:interval(long,TimeUnit,Scheduler)

7. Range —> 创建一个发射指定范围的整数序列的Observable

range
操作符是创建一组在从
n
开始,个数为
m
的连续数字,比如
range(3,10)
,就是创建
3、4、5…12
的一组数字



示例代码

Observable.range(3,10).subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}

@Override
public void onError(Throwable e) {
System.out.println("error:" + e.getMessage());
}

@Override
public void onNext(Integer i) {
System.out.println("Next:" + i.toString());
}
});


输出

Next:3
Next:4
Next:5
Next:6
...
Next:12
Sequence complete.


range
默认不在任何特定的调度器上执行。有一个变体可以通过可选参数指定
Scheduler


Javadoc: range(int,int)

Javadoc: range(int,int,Scheduler)

8. Repeat —> 创建一个重复发射指定数据或数据序列的Observable



Repeat重复地发射数据。某些实现允许你重复的发射某个数据序列,还有一些允许你限制重复的次数。



RxJava
将这个操作符实现为
repeat
方法。它不是创建一个
Observable
,而是重复发射原始
Observable
的数据序列,这个序列或者是无限的,或者通过
repeat(n)
指定重复次数。

repeat
操作符默认在
trampoline
调度器上执行。有一个变体可以通过可选参数指定
Scheduler


示例代码

Observable.range(3,2).repeat(3).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.i("sss",integer+" ");
}
});


输出

I/sss: 3
I/sss: 4
I/sss: 3
I/sss: 4
I/sss: 3
I/sss: 4


Javadoc:repeat()

Javadoc:repeat(long)

Javadoc:repeat(Scheduler)

Javadoc:repeat(long,Scheduler)

9. RepeatWhen —> 有条件的重新订阅和发射原来的Observable。



将原始
Observable
的终止通知(完成或错误)当做一个
void
数据传递给一个通知处理器,它以此来决定是否要重新订阅和发射原来的
Observable
。这个通知处理器就像一个
Observable
操作符,接受一个发射
void
通知的
Observable
为输入,返回一个发射
void
数据(意思是,重新订阅和发射原始
Observable
)或者直接终止(意思是,使用
repeatWhen
终止发射数据)的
Observable


示例代码

Observable.just(1,2,3,4).repeatWhen(new Func1<Observable<? extends Void>, Observable<?>>() {
@Override
public Observable<?> call(Observable<? extends Void> observable) {
// 延迟1秒
return Observable.timer(1,TimeUnit.SECONDS);
}
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println(""+integer);
}
});


输出

I/System.out: 1
I/System.out: 2
I/System.out: 3
I/System.out: 4
I/System.out: 1
I/System.out: 2
I/System.out: 3
I/System.out: 4


repeatWhen
操作符默认在
trampoline
调度器上执行。有一个变体可以通过可选参数指定
Scheduler


Javadoc:repeatWhen(Func1)

Javadoc:repeatWhen(Func1,Scheduler)

参考:

http://rxjava.yuxingxin.com/chapter5/the_map_family.html

https://github.com/mcxiaoke/RxDocs

https://www.gitbook.com/book/mcxiaoke/rxdocs
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: