您的位置:首页 > 移动开发 > Android开发

RxJava初步了解和在Android中的使用

2015-12-04 23:15 507 查看

RxJava初步了解和在Android中的使用

推荐阅读《给 Android 开发者的 RxJava 详解

RxJava源码执行过程

(首先声明,除了例子外,为了便于理解,本文引用的源码删除了大部分!可到GitHub查看完整源码。)

先来看一个简单的例子

//被观察者
Observable.OnSubscribe<Integer> integerOnSubscribe = new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onNext(1);
subscriber.onNext(2);
subscriber.onNext(3);
subscriber.onCompleted();
}
};

//观察者
Subscriber<Integer> integerSubscriber = new Subscriber<Integer>() {
@Override
public void onCompleted() {
System.out.println("onCompleted");
}

@Override
public void onError(Throwable e) {}

@Override
public void onNext(Integer i) { System.out.println(i); }
};

//订阅
Observable.create(integerOnSubscribe)
.subscribe(integerSubscriber);


执行上面的例子,将看到以下结果:

1
2
3
onCompleted


那么在这过程中发生了什么呢?一起到源码里看看!

先来看看
create()
,在
Observable
类中,有这样的定义:

final OnSubscribe<T> onSubscribe;

protected Observable(OnSubscribe<T> f) {
this.onSubscribe = f;
}

public final static <T> Observable<T> create(OnSubscribe<T> f) {
return new Observable<T>(f);
}


也就是说,上面的例子中,我们的
integerOnSubscribe
对象被
onSubscribe
变量引用了。

接着来再看与
subscribe()
有关的关键的代码:

public final Subscription subscribe(Subscriber<? super T> subscriber) {
//this - 即当前对象传递给下面的函数,主要使用this.onSubscriber引用
return Observable.subscribe(subscriber, this);
}

private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
//删除了大部分代码,只保留关键代码,并且假设执行过程没有异常抛出
observable.onSubscribe.call(subscriber);
return subscriber;
}


注意最关键的代码
observable.onSubscribe.call(subscriber)
,这里的
observable.onSubscribe
也就是我们的
integerOnSubscribe
对象引用,而
subscriber
就是我们传入
subscribe()
integerSubscriber
对象。所以这句代码的中变量替换为上面例子的对象就变为:

integerOnSubscribe.call(integerSubscriber);


也就是说在
integerOnSubscribe.call()
中执行了

integerSubscriber.onNext(1);
integerSubscriber.onNext(2);
integerSubscriber.onNext(3);
integerSubscriber.onCompleted();


因此,我们的程序也就依次输出了

1
2
3
onCompleted


总结一下整个过程就是:有一个被观察者
integerSubscriber
和一个观察者
integerSubscriber
,使用
subscribe()
使他们产生订阅关系,
subscribe()
所做的事情就是,将观察者传递给被观察者的
call()
,并在
call()
中执行观察者的
onNext()
onCompleted()


integerSubscriber.call(integerSubscriber);


使用其他方法创建Observable

RxJava
中,为了便于使用,创建被观察者有许多不同的方法,比如
from()


from()

我们上面的例子可以改为:

Integer[] array = {1, 2, 3};
Observable.from(array)
.subscribe(integerSubscriber);


Observable.class
中:

public final static <T> Observable<T> from(T[] array) {
return from(Arrays.asList(array));
}

public final static <T> Observable<T> from(Iterable<? extends T> iterable) {
OnSubscribeFromIterable<T> fromIterable = new OnSubscribeFromIterable<T>(iterable);

return create(fromIterable);    //在create()中this.onSubscribe = fromIterable
}


这里用到了
OnSubscribeFromIterable.class
,进去看看:

public final class OnSubscribeFromIterable<T> implements OnSubscribe<T> {

final Iterable<? extends T> is;

public OnSubscribeFromIterable(Iterable<? extends T> iterable) {
this.is = iterable;
}

@Override
public void call(final Subscriber<? super T> o) {
final Iterator<? extends T> it = is.iterator();
if (!it.hasNext() && !o.isUnsubscribed())
o.onCompleted();
else
while (true) {
if (o.isUnsubscribed()) {
return;
} else if (it.hasNext()) {
o.onNext(it.next());    //本例中,即integerSubscriber.onNext(it.next);
} else if (!o.isUnsubscribed()) {
o.onCompleted();
return;
} else {
// is unsubscribed
return;
}
}
}


所以将
subscribe()
observable.onSubscribe.call(subscriber)
换为上面例子的变量,即为:

//observable.onSubscriber是在Observable.create()被赋值
//this.onSubscribe = f,见上一个例子分析
observable.onSubscriber <= fromIterable
subscriber <= integerSubscriber

observable.onSubscribe.call(subscriber); <= fromIterable.call(integerSubscriber);


而执行
fromIterable.call(integerSubscriber)
时,即调用
OnSubscribeFromIterable.class
中的
call(final Subscriber<? super T> o)
,就会执行:

o.onNext(it.next());    //integerSubscriber.onNext(1);
//integerSubscriber.onNext(2);
//integerSubscriber.onNext(3);
//integerSubscriber.onCompleted();


just()

just()
分为两种情况:

1. 当
just()
的参数不止一个时,将会调用
from()
处理,这里不重复讲解。

//just()两个参数时
public final static <T> Observable<T> just(T t1, T t2) {
return from(Arrays.asList(t1, t2));
}


just()
参数只有一个时:

public final static <T> Observable<T> just(final T value) {
return ScalarSynchronousObservable.create(value);
}


看看
ScalarSynchronousObservable.class
有什么!!

public final class ScalarSynchronousObservable<T> extends Observable<T> {
public static final <T> ScalarSynchronousObservable<T> create(T t) {
return new ScalarSynchronousObservable<T>(t);
}

private final T t;

protected ScalarSynchronousObservable(final T t) {
super(new OnSubscribe<T>() {

@Override
public void call(Subscriber<? super T> s) {
s.onNext(t);
s.onCompleted();
}

});
this.t = t;
}
}


很简单,里面就帮我们新建了一个
new OnSubscribe<T>()
并传递给父类,也就是
Observable


比如:

Observable.just(100)
.subscribe(integerSubscriber);


当调用
subscribe()
就是依次执行
s.onNext(t);
s.onCompleted();
。相当于:

Observable.OnSubscribe<Integer> integerOnSubscribe = new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onNext(100);
subscriber.onCompleted();
}
};
Observable.create(integerOnSubscribe)
.subscribe(integerSubscriberr);


just()
让我们的代码看起来更简洁明了。

OK!还有很多创建
Observable
的方法等你到源码中发掘。

RxJava中丰富的操作符

操作符学习推荐阅读:官方英文 翻译中文

这里只通过
map()
操作符来讲最核心的
lift()
方法。

lift()

由于
lift()
接受一个
操作符
参数,所以我们这里使用
map()
来讲
lift()
,也可以随便了解
map()
,假设代码是这样的:

Observable.OnSubscribe<Integer> myOnSubscribe = new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onNext(1);
}
};
Subscriber<String> mySubscriber = new Subscriber<String>() {
@Override
public void onNext(String s) {
System.out.print(s);
}
};
Func1<Integer, String> myFunc = new Func1<Integer, String>() {
@Override
public String call(Integer i) {
return i + " programmer";
}
};

Observable.create(myOnSubscribe)
.map(myFunc)
.subscribe(mySubscriber);


这里只看
.map(myFunc)
这一句代码,其他部分分析可以查看开头的例子。先看看
map()
的源码:

public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
return lift(new OperatorMap<T, R>(func));
}


出现了
lift()
,进入看看:

//经过删减和修改的代码
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
Observable.OnSubscribe<String> onSub = new OnSubscribe<R>() {
@Override
public void call(Subscriber<? super R> o) {
Subscriber<? super T> st = operator.call(o);
st.onStart();
onSubscribe.call(st);
}
}
//总是返回新被观察者对象
return new Observable<R>(onSub);
}


可以看到,
lift()
总是创建一个新的
被观察者
。在看看
new OperatorMap<T, R>(func)
干了什么:

public final class OperatorMap<T, R> implements Operator<R, T> {
//转换操作函数
private final Func1<? super T, ? extends R> transformer;
//构造函数
public OperatorMap(Func1<? super T, ? extends R> transformer) {
this.transformer = transformer;
}

@Override
public Subscriber<? super T> call(final Subscriber<? super R> o) {
return new Subscriber<T>(o) {
@Override
public void onNext(T t) {
o.onNext(transformer.call(t));
}
};
}
}


换成上面例子的变量,则
lift()


Observable.OnSubscribe<String> onSub = new OnSubscribe<String>(){
@Override
public void call(Subscriber<? super String> o) {
Subscriber<Integer> st = new OperatorMap<Integer,String>(myFunc).call(o);
= new Subscriber<Integer>(Subscriber<? super String> o){
onNext(Integer t){
o.onNext(myFunc.call(t));
}
}
onSubscribe.call(st);   <===    myOnSubscribe.call(st)
}
}


订阅时:

Observable.create(myOnSubscribe)

.map(myFunc)  <===  return new Observable<String>(onSub)

.subscribe(mySubscriber);  <===   onSub.call(mySubscriber)


所以运行流程为:

//本例中t=1
myOnSubscribe.call(st) -> st.onNext(t) -> mySubscriber.onNext(myFunc.call(t))


在Android项目中使用RxJava

在Android Studio中使用
RxJava
需要使用Gradle下载依赖库,编译配置中增加依赖代码即可。

依赖代码:

compile 'io.reactivex:rxandroid:1.0.1'
compile 'io.reactivex:rxjava:1.0.14'


下载依赖库的步骤:

1. 打开文件
app\build.gradle


2. 找到
dependencies
,并增加依赖代码

3. 编译项目(或
Build
-
Rebuild Project


这样Gradle在编译的时候就会帮我下载好依赖库,然后就可以在项目中使用
RxAndroid
啦~~

上图



在项目中依次创建被观察者和观察者,并建立他们的订阅关系。

//创建可观察者
Observable observable = Observable.create(new Observable.OnSubscribe<String>(){
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("One");
subscriber.onNext("Two");
subscriber.onNext("Three");
subscriber.onCompleted();
}
});

//创建观察者
Observer<String> observer = new Observer<String>() {
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted()");
}

@Override
public void onError(Throwable e) {
Log.d(TAG, "onError()");
}

@Override
public void onNext(String s) {
Log.d(TAG, s);
}
};

//建立订阅关系
observable.subscribe(observer);


编译并运行项目则可以看见日志输出:

D/MainActivity: One
D/MainActivity: Two
D/MainActivity: Three
D/MainActivity: onCompleted()


在View上使用Rx

上面的例子只是一般的RxJava,并没有结合到Android的什么东西。接下来我将举个例子:如何使用RxJava来实现对EditText输入文本改变的监听和CheckBox选择与否的监听、还有Button点击事件的监听。

这里还需要增加另一个依赖库,用于获取View的行为。

compile 'com.jakewharton.rxbinding:rxbinding:0.2.0'


先看看最终的实现效果:



界面代码就不上了,直接看使用Rx的代码:

//被观察者,可输入框是否为空
Observable<Boolean> editTextChangeEvent = RxTextView.textChangeEvents(mEditText)
.map(new Func1<TextViewTextChangeEvent, Boolean>() {
@Override
public Boolean call(TextViewTextChangeEvent textViewTextChangeEvent) {
return TextUtils.isEmpty(textViewTextChangeEvent.text());
}
});

//被观察者,可CheckBox是否已选中
Observable<Boolean> checkBoxChangeEvent = RxCompoundButton.checkedChangeEvents(mCheckBox)
.map(new Func1<CompoundButtonCheckedChangeEvent, Boolean>() {
@Override
public Boolean call(CompoundButtonCheckedChangeEvent compoundButtonCheckedChangeEvent) {
return compoundButtonCheckedChangeEvent.isChecked();
}
});

//合并订阅,设置按钮可点击性
Observable.combineLatest(editTextChangeEvent, checkBoxChangeEvent, new Func2<Boolean, Boolean, Boolean>() {
@Override
public Boolean call(Boolean aBoolean, Boolean aBoolean2) {
return !aBoolean||aBoolean2;
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Boolean>() {
@Override
public void call(Boolean aBoolean) {
Log.d(TAG, "setEnabled " + String.valueOf(aBoolean));
mBtnClick.setEnabled(aBoolean);
}
});

//按钮点击事件
RxView.clicks(mBtnClick)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Object>() {
@Override
public void call(Object object) {
Toast.makeText(MainActivity.this, mEditText.getText().toString(), Toast.LENGTH_LONG).show();
}
});
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: