RxJava Subscription 自动取消订阅
2016-05-30 23:09
357 查看
在RxJava Observer与Subscriber的关系 一文中,我们提到:
subscribe(mObserver)和subscribe(mSubscriber)执行结果就会有区别:
subscribe(mSubscriber)这种订阅方式在第二次请求数据时就不会执行了,原因就是第一次onNext后自动取消了订阅;
subscribe(mObserver)则不出现此问题。
今天我们分析下原因。
先来看看源码。
Subscription
RxJava中有个叫做Subscription的接口,可以用来取消订阅.
从上面可以看到,我们只需要调用unsubscribe就可以取消订阅,那么如何得到一个Subscription对象呢?
其实很简单:
Observable.subscribe()方法可以返回一个Subscription的对象,即我们每次订阅都会返回.
感觉Subscription就像一个订单,你下单了就会生成一个订单,而你也可以用这个订单取消订单.
在我想来输出的日志应该是这样的:
但是,结果出乎我的意料,我运行之后是这样的:
明明我没有取消订阅啊,怎么就true了呢?
接下去我进源码探索了一下发现:
在Observable.subscribe()源码:
关键代码:
它会把我们传递的subscriber转成SafeSubscriber,SafeSubcriber源码如下:
从源码中不能发现:
原来它在finally里自动取消了订阅!!
同样,在出现error时,也会自动取消订阅。
本文参考了http://www.jianshu.com/p/c26f96c0ab81
困扰我了好久的问题终于解决了,呜呼!
可以结合下面文章来看:
/article/11841860.html
subscribe(mObserver)和subscribe(mSubscriber)执行结果就会有区别:
subscribe(mSubscriber)这种订阅方式在第二次请求数据时就不会执行了,原因就是第一次onNext后自动取消了订阅;
subscribe(mObserver)则不出现此问题。
今天我们分析下原因。
先来看看源码。
Subscription
RxJava中有个叫做Subscription的接口,可以用来取消订阅.
public interface Subscription { /** * Stops the receipt of notifications on the {@link Subscriber} that was registered when this Subscription * was received. * <p> * This allows unregistering an {@link Subscriber} before it has finished receiving all events (i.e. before * onCompleted is called). */ void unsubscribe(); /** * Indicates whether this {@code Subscription} is currently unsubscribed. * * @return {@code true} if this {@code Subscription} is currently unsubscribed, {@code false} otherwise */ boolean isUnsubscribed(); }
从上面可以看到,我们只需要调用unsubscribe就可以取消订阅,那么如何得到一个Subscription对象呢?
其实很简单:
Observable.subscribe()方法可以返回一个Subscription的对象,即我们每次订阅都会返回.
感觉Subscription就像一个订单,你下单了就会生成一个订单,而你也可以用这个订单取消订单.
实战
我先写了以下代码来测试:Subscription subscription = Observable.just("Hello subscription") .subscribe(new Action1<String>() { @Override public void call(String s) { System.out.println(s); } }); System.out.println(subscription.isUnsubscribed()); subscription.unsubscribe(); System.out.println(subscription.isUnsubscribed());
在我想来输出的日志应该是这样的:
Hello subscription false true
但是,结果出乎我的意料,我运行之后是这样的:
Hello subscription true true
明明我没有取消订阅啊,怎么就true了呢?
接下去我进源码探索了一下发现:
在Observable.subscribe()源码:
public final Subscription subscribe(Subscriber<? super T> subscriber) { return Observable.subscribe(subscriber, this); } static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) { // validate and proceed if (subscriber == null) { throw new IllegalArgumentException("observer can not be null"); } if (observable.onSubscribe == null) { throw new IllegalStateException("onSubscribe function can not be null."); /* * the subscribe function can also be overridden but generally that's not the appropriate approach * so I won't mention that in the exception */ } // new Subscriber so onStart it subscriber.onStart(); /* * See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls * to user code from within an Observer" */ // if not already wrapped if (!(subscriber instanceof SafeSubscriber)) { // assign to `observer` so we return the protected version subscriber = new SafeSubscriber<T>(subscriber); } // The code below is exactly the same an unsafeSubscribe but not used because it would // add a significant depth to already huge call stacks. try { // allow the hook to intercept and/or decorate hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber); return hook.onSubscribeReturn(subscriber); } catch (Throwable e) { // special handling for certain Throwable/Error/Exception types Exceptions.throwIfFatal(e); // in case the subscriber can't listen to exceptions anymore if (subscriber.isUnsubscribed()) { RxJavaPluginUtils.handleException(hook.onSubscribeError(e)); } else { // if an unhandled error occurs executing the onSubscribe we will propagate it try { subscriber.onError(hook.onSubscribeError(e)); } catch (Throwable e2) { Exceptions.throwIfFatal(e2); // if this happens it means the onError itself failed (perhaps an invalid function implementation) // so we are unable to propagate the error correctly and will just throw RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2); // TODO could the hook be the cause of the error in the on error handling. hook.onSubscribeError(r); // TODO why aren't we throwing the hook's return value. throw r; } } return Subscriptions.unsubscribed(); } }
关键代码:
if (!(subscriber instanceof SafeSubscriber)) { // assign to `observer` so we return the protected version subscriber = new SafeSubscriber<T>(subscriber); }
它会把我们传递的subscriber转成SafeSubscriber,SafeSubcriber源码如下:
public class SafeSubscriber<T> extends Subscriber<T> { private final Subscriber<? super T> actual; boolean done = false; public SafeSubscriber(Subscriber<? super T> actual) { super(actual); this.actual = actual; } /** * Notifies the Subscriber that the {@code Observable} has finished sending push-based notifications. * <p> * The {@code Observable} will not call this method if it calls {@link #onError}. */ @Override public void onCompleted() { if (!done) { done = true; try { actual.onCompleted(); } catch (Throwable e) { // we handle here instead of another method so we don't add stacks to the frame // which can prevent it from being able to handle StackOverflow Exceptions.throwIfFatal(e); RxJavaPluginUtils.handleException(e); throw new OnCompletedFailedException(e.getMessage(), e); } finally { try { // Similarly to onError if failure occurs in unsubscribe then Rx contract is broken // and we throw an UnsubscribeFailureException. unsubscribe(); } catch (Throwable e) { RxJavaPluginUtils.handleException(e); throw new UnsubscribeFailedException(e.getMessage(), e); } } } } /** * Notifies the Subscriber that the {@code Observable} has experienced an error condition. * <p> * If the {@code Observable} calls this method, it will not thereafter call {@link #onNext} or * {@link #onCompleted}. * * @param e * the exception encountered by the Observable */ @Override public void onError(Throwable e) { // we handle here instead of another method so we don't add stacks to the frame // which can prevent it from being able to handle StackOverflow Exceptions.throwIfFatal(e); if (!done) { done = true; _onError(e); } } /** * Provides the Subscriber with a new item to observe. * <p> * The {@code Observable} may call this method 0 or more times. * <p> * The {@code Observable} will not call this method again after it calls either {@link #onCompleted} or * {@link #onError}. * * @param args * the item emitted by the Observable */ @Override public void onNext(T args) { try { if (!done) { actual.onNext(args); } } catch (Throwable e) { // we handle here instead of another method so we don't add stacks to the frame // which can prevent it from being able to handle StackOverflow Exceptions.throwOrReport(e, this); } } /** * The logic for {@code onError} without the {@code isFinished} check so it can be called from within * {@code onCompleted}. * * @see <a href="https://github.com/ReactiveX/RxJava/issues/630">the report of this bug</a> */ protected void _onError(Throwable e) { RxJavaPluginUtils.handleException(e); try { actual.onError(e); } catch (Throwable e2) { if (e2 instanceof OnErrorNotImplementedException) { /* * onError isn't implemented so throw * * https://github.com/ReactiveX/RxJava/issues/198 * * Rx Design Guidelines 5.2 * * "when calling the Subscribe method that only has an onNext argument, the OnError behavior * will be to rethrow the exception on the thread that the message comes out from the observable * sequence. The OnCompleted behavior in this case is to do nothing." */ try { unsubscribe(); } catch (Throwable unsubscribeException) { RxJavaPluginUtils.handleException(unsubscribeException); throw new RuntimeException("Observer.onError not implemented and error while unsubscribing.", new CompositeException(Arrays.asList(e, unsubscribeException))); } throw (OnErrorNotImplementedException) e2; } else { /* * throw since the Rx contract is broken if onError failed * * https://github.com/ReactiveX/RxJava/issues/198 */ RxJavaPluginUtils.handleException(e2); try { unsubscribe(); } catch (Throwable unsubscribeException) { RxJavaPluginUtils.handleException(unsubscribeException); throw new OnErrorFailedException("Error occurred when trying to propagate error to Observer.onError and during unsubscription.", new CompositeException(Arrays.asList(e, e2, unsubscribeException))); } throw new OnErrorFailedException("Error occurred when trying to propagate error to Observer.onError", new CompositeException(Arrays.asList(e, e2))); } } // if we did not throw above we will unsubscribe here, if onError failed then unsubscribe happens in the catch try { unsubscribe(); } catch (RuntimeException unsubscribeException) { RxJavaPluginUtils.handleException(unsubscribeException); throw new OnErrorFailedException(unsubscribeException); } } /** * Returns the {@link Subscriber} underlying this {@code SafeSubscriber}. * * @return the {@link Subscriber} that was used to create this {@code SafeSubscriber} */ public Subscriber<? super T> getActual() { return actual; } }
从源码中不能发现:
public void onCompleted() { if (!done) { done = true; try { actual.onCompleted(); } catch (Throwable e) { // we handle here instead of another method so we don't add stacks to the frame // which can prevent it from being able to handle StackOverflow Exceptions.throwIfFatal(e); // handle errors if the onCompleted implementation fails, not just if the Observable fails _onError(e); } finally { // auto-unsubscribe unsubscribe(); } } }
原来它在finally里自动取消了订阅!!
同样,在出现error时,也会自动取消订阅。
protected void _onError(Throwable e) { RxJavaPluginUtils.handleException(e); try { actual.onError(e); } catch (Throwable e2) { if (e2 instanceof OnErrorNotImplementedException) { /* * onError isn't implemented so throw * * https://github.com/ReactiveX/RxJava/issues/198 * * Rx Design Guidelines 5.2 * * "when calling the Subscribe method that only has an onNext argument, the OnError behavior * will be to rethrow the exception on the thread that the message comes out from the observable * sequence. The OnCompleted behavior in this case is to do nothing." */ try { unsubscribe(); } catch (Throwable unsubscribeException) { RxJavaPluginUtils.handleException(unsubscribeException); throw new RuntimeException("Observer.onError not implemented and error while unsubscribing.", new CompositeException(Arrays.asList(e, unsubscribeException))); } throw (OnErrorNotImplementedException) e2; } else { /* * throw since the Rx contract is broken if onError failed * * https://github.com/ReactiveX/RxJava/issues/198 */ RxJavaPluginUtils.handleException(e2); try { unsubscribe(); } catch (Throwable unsubscribeException) { RxJavaPluginUtils.handleException(unsubscribeException); throw new OnErrorFailedException("Error occurred when trying to propagate error to Observer.onError and during unsubscription.", new CompositeException(Arrays.asList(e, e2, unsubscribeException))); } throw new OnErrorFailedException("Error occurred when trying to propagate error to Observer.onError", new CompositeException(Arrays.asList(e, e2))); } } // if we did not throw above we will unsubscribe here, if onError failed then unsubscribe happens in the catch try { unsubscribe(); } catch (RuntimeException unsubscribeException) { RxJavaPluginUtils.handleException(unsubscribeException); throw new OnErrorFailedException(unsubscribeException); } }
本文参考了http://www.jianshu.com/p/c26f96c0ab81
困扰我了好久的问题终于解决了,呜呼!
可以结合下面文章来看:
/article/11841860.html
相关文章推荐
- Struts2上课笔记
- 导入JFrame时eclipse编译报错
- SSM之DAO部分Spring与MyBatis整合
- 三种初级排序算法(冒泡、选择、直接插入)java实现及其性能比较
- Spring注解
- 请求转发和重定向的问题
- java多线程与并发笔记
- java多线程 --run()和Start()的区别
- Spring AOP
- Java实现加密算法
- java常用代码
- java线程的同步代码块关键字synchronized
- java学习笔记整理
- [原创]java WEB学习笔记28: 会话与状态管理Cookie 机制
- java多线程---等待/唤醒以及生产者消费者经典同步synchronized的实现
- JAVA_OA管理系统(四)番外篇:使用Spring注解注入属性
- 华为机试---字符串运用-密码截取
- eclipse查看代码继承关系
- 【Java学习笔记】多线程
- java中读取输入的方式