RxJava取消订阅的各种方式的实现
2019-09-19 14:34
2141 查看
手动取消订阅
Consumer类型
Observable创建返回Disposable取消
public class SecondActivity extends AppCompatActivity { private static final String TAG = "SecondActivity"; private Disposable disposable; @Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_second); disposable = Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } } }).subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.d(TAG, "accept: "+s); } }); } @Override protected void onDestroy() { super.onDestroy(); Log.d(TAG, "onDestroy: "); //取消订阅 if(disposable != null && !disposable.isDisposed()){ disposable.dispose(); Log.d(TAG, "onDestroy: dispose"); } } }
普通类型Observer
在Observer中获取Disposable然后取消
public class ThirdActivity extends AppCompatActivity { private static final String TAG = "ThirdActivity"; Disposable disposable; @Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_third); Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { try { Thread.sleep(5000); emitter.onNext("testInfo"); } catch (InterruptedException e) { e.printStackTrace(); } } }).subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { disposable = d; } @Override public void onNext(String s) { Log.d(TAG, "onNext: "+s); } @Override public void onError(Throwable e) { Log.d(TAG, "onError: "); } @Override public void onComplete() { Log.d(TAG, "onComplete: "); } }); } @Override protected void onDestroy() { super.onDestroy(); Log.d(TAG, "onDestroy: "); //然后在需要取消订阅的地方调用即可 if (disposable != null && !disposable.isDisposed()) { Log.d(TAG, "dispose: "); disposable.dispose(); } } }
DisposableObserver类型
利用DisposableObserver和SubscribeWith直接返回Disposable,然后取消
public class FourthActivity extends AppCompatActivity { private static final String TAG = "FourthActivity"; private DisposableObserver<String> observer; @Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_fourth); observer = Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { try { Thread.sleep(5000); emitter.onNext("testInfo"); } catch (InterruptedException e) { e.printStackTrace(); } } }).subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribeWith(new DisposableObserver<String>() { @Override public void onNext(String o) { Log.d(TAG, "onNext: "+o); } @Override public void onError(Throwable e) { Log.d(TAG, "onError: "); } @Override public void onComplete() { Log.d(TAG, "onComplete: "); } }); } @Override protected void onDestroy() { super.onDestroy(); if (observer != null && !observer.isDisposed()) { Log.d(TAG, "dispose: "); observer.dispose(); } } }
取消多个Observer
把多个Observer添加CompositeDisposable,一次取消
public class ComDisposableActivity extends AppCompatActivity { private Disposable disposable1; private Disposable disposable2; private static final String TAG = "ComDisposableActivity"; @Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_com_disposable); Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { try { Thread.sleep(5000); emitter.onNext("testInfo"); } catch (InterruptedException e) { e.printStackTrace(); } } }).subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .doOnDispose(new Action() { @Override public void run() throws Exception { Log.d(TAG, "run: Unsubscribing subscription from onCreate()"); } }) .subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { disposable1 = d; } @Override public void onNext(String s) { Log.d(TAG, "onNext: "+s); } @Override public void onError(Throwable e) { Log.d(TAG, "onError: "); } @Override public void onComplete() { Log.d(TAG, "onComplete: "); } }); Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { try { Thread.sleep(5000); emitter.onNext("testInfo"); } catch (InterruptedException e) { e.printStackTrace(); } } }).subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { disposable2 = d; } @Override public void onNext(String s) { Log.d(TAG, "onNext: "+s); } @Override public void onError(Throwable e) { Log.d(TAG, "onError: "); } @Override public void onComplete() { Log.d(TAG, "onComplete: "); } }); } @Override protected void onDestroy() { super.onDestroy(); CompositeDisposable compositeDisposable = new CompositeDisposable(); //批量添加 compositeDisposable.add(disposable1); compositeDisposable.add(disposable2); //最后一次性全部取消订阅 compositeDisposable.dispose(); } }
RxLifecyle取消
OnDestory取消
Observable.interval(1, TimeUnit.SECONDS) .doOnDispose(new Action() { @Override public void run() throws Exception { Log.d(TAG, "Unsubscribing bindToLifecycle from onDestroy()"); } }) .compose(this.<Long>bindToLifecycle()) .subscribe(new Consumer<Long>() { @Override public void accept(Long num) throws Exception { Log.d(TAG, "accept: " + num); } });
指定生命周期取消
Observable.interval(1,TimeUnit.SECONDS) .doOnDispose(new Action() { @Override public void run() throws Exception { Log.d(TAG, "Unsubscribing UbindUntilEvent from onPause()"); } }).compose(this.<Long>bindUntilEvent(ActivityEvent.PAUSE)) .subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { Log.d(TAG, "bindUntilEvent accept: " + aLong); } });
以上就是本文的全部内容,希望对大家的学习有所帮助
您可能感兴趣的文章:
- RxJava 1升级到RxJava 2过程中踩过的一些“坑”
- RxJava2.x实现定时器的实例代码
- RxJava+Retrofit+OkHttp实现多文件下载之断点续传
- 详解RxJava2 Retrofit2 网络框架简洁轻便封装
- RxJava2.x+ReTrofit2.x多线程下载文件的示例代码
- Retrofit Rxjava实现图片下载、保存并展示实例
- 深入浅出RxJava+Retrofit+OkHttp网络请求
- RxJava+Retrofit+OkHttp实现文件上传
- 浅谈RxJava处理业务异常的几种方式
- RxJava入门指南及其在Android开发中的使用示例
- 基于Retrofit2+RxJava2实现Android App自动更新
相关文章推荐
- 用递归的方式实现各种进制间的转换
- CSS居中的各种实现方式
- 大牛请进,Android音视频聊天各种实现方式问题
- 关于RXJAVA的取消订阅的理解
- AJAX各种实现方式比较
- CSS居中的各种实现方式
- AJAX各种js实现方式
- Unity视频播放的各种实现方式汇总
- kotlin for android----------MVP模式下(OKHttp和 Retrofit+RxJava)网络请求的两种实现方式
- 采用SQLServer 发布,订阅方式实现数据库同步遇到问题总结
- Linux根文件系统制作与各种挂载方式的实现
- js中各种长链接的实现方式
- 用递归的方式对于树的各种实现(前序、中序、后序、层次、节点、深度、交换左右子树)
- 混合式开发框架模式中各种登陆方式的配置实现
- 单例模式--各种实现方式详解
- 进度条实现---各种方式
- 线程系列08,实现线程锁的各种方式,使用lock,Montor,Mutex,Semaphore以及线程死锁
- SQL SERVER 2008 利用发布订阅方式实现数据库同步
- sql server 利用发布订阅方式实现数据库同步问题