RxJava 学习笔记
2016-12-23 14:01
288 查看
0 本文章基于以下两篇博文总结而来
流式理解 http://www.tuicool.com/articles/BBNRRf
初学建议 http://www.jcodecraeer.com/a/anzhuokaifa/androidkaifa/2015/1012/3572.html#toc_31
1 是什么:
Reactive Programming响应式编程:对异步事件流进行监听并处理的编程方式
RxJava 一个在JavaVM上使用可观测序列来组成的异步的基于事件的程序库
2 好处:
1 简洁 链式调用
2 可以指定线程
3 更适合Android开发
3 基本概念
Observer 观察者
Observerable 被观察者
subscribe 订阅(将观察者和被观察者联系起来)
subcriber.onNext(“xxx”); 事件
3.1 首先我们需要产生事件流即被观察者
例如:
1)Observerable :事件流的创建方式(事件流是廉价的,一切皆事件流)
2)或者使用Retrofit直接调用service方法返回一个Observrable
>
注:其他的快捷创建事件的方法
1)just(T…)
流式理解 http://www.tuicool.com/articles/BBNRRf
初学建议 http://www.jcodecraeer.com/a/anzhuokaifa/androidkaifa/2015/1012/3572.html#toc_31
1 是什么:
Reactive Programming响应式编程:对异步事件流进行监听并处理的编程方式
RxJava 一个在JavaVM上使用可观测序列来组成的异步的基于事件的程序库
2 好处:
1 简洁 链式调用
2 可以指定线程
3 更适合Android开发
3 基本概念
Observer 观察者
Observerable 被观察者
subscribe 订阅(将观察者和被观察者联系起来)
subcriber.onNext(“xxx”); 事件
3.1 首先我们需要产生事件流即被观察者
例如:
1)Observerable :事件流的创建方式(事件流是廉价的,一切皆事件流)
Observerable.create(Observerable的子类 OnSubscriber<T>{ public void call(Subcriber<? super T> subcriber){ subcriber.onNext("xxx"); } })
2)或者使用Retrofit直接调用service方法返回一个Observrable
GitHub gitHub = retrofit.create(GitHub.class); Observable<List<Contributor>> observable = gitHub.contributors("square", "retrofit");4000
>
注:其他的快捷创建事件的方法
1)just(T…)
Observable observable = Observable.just("Hello", "Hi", "Aloha");
2)from(T[])或from(iterableString[] words = {"Hello", "Hi", "Aloha"}; Observable observable = Observable.from(words);
3.1.1关于事件流变换
1)map 单纯的将事件进行转换 转换后的事件会被不加处理的直接发送到Subcriber进行消费
2)flatMap 每个事件转换为一个新的事件流并激活它然后每个激活的新事件都会被汇集到Subcriber中进行消费
3)throttleFirst 在每次事件触发后的一定时间间隔内丢弃新的事件 throttleFirst(500, TimeUnit.MILLISECONDS) // 设置防抖间隔为
4)lift 使用Opration进行变换不建议使用
5)compose(Transformer) 使用多个变化组成一个输入事件流返回一个事件流的Transformer,在链式调用中服用该compose
注转换时经常会使用到Func1
3.1.2flatMap和map的区别
相同的功能使用map和flatMap的实现方法
1)MAPObservable.from(studentList).map(new Func1<Student,Observable<Course>>(){ public Observable<Course> call(Student student) { return Observable.from(student.getCourses()); } }).subscribe(new Action1<Observable<Course>>() { @Override public void call(Observable<Course> courseObservable) { courseObservable.subscribe(new Action1<Course>() { @Override public void call(Course course) { System.out.println(course.getName()); } }); } });
2)flatMapObservable.from(studentList).flatMap(new Func1<Student,Observable<Course>>(){ @Override public Observable<Course> call(Student student) { return Observable.from(student.getCourses()); } }).subscribe(new Action1<Course>() { @Override public void call(Course course) { System.out.println(course.getName()); } });
3.2 我们需要观察者对观察到的事件进行响应
Observer,Subscriber, Flowable的区别
共同点:他们均为观察者提供具体的事件处理方法
不同点: Observer是接口,Subscriber为该接口的抽象类,对Observer进行了扩展增加了onStart()和unsubscribe()方法
Flowable是RxJava2.0 为了解决 Backpressure问题而引入
Backpressure:事件产生的速度远远大于消费的速度并且事件是源源不断的时候的处理策略,
1.0时期Observer会缓存事件且缓存区大小为16,超过后会抛出异常,
Flowable缓存区为128且可以选择超出后的策略测试Flowable例子如下:Flowable.create(new FlowableOnSubscribe<integer>() { @Override public void subscribe(FlowableEmitter<integer> e) throws Exception { for(int i=0;i<10000;i++){ e.onNext(i); } e.onComplete(); } }, FlowableEmitter.BackpressureMode.ERROR) //指定背压处理策略,抛出异常 .subscribeOn(Schedulers.computation()) .observeOn(Schedulers.newThread()) .subscribe(new Consumer<integer>() { @Override public void accept(Integer integer) throws Exception { Log.d("JG", integer.toString()); Thread.sleep(1000); } }, new Consumer<throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d("JG",throwable.toString()); } });
3.2.2 Action
action0 —- onCompletedAction
action1 —- onNextAction
action1 —– onError
3.3 subscribe 订阅public Subscription subscribe(Subscriber subscriber) { subscriber.onStart(); onSubscribe.call(subscriber); return subscriber; }
1 调用预处理方法onStart()
2 触发事件流的事件
3 将传入的Subcriber作为Subscription返回,为了方便unsubscrib()
3.4线程控制 Scheduler* Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。 * Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。 * Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。 * Schedulers.computation(): 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。 *
另外, Android 还有一个专用的 AndroidSchedulers.mainThread(),它指定的操作将在 Android 主线程运行。
相关文章推荐
- Java服务端程序简化
- 【转】史上最全最强SpringMVC详细示例实战教程
- 【java】String s = new String("xyz")创建了几个对象
- Spring Annotations
- OutOfMemoryError: Java heap space
- java执行cmd命令工具实现
- JavaWeb--Session、Cookie
- Spring MVC 4 - 多视图输出
- struts2执行流程
- java web 登录后更新JSESSIONID
- Struts中<s:checkboxlist>的用法
- springmvc 带查询条件的分页,form的控制范围,怎么包裹条件提交给后台
- myeclipse打开后卡住
- java生成md5
- Java中的线程池--读书笔记
- jdk中用到的设计模式
- java颜色代码对照表
- struts2配置文件
- Java环境变量配置
- 利用JavaFx开发RIA桌面应用-获取屏幕四个角落的坐标