Rxjava的基础用法和源码解析(一)
打算从这篇博客开始整理一下Observable的那些api用法
这篇博客的用法主要有 : just , from, interval, timer, range, never, empty, error, defer
just
[code]public static <T> Observable<T> just(final T value) public static <T> Observable<T> just(T... t1)
just会把传入的参数值数组依次输出,具体发布原理上一篇已经分析过了
[code]Observable.just(1,2,3).subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { } });
这里会依次输出1,2,3
from
[code]public static <T> Observable<T> from(T[] array) public static <T> Observable<T> from(Future<? extends T> future) public static <T> Observable<T> from(Future<? extends T> future, long timeout, TimeUnit unit) public static <T> Observable<T> from(Future<? extends T> future, Scheduler scheduler) public static <T> Observable<T> from(Iterable<? extends T> iterable)
方法有点多我们分别分析一下
[code]public static <T> Observable<T> from(T[] array)
这个方法和上面的just用法一致,其实just的方法就是封装成数组后调用from方法
[code]public static <T> Observable<T> from(Future<? extends T> future)
这个方法传入的是一个Future对象,就是一个线程操作对象,看下最终回调的调用方法
[code] public void call(Subscriber<? super T> subscriber) { subscriber.add(Subscriptions.create(new Action0() { @Override public void call() { // If the Future is already completed, "cancel" does nothing. that.cancel(true); } })); try { ... if (subscriber.isUnsubscribed()) { return; } T value = (unit == null) ? (T) that.get() : (T) that.get(time, unit); subscriber.onNext(value); subscriber.onCompleted(); } catch (Throwable e) { ... } }
其中that就是我们传入的future对象,这块会调用future的get方法返回相应的值,再把这个值发布到我们的subscriber中,并在最终结束后用cancel方法取消任务
注:这个方法虽然可以开辟新的线程去处理数据,但是最终回调还是会回到subscribe的线程,而且future的get的方法可能会阻塞主线程,所以就有了后面的超时参数方法
[code]public static <T> Observable<T> from(Future<? extends T> future, long timeout, TimeUnit unit)
这个用法和上面一致,不同的是多了两个参数,一个是超时的值,一个是超时的时间类型;如果在超时时间内,futuer并没有获得返回值,就会抛出超时异常,也就是会直接回调subscriber的onError方法;和上面的方法一样,这个也可能会阻塞主线程
[code] ExecutorService executors = Executors.newSingleThreadExecutor(); Future<String> future = executors.submit(new Callable<String>() { @Override public String call() throws Exception { Log.e(TAG, "start thread= " + Thread.currentThread()); //do something Thread.sleep(2000); return "1234"; } }); Observable.from(future).subscribe(new Action1<String>() { @Override public void call(String s) { Log.e(TAG, "end thread= " + Thread.currentThread() + " : value= " + s); } }); Log.e(TAG,"back to main Thread");
输出结果是
[code]01-28 17:30:26.635 E/Main: start thread= Thread[pool-1-thread-1,5,main] 01-28 17:30:28.635 E/Main: end thread= Thread[main,5,main] : value= 1234 01-28 17:30:28.636 E/Main: back to main Thread
可见处理分别是在两个线程,并且确实阻塞了主线程
除此之外,还有一个方法和这个类似
[code]public static <T> Observable<T> fromCallable(Callable<? extends T> func)
这个省去了创建线程的过程,直接通过callable方法返回处理后的值,和上面不同的是,这个处理的线程和subscriber要发布的是同一个线程;Future是异步处理可以监测超时,而callable则是同步处理,操作更简单
[code]public static <T> Observable<T> from(Future<? extends T> future, Scheduler scheduler)
这里多了一个参数Scheduler,这个是Rx的线程概念,Rx是支持处理和发布的线程随时切换的
比如我们指定线程是Rx的io线程
[code] Log.e(TAG,"thread0= " +Thread.currentThread()); ExecutorService executorService = Executors.newSingleThreadExecutor(); Future<String> future = executorService.submit(new Callable<String>() { @Override public String call() throws Exception { Log.e(TAG,"thread1= " +Thread.currentThread()); return "1234"; } }); Observable.from(future,Schedulers.io()).subscribe(new Action1<String>() { @Override public void call(String s) { Log.e(TAG,"thread2= " +Thread.currentThread()); } });
输出结果
[code]E/Main: thread0= Thread[main,5,main] E/Main: thread1= Thread[pool-1-thread-1,5,main] E/Main: thread2= Thread[RxCachedThreadScheduler-1,5,main]
我们会发现发布线程变成了Rx的cache线程,这就实现了线程切换,最终改变的发布的线程,其实
[code]public static <T> Observable<T> from(Future<? extends T> future, Scheduler scheduler) { return create(OnSubscribeToObservableFuture.toObservableFuture(future)).subscribeOn(scheduler); }
这个方法就是调用了切换线程的方法
[code]public static <T> Observable<T> from(Iterable<? extends T> iterable)
这个传入的是一个迭代器对象,而实现这个借口的基本都是Collection;我们常用的List和Set都是这个范畴,用法和传入数组的基本类似
[code] List<Integer> list = new ArrayList<>(Arrays.asList(1,2,3)); Observable.from(list).subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { //do something } }); }
interval
[code]public static Observable<Long> interval(long interval, TimeUnit unit) public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit) public static Observable<Long> interval(long interval, TimeUnit unit, Scheduler scheduler) public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler)
这个是一个定时计数的方法,看下具体内容
[code]final Worker worker = scheduler.createWorker(); child.add(worker); worker.schedulePeriodically(new Action0() { long counter; @Override public void call() { try { child.onNext(counter++); } catch (Throwable e) { try { worker.unsubscribe(); } finally { Exceptions.throwOrReport(e, child); } } } }, initialDelay, period, unit);
这个方法会根据设置的间隔和初始依次叠加数值,从0开始无上限,一般会搭配take使用,take表示取值的最大个数
[code]public static Observable<Long> interval(long interval, TimeUnit unit) public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit)
第一个方法的interval表示间隔,unit表示时间类型
第二个方法的initalDelay表示开始计时的延迟时间,period表示间隔,unit表示时间类型(前两个参数都是用这个类型)
[code] Log.e(TAG, "start count"); Observable.interval(2, 1, TimeUnit.SECONDS).take(5).subscribe(new Action1<Long>() { @Override public void call(Long aLong) { Log.e(TAG, "result = " + aLong); } });
输出结果是
[code]18:10:27.603 E/Main: start count 18:10:29.619 E/Main: result = 0 18:10:30.620 E/Main: result = 1 18:10:31.619 E/Main: result = 2 18:10:32.617 E/Main: result = 3 18:10:33.618 E/Main: result = 4
所以是延迟2s后依次打印相应数据
注: 这个call回调发生在compution线程,非主线程
[code]public static Observable<Long> interval(long interval, TimeUnit unit, Scheduler scheduler) public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler)
这里提供了切换线程的方法,让实际操作回调反生在我们定义的线程,比如上面的可以改为
[code]Observable.interval(2, 1, TimeUnit.SECONDS,AndroidSchedulers.mainThread()).take(5) .subscribe(new Action1<Long>() { @Override public void call(Long aLong) { //ui 操作 } });
timer
[code]public static Observable<Long> timer(long delay, TimeUnit unit) public static Observable<Long> timer(long delay, TimeUnit unit, Scheduler scheduler)
这个用于延迟执行某些操作的方法,只会回调一次
[code] Worker worker = scheduler.createWorker(); child.add(worker); worker.schedule(new Action0() { @Override public void call() { try { child.onNext(0L); } catch (Throwable t) { Exceptions.throwOrReport(t, child); return; } child.onCompleted(); } }, time, unit);
这里返回0之后直接回调onCompletd了,也就是延迟执行任务就结束了
[code] Observable.timer(1, TimeUnit.SECONDS).subscribe(new Action1<Long>() { @Override public void call(Long aLong) { //do something } });
这个回调也是发生在compution线程中的,所以也要通过Scheduler 去切换相应线程
range
[code]public static Observable<Integer> range(int start, int count) public static Observable<Integer> range(int start, int count, Scheduler scheduler)
看下具体实现
[code] public static Observable<Integer> range(int start, int count) { ... return Observable.create(new OnSubscribeRange(start, start + (count - 1))); } void fastpath() { final long end = this.end + 1L; final Subscriber<? super Integer> o = this.o; for (long i = index; i != end; i++) { if (o.isUnsubscribed()) { return; } o.onNext((int) i); } if (!o.isUnsubscribed()) { o.onCompleted(); } }
实际就是从start开始计数,回调count次后终止
[code] Observable.range(3, 5).subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.e(TAG, "result= " + integer); } });
输出结果是
[code]E/Main: result= 3 E/Main: result= 4 E/Main: result= 5 E/Main: result= 6 E/Main: result= 7
never
[code]public static <T> Observable<T> never()
这个返回的Observable对象,同时方法内不做任何处理,相当于直接跳过此次,不做处理
[code]NeverObservable() { super(new OnSubscribe<T>() { @Override public void call(Subscriber<? super T> observer) { // do nothing } }); }
一般配合flatmap方法使用,具体使用场景并不多
举个例子,比如我们要打印一个数组里比3大的所有数,其他的我不想处理
[code] List<Integer> array = new ArrayList<>(Arrays.asList(1, 2, 3, 4, 5, 6)); Observable.from(array).flatMap(new Func1<Integer, Observable<Integer>>() { @Override public Observable<Integer> call(Integer integer) { if (integer.intValue() <= 3) { return Observable.never(); } return Observable.just(integer); } }).subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.e(TAG, "result= " + integer); } });
这里我们比3小的直接跳过,会继续后面的遍历而不影响整体流程,类比于我们的continue;输出结果是
[code]E/Main: result= 4 E/Main: result= 5 E/Main: result= 6
empty
[code] public static <T> Observable<T> empty()
[code]private static final class EmptyHolder { final static Observable<Object> INSTANCE = create(new OnSubscribe<Object>() { @Override public void call(Subscriber<? super Object> subscriber) { subscriber.onCompleted(); } }); }
这个相当于直接跳出,相当于我们用的break,直接跳出回调完成的方法
比如下面的例子
[code] Observable.from(array).flatMap(new Func1<Integer, Observable<Integer>>() { @Override public Observable<Integer> call(Integer integer) { if (integer > 3) { return Observable.empty(); } return Observable.just(integer); } }).subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.e(TAG, "result= " + integer); } });
当发现第一个大于3的数的时候直接跳出循环,输出结果是
[code]E/Main: result= 1 E/Main: result= 2 E/Main: result= 3
所以empty会影响整体流程,后续的发布事件全部都不会再接受,直接终止
error
[code]public static <T> Observable<T> error(Throwable exception)
[code]private static class ThrowObservable<T> extends Observable<T> { public ThrowObservable(final Throwable exception) { super(new OnSubscribe<T>() { @Override public void call(Subscriber<? super T> observer) { observer.onError(exception); } }); } }
顾名思义,这个会直接抛出定义的异常,类似于我们平时定义的throw Exception,这个异常会直接到subscriber的onError里,同时结束整个流程(onCompleted和onError最终只会执行一个)
[code] Observable.from(array).flatMap(new Func1<Integer, Observable<Integer>>() { @Override public Observable<Integer> call(Integer integer) { if (integer > 3) { return Observable.error(new RuntimeException("larger than 3")); } return Observable.just(integer); } }).subscribe(new Subscriber<Integer>() { @Override public void onCompleted() { Log.e(TAG,"onCompleted "); } @Override public void onError(Throwable e) { Log.e(TAG,"onError "+e); } @Override public void onNext(Integer integer) { Log.e(TAG,"onNext "+integer); } });
输出结果是
[code]E/Main: onNext 1 E/Main: onNext 2 E/Main: onNext 3 E/Main: onError java.lang.RuntimeException: larger than 3
defer
[code]public static <T> Observable<T> defer(Func0<Observable<T>> observableFactory)
[code]public final class OnSubscribeDefer<T> implements OnSubscribe<T> { final Func0<? extends Observable<? extends T>> observableFactory; public OnSubscribeDefer(Func0<? extends Observable<? extends T>> observableFactory) { this.observableFactory = observableFactory; } @Override public void call(final Subscriber<? super T> s) { Observable<? extends T> o; try { //开始创建新的Observable对象 //call 返回的就是上面的 Observable<? extends T> o = observableFactory.call(); } catch (Throwable t) { Exceptions.throwOrReport(t, s); return; } //开始注册subscriber o.unsafeSubscribe(Subscribers.wrap(s)); } } public interface Func0<R> extends Function, Callable<R> { @Override R call(); } public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) { try { //省略部分代码,类似代码输出是 //和上篇流程基本一致 ... subscriber.onStart(); onSubscribe.call(subscriber); return subscriber; } catch (Throwable e) { ... } return Subscriptions.unsubscribed(); } }
我们知道只有当subscribe订阅后才会开始发布数据,Observable一般是开始就创建的;从源码可以看出来,defer这个在订阅之后才开始调用call方法创建Observable对象并开始绑定subscriber,类比的话就是我们常说的懒加载模式,使用方法也和之前的类似,只是把最初的创建挪到这个位置
[code]Observable.defer(new Func0<Observable<Integer>>() { @Override public Observable<Integer> call() { return Observable.just(1,2,3); } }).subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.e(TAG,"defRes "+integer); } });
输出结果是
[code]E/Main: defRes 1 E/Main: defRes 2 E/Main: defRes 3
和前面的用法没什么差别
- 点赞
- 收藏
- 分享
- 文章举报
- Rxjava的基础用法和源码解析(三)
- Rxjava的基础用法和源码解析(二)
- TagView的用法及源码解析
- 1.2.Spring源码解析——容器的基础XmlBeanFactory
- Rxjava2 源码解析 (三)
- java基础解析系列(十)---ArrayList和LinkedList源码及使用分析
- RxJava2_map操作符源码解析
- 泛型基础用法解析
- 深入Java基础(三)--集合(2)ArrayList和其继承树源码解析以及其注意事项
- GeoServer源码解析和扩展 (一)基础篇
- RxJava2源码解析
- 大数据spark蘑菇云行动前传第5课:零基础彻底实战Scala函数式编程及Spark源码解析
- GeoServer源码解析和扩展 (一)基础篇
- 深入解析RxJava源码(一)Observable对象的构建
- 关于ListView (2)——SimpleAdapter源码解析与ViewBinders的用法
- 解析Java中PriorityQueue优先级队列结构的源码及用法
- Android源码基础解析之Dialog取消绘制流程
- java基础类型源码解析之HashMap
- Java基础之static关键字解析及用法学习总结
- SplashView的用法及源码解析