您的位置:首页 > 移动开发 > Android开发

RxAndroid响应式开发(三)

2016-11-05 14:38 302 查看
RxAndroid响应式开发(三)
  这篇博客第二遍写了,真是好无奈,上次本来一上午已经完成,奈何不知怎么的一下点了舍弃,本来想保存的这可好啥都没有了。重写呗,就当复习了,在此也告诫所有写博客的同僚,点击了舍弃就是彻底删除,注意注意注意。

  这篇从源码和原理上来分析一下RxJava的使用和实现,主要包括Observable(被观察者),Observer/Subscriber(观察者),还有订阅关系
subscribe


Observer/Subscriber(观察者)的联系和区别

Observer 是一个接口,它的作用就是对事件作出瞬间响应(担当的是警察的角色)。

public interface Observer<T> {

/**
* Notifies the Observer that the {@link Observable} has finished sending push-based notifications.
* <p>
* The {@link Observable} will not call this method if it calls {@link #onError}.
*/
void onCompleted();

/**
* @param e
*          the exception encountered by the Observable
*/
void onError(Throwable e);

/**
* @param t
*          the item emitted by the Observable
*/
void onNext(T t);

}


Observer 有三个方法:

onNext(T t):一个参数方法,所有的事件都在此方法中执行,Rxjava中需要对每个事件单独执行,并且以队列的形式依次执行。

onCompleted():无参方法,表示事件正常结束,当没有onNext()方法发出的时候,需要触发onCompleted()方法标志事件正常完成。

onError(Throwable e):一个参数方法,事件执行遇到异常,同时剩余的onNext不再执行。

注意:onCompleted 和 onError 两个方法只能有一个执行,并且只能在事件序列的最后一个,要么是事件顺利完成触发onCompleted要么是出现异常触发onError 。

Subscriber 是实现了Observer接口的抽象类,做了一些对事件处理的扩展,但是和Observer的基本使用方法还是一样的。

public abstract class Subscriber<T> implements Observer<T>, Subscription {

// represents requested not set yet
private static final long NOT_SET = Long.MIN_VALUE;

private final SubscriptionList subscriptions;
private final Subscriber<?> subscriber;
/* protected by `this` */
private Producer producer;
/* protected by `this` */
private long requested = NOT_SET; // default to not set

protected Subscriber() {
this(null, false);
}

/**
* @param subscriber
*            the other Subscriber
*/
protected Subscriber(Subscriber<?> subscriber) {
this(subscriber, true);
}

/**
* @param subscriber
*            the other Subscriber
* @param shareSubscriptions
*            {@code true} to share the subscription list in {@code subscriber} with
*            this instance
* @since 1.0.6
*/
protected Subscriber(Subscriber<?> subscriber, boolean shareSubscriptions) {
this.subscriber = subscriber;
this.subscriptions = shareSubscriptions && subscriber != null ? subscriber.subscriptions : new SubscriptionList();
}

/**
* Adds a {@link Subscription} to this Subscriber's list of subscriptions if this list is not marked as
* unsubscribed. If the list <em>is</em> marked as unsubscribed, {@code add} will indicate this by
* explicitly unsubscribing the new {@code Subscription} as well.
*
* @param s
*            the {@code Subscription} to add
*/
public final void add(Subscription s) {
subscriptions.add(s);
}

@Override
public final void unsubscribe() {
subscriptions.unsubscribe();
}

/**
* Indicates whether this Subscriber has unsubscribed from its list of subscriptions.
*
* @return {@code true} if this Subscriber has unsubscribed from its subscriptions, {@code false} otherwise
*/
@Override
public final boolean isUnsubscribed() {
return subscriptions.isUnsubscribed();
}

/**
* This method is invoked when the Subscriber and Observable have been connected but the Observable has
* not yet begun to emit items or send notifications to the Subscriber. Override this method to add any
* useful initialization to your subscription, for instance to initiate backpressure.
*/
public void onStart() {
// do nothing by default
}

}


  在Rxjava中,如果使用Observer作为观察者,最后也会转化成Subscriber进行使用,本质上他们一样,但是在使用上他们还是有些区别的。

onStart():这是Subscriber增加的一个空方法,在订阅方法subscribe()中执行,此时事件还未发送,开发者可以做一些准备工作像数据初始化啊,数据、数组清零等,onStart所在的线程与subscribe()订阅方法在同一个线程,在这处理一些UI操作还是要谨慎,根据指定subscribe()方法所在线程而定,指定线程后面说。

isUnsubscribed():判断当前订阅状态。

unsubscribe():用于取消订阅,这是实现的另一个接口Subscription的方法,这个方法执行后,Subscriber就不会再接收事件了。

在观察者和被观察者订阅之后,被观察者Observable会持有Subscriber的引用,要在适当的地方取消订阅关系,避免内存泄露。

Observable(被观察者)

  Observable决定了事件的触发时间,一般是在与观察者实现订阅的时候进行事件的触发。RxJava中最基本的创建一个Observable对象是通过create()方法,还有一堆其他的操作符可以实现创建Observable,后面介绍。

//创建被观察者
Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
//订阅关系
if (!subscriber.isUnsubscribed()) {
subscriber.onNext("hello");
subscriber.onNext("World");
subscriber.onNext("Hello World");
subscriber.onCompleted();
}
}
});


/**
* Invoked when Observable.subscribe is called.
* @param <T> the output value type
*/
public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> {
// cover for generics insanity
}


  call的回调顺序是三个onNext(),最后是onCompleted()。OnSubscribe作为参数传递给create(),OnSubscribe是一个继承了Action1的接口,而回调的call方法就是Action1的,当Observable被订阅的时候,激活回调call(),然后事件就会按顺序依次执行。这样,被观察者调用观察者的回调方法,然后把用户事件传递给观察者,就实现了观察者模式。

  这里提到的Action1是什么东东,其实还有Action0,Action2,Action3这些,后面详细介绍。

Subscribe(订阅)

  实现订阅的方法就是subscribe(),这样就实现了Observable和Subscriber的订阅关系,整个事件流就可以执行下去。

observable.subscribe(observer);
//关联被观察者
observable.subscribe(subscriber);


  订阅方法的API源码分析如下,重点看红色标注字体。此方法在后面有提到,暂且称它为总方法。

static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
// validate and proceed
if (subscriber == null) {
throw new IllegalArgumentException("subscriber can not be null");
}
if (observable.onSubscribe == null) {
throw new IllegalStateException("onSubscribe function can not be null.");
/*
* the subscribe function can also be overridden but generally that's not the appropriate approach
* so I won't mention that in the exception
*/
}

// new Subscriber so onStart it
subscriber.onStart();

/*
* See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls
* to user code from within an Observer"
*/
// if not already wrapped
if (!(subscriber instanceof SafeSubscriber)) {
// assign to `observer` so we return the protected version
subscriber = new SafeSubscriber<T>(subscriber);
}

// The code below is exactly the same an unsafeSubscribe but not used because it would
// add a significant depth to already huge call stacks.
try {
// allow the hook to intercept and/or decorate
RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
return RxJavaHooks.onObservableReturn(subscriber);
} catch (Throwable e) {
// special handling for certain Throwable/Error/Exception types
Exceptions.throwIfFatal(e);
// in case the subscriber can't listen to exceptions anymore
if (subscriber.isUnsubscribed()) {
RxJavaHooks.onError(RxJavaHooks.onObservableError(e));
} else {
// if an unhandled error occurs executing the onSubscribe we will propagate it
try {
subscriber.onError(RxJavaHooks.onObservableError(e));
} catch (Throwable e2) {
Exceptions.throwIfFatal(e2);
// if this happens it means the onError itself failed (perhaps an invalid function implementation)
// so we are unable to propagate the error correctly and will just throw
RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
// TODO could the hook be the cause of the error in the on error handling.
RxJavaHooks.onObservableError(r);
// TODO why aren't we throwing the hook's return value.
throw r; // NOPMD
}
}
return Subscriptions.unsubscribed();
}
}


大概流程就是先执行subscriber.onStart(),然后执行回调onSubscribe.call(subscriber),最后把Subscriber返回;

主要就是这三件事:

1、onStart()方法是一个空方法,在回调call之前调用;

2、回调方法call方法在订阅方法中被回调执行,所以我们看到,只有当订阅事件subscribe()方法被执行了,才会有事件的执行。

3、把观察者Subscriber返回,方便后续判断unsubscribe();

整个过程可以使用下图来表示:onNext依次执行,最后是onCompleted.



关于订阅方法subscribe()有多个重载方法,源码分别如下:

//没有参数
public final Subscription subscribe() {
Action1<T> onNext = Actions.empty();
Action1<Throwable> onError = InternalObservableUtils.ERROR_NOT_IMPLEMENTED;
Action0 onCompleted = Actions.empty();
return subscribe(new ActionSubscriber<T>(onNext, onError, onCompleted));
}

//一个参数
public final Subscription subscribe(final Action1<? super T> onNext) {
if (onNext == null) {
throw new IllegalArgumentException("onNext can not be null");
}

Action1<Throwable> onError = InternalObservableUtils.ERROR_NOT_IMPLEMENTED;
Action0 onCompleted = Actions.empty();
return subscribe(new ActionSubscriber<T>(onNext, onError, onCompleted));
}

//两个参数
public final Subscription subscribe(final Action1<? super T> onNext, final Action1<Throwable> onError) {
if (onNext == null) {
throw new IllegalArgumentException("onNext can not be null");
}
if (onError == null) {
throw new IllegalArgumentException("onError can not be null");
}

Action0 onCompleted = Actions.empty();
return subscribe(new ActionSubscriber<T>(onNext, onError, onCompleted));
}

//三个参数
public final Subscription subscribe(final Action1<? super T> onNext, final Action1<Throwable> onError, final Action0 onCompleted) {
if (onNext == null) {
throw new IllegalArgumentException("onNext can not be null");
}
if (onError == null) {
throw new IllegalArgumentException("onError can not be null");
}
if (onCompleted == null) {
throw new IllegalArgumentException("onComplete can not be null");
}

return subscribe(new ActionSubscriber<T>(onNext, onError, onCompleted));
}

//传入Observer观察者
public final Subscription subscribe(final Observer<? super T> observer) {
if (observer instanceof Subscriber) {
return subscribe((Subscriber<? super T>)observer);
}
if (observer == null) {
throw new NullPointerException("observer is null");
}
return subscribe(new ObserverSubscriber<T>(observer));
}

//传入Subscriber观察者
public final Subscription subscribe(Subscriber<? super T> subscriber) {
return Observable.subscribe(subscriber, this);
}


订阅方法有多个,其实总归来说,最后调用的都是上面提到的总方法。这是Rxjava的订阅subscribe()实现的不完整回调,意思就是不一定非要与“观察者”实现订阅(加引号的原因就是本质上其实就是与Subscriber进行了订阅关系)。可以是一个onNext方法也可以是一个onError或者onCompleted方法,但是最后都会被包装成一个观察者对象。

使用实例如下:

Action1<String> onNextAction = new Action1<String>() {
@Override
public void call(String s) {
Log.i(TAG, "call: ");
}
};

Action1<Throwable> onErrorAction = new Action1<Throwable>() {
@Override
public void call(Throwable shrowable) {
Log.i(TAG, "call: ");
}
};

Action0 onCompletedAction = new Action0() {
@Override
public void call() {
Log.i(TAG, "call: ");
}
};

//自动创建Subscriber,使用onNextAction定义onNext();
observable.subscribe(onNextAction);
// 自动创建 Subscriber ,使用 onNextAction 和 onErrorAction 来定义 onNext() 和 onError()
observable.subscribe(onNextAction, onErrorAction);
// 自动创建 Subscriber ,使用 onNextAction、 onErrorAction 和 onCompletedAction 来定义 onNext()、 onError() 和 onCompleted()
observable.subscribe(onNextAction, onErrorAction, onCompletedAction);


  Action家族又出现了,下面来详细介绍一下他们这一家子。

一系列的Action

  这些Action都是一些接口,分别有Action0、Action1、Action2、Action3,没有发现Action4,他们都是Action的子类,,0,1,2,3是根据参数个数来定的,贴出源码如下。

/**
* All Action interfaces extend from this.
* <p>
* Marker interface to allow instanceof checks.
*/
public interface Action extends Function {

}


/**
* A zero-argument action.
*/
public interface Action0 extends Action {
void call();
}

/**
* A one-argument action.
* @param <T> the first argument type
*/
public interface Action1<T> extends Action {
void call(T t);
}

/**
* A two-argument action.
* @param <T1> the first argument type
* @param <T2> the second argument type
*/
public interface Action2<T1, T2> extends Action {
void call(T1 t1, T2 t2);
}

/**
* A three-argument action.
* @param <T1> the first argument type
* @param <T2> the second argument type
* @param <T3> the third argument type
*/
public interface Action3<T1, T2, T3> extends Action {
void call(T1 t1, T2 t2, T3 t3);
}


onCompleted()是无参方法,可以用Action0来包装对象使用;onError()和onNext()都有一个参数,可以使用Action1来包装使用,不管传递了几个参数,最后在subscribe()订阅方法中都被重新创建Subscriber观察者来进行订阅关系。

  在RxJava中还有一个大家族Func,有Func0~Func9,暂时还不知道他们作用,留作以后了解吧。

  同时,在RxJava中还有一大堆方便的操作符进行事件的更改,简直太强大了,之后研究介绍吧。

  本人菜鸟,欢迎评论指导,共同学习。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: