您的位置:首页 > 编程语言 > Java开发

Rxjava的基础用法和源码解析(一)

2020-04-23 09:15 1046 查看

打算从这篇博客开始整理一下Observable的那些api用法

这篇博客的用法主要有 :  just , from, interval, timer, range, never, empty, error, defer

just

[code]public static <T> Observable<T> just(final T value)
public static <T> Observable<T> just(T... t1)

just会把传入的参数值数组依次输出,具体发布原理上一篇已经分析过了

[code]Observable.just(1,2,3).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
}
});

这里会依次输出1,2,3

from

[code]public static <T> Observable<T> from(T[] array)
public static <T> Observable<T> from(Future<? extends T> future)
public static <T> Observable<T> from(Future<? extends T> future, long timeout, TimeUnit unit)
public static <T> Observable<T> from(Future<? extends T> future, Scheduler scheduler)
public static <T> Observable<T> from(Iterable<? extends T> iterable)

方法有点多我们分别分析一下

[code]public static <T> Observable<T> from(T[] array)

这个方法和上面的just用法一致,其实just的方法就是封装成数组后调用from方法

[code]public static <T> Observable<T> from(Future<? extends T> future)

这个方法传入的是一个Future对象,就是一个线程操作对象,看下最终回调的调用方法

[code] public void call(Subscriber<? super T> subscriber) {
subscriber.add(Subscriptions.create(new Action0() {
@Override
public void call() {
// If the Future is already completed, "cancel" does nothing.
that.cancel(true);
}
}));
try {
...
if (subscriber.isUnsubscribed()) {
return;
}
T value = (unit == null) ? (T) that.get() : (T) that.get(time, unit);
subscriber.onNext(value);
subscriber.onCompleted();
} catch (Throwable e) {
...
}
}

其中that就是我们传入的future对象,这块会调用future的get方法返回相应的值,再把这个值发布到我们的subscriber中,并在最终结束后用cancel方法取消任务

注:这个方法虽然可以开辟新的线程去处理数据,但是最终回调还是会回到subscribe的线程,而且future的get的方法可能会阻塞主线程,所以就有了后面的超时参数方法

[code]public static <T> Observable<T> from(Future<? extends T> future, long timeout, TimeUnit unit)

这个用法和上面一致,不同的是多了两个参数,一个是超时的值,一个是超时的时间类型;如果在超时时间内,futuer并没有获得返回值,就会抛出超时异常,也就是会直接回调subscriber的onError方法;和上面的方法一样,这个也可能会阻塞主线程

[code] ExecutorService executors = Executors.newSingleThreadExecutor();
Future<String> future = executors.submit(new Callable<String>() {
@Override
public String call() throws Exception {
Log.e(TAG, "start thread= " + Thread.currentThread());
//do something
Thread.sleep(2000);
return "1234";
}
});
Observable.from(future).subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.e(TAG, "end thread= " + Thread.currentThread() + " :  value= " + s);
}
});
Log.e(TAG,"back to main Thread");

输出结果是

[code]01-28 17:30:26.635  E/Main: start thread= Thread[pool-1-thread-1,5,main]
01-28 17:30:28.635  E/Main: end thread= Thread[main,5,main] :  value= 1234
01-28 17:30:28.636  E/Main: back to main Thread

可见处理分别是在两个线程,并且确实阻塞了主线程

除此之外,还有一个方法和这个类似

[code]public static <T> Observable<T> fromCallable(Callable<? extends T> func)

这个省去了创建线程的过程,直接通过callable方法返回处理后的值,和上面不同的是,这个处理的线程和subscriber要发布的是同一个线程;Future是异步处理可以监测超时,而callable则是同步处理,操作更简单

[code]public static <T> Observable<T> from(Future<? extends T> future, Scheduler scheduler)

这里多了一个参数Scheduler,这个是Rx的线程概念,Rx是支持处理和发布的线程随时切换的

比如我们指定线程是Rx的io线程

[code]        Log.e(TAG,"thread0= " +Thread.currentThread());
ExecutorService executorService = Executors.newSingleThreadExecutor();
Future<String> future = executorService.submit(new Callable<String>() {
@Override
public String call() throws Exception {
Log.e(TAG,"thread1= " +Thread.currentThread());
return "1234";
}
});
Observable.from(future,Schedulers.io()).subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.e(TAG,"thread2= " +Thread.currentThread());
}
});

输出结果

[code]E/Main: thread0= Thread[main,5,main]
E/Main: thread1= Thread[pool-1-thread-1,5,main]
E/Main: thread2= Thread[RxCachedThreadScheduler-1,5,main]

我们会发现发布线程变成了Rx的cache线程,这就实现了线程切换,最终改变的发布的线程,其实

[code]public static <T> Observable<T> from(Future<? extends T> future, Scheduler scheduler) {
return create(OnSubscribeToObservableFuture.toObservableFuture(future)).subscribeOn(scheduler);
}

这个方法就是调用了切换线程的方法

[code]public static <T> Observable<T> from(Iterable<? extends T> iterable)

这个传入的是一个迭代器对象,而实现这个借口的基本都是Collection;我们常用的List和Set都是这个范畴,用法和传入数组的基本类似

[code]  List<Integer> list = new ArrayList<>(Arrays.asList(1,2,3));
Observable.from(list).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
//do something
}
});
}

interval

[code]public static Observable<Long> interval(long interval, TimeUnit unit)
public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit)
public static Observable<Long> interval(long interval, TimeUnit unit, Scheduler scheduler)
public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler)

这个是一个定时计数的方法,看下具体内容

[code]final Worker worker = scheduler.createWorker();
child.add(worker);
worker.schedulePeriodically(new Action0() {
long counter;
@Override
public void call() {
try {
child.onNext(counter++);
} catch (Throwable e) {
try {
worker.unsubscribe();
} finally {
Exceptions.throwOrReport(e, child);
}
}
}

}, initialDelay, period, unit);

这个方法会根据设置的间隔和初始依次叠加数值,从0开始无上限,一般会搭配take使用,take表示取值的最大个数

[code]public static Observable<Long> interval(long interval, TimeUnit unit)
public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit)

第一个方法的interval表示间隔,unit表示时间类型

第二个方法的initalDelay表示开始计时的延迟时间,period表示间隔,unit表示时间类型(前两个参数都是用这个类型)

[code] Log.e(TAG, "start count");
Observable.interval(2, 1, TimeUnit.SECONDS).take(5).subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
Log.e(TAG, "result = " + aLong);
}
});

输出结果是

[code]18:10:27.603 E/Main: start count
18:10:29.619 E/Main: result = 0
18:10:30.620 E/Main: result = 1
18:10:31.619 E/Main: result = 2
18:10:32.617 E/Main: result = 3
18:10:33.618 E/Main: result = 4

所以是延迟2s后依次打印相应数据

注: 这个call回调发生在compution线程,非主线程

[code]public static Observable<Long> interval(long interval, TimeUnit unit, Scheduler scheduler)
public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler)

这里提供了切换线程的方法,让实际操作回调反生在我们定义的线程,比如上面的可以改为

[code]Observable.interval(2, 1, TimeUnit.SECONDS,AndroidSchedulers.mainThread()).take(5)
.subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
//ui 操作
}
});

timer

[code]public static Observable<Long> timer(long delay, TimeUnit unit)
public static Observable<Long> timer(long delay, TimeUnit unit, Scheduler scheduler)

这个用于延迟执行某些操作的方法,只会回调一次

[code] Worker worker = scheduler.createWorker();
child.add(worker);
worker.schedule(new Action0() {
@Override
public void call() {
try {
child.onNext(0L);
} catch (Throwable t) {
Exceptions.throwOrReport(t, child);
return;
}
child.onCompleted();
}
}, time, unit);

这里返回0之后直接回调onCompletd了,也就是延迟执行任务就结束了

[code] Observable.timer(1, TimeUnit.SECONDS).subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
//do something
}
});

这个回调也是发生在compution线程中的,所以也要通过Scheduler 去切换相应线程

range

[code]public static Observable<Integer> range(int start, int count)
public static Observable<Integer> range(int start, int count, Scheduler scheduler)

看下具体实现

[code] public static Observable<Integer> range(int start, int count) {
...
return Observable.create(new OnSubscribeRange(start, start + (count - 1)));
}

void fastpath() {
final long end = this.end + 1L;
final Subscriber<? super Integer> o = this.o;
for (long i = index; i != end; i++) {
if (o.isUnsubscribed()) {
return;
}
o.onNext((int) i);
}
if (!o.isUnsubscribed()) {
o.onCompleted();
}
}

实际就是从start开始计数,回调count次后终止

[code] Observable.range(3, 5).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.e(TAG, "result= " + integer);
}
});

输出结果是

[code]E/Main: result= 3
E/Main: result= 4
E/Main: result= 5
E/Main: result= 6
E/Main: result= 7

never

[code]public static <T> Observable<T> never()

这个返回的Observable对象,同时方法内不做任何处理,相当于直接跳过此次,不做处理

[code]NeverObservable() {
super(new OnSubscribe<T>() {

@Override
public void call(Subscriber<? super T> observer) {
// do nothing
}

});
}

一般配合flatmap方法使用,具体使用场景并不多

举个例子,比如我们要打印一个数组里比3大的所有数,其他的我不想处理

[code] List<Integer> array = new ArrayList<>(Arrays.asList(1, 2, 3, 4, 5, 6));
Observable.from(array).flatMap(new Func1<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> call(Integer integer) {
if (integer.intValue() <= 3) {
return Observable.never();
}
return Observable.just(integer);
}
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.e(TAG, "result= " + integer);
}
});

这里我们比3小的直接跳过,会继续后面的遍历而不影响整体流程,类比于我们的continue;输出结果是

[code]E/Main: result= 4
E/Main: result= 5
E/Main: result= 6

empty

[code] public static <T> Observable<T> empty()
[code]private static final class EmptyHolder {
final static Observable<Object> INSTANCE = create(new OnSubscribe<Object>() {
@Override
public void call(Subscriber<? super Object> subscriber) {
subscriber.onCompleted();
}
});
}

这个相当于直接跳出,相当于我们用的break,直接跳出回调完成的方法

比如下面的例子

[code] Observable.from(array).flatMap(new Func1<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> call(Integer integer) {
if (integer > 3) {
return Observable.empty();
}
return Observable.just(integer);
}
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.e(TAG, "result= " + integer);
}
});

当发现第一个大于3的数的时候直接跳出循环,输出结果是

[code]E/Main: result= 1
E/Main: result= 2
E/Main: result= 3

所以empty会影响整体流程,后续的发布事件全部都不会再接受,直接终止

error

[code]public static <T> Observable<T> error(Throwable exception)
[code]private static class ThrowObservable<T> extends Observable<T> {
public ThrowObservable(final Throwable exception) {
super(new OnSubscribe<T>() {
@Override
public void call(Subscriber<? super T> observer) {
observer.onError(exception);
}
});
}
}

顾名思义,这个会直接抛出定义的异常,类似于我们平时定义的throw Exception,这个异常会直接到subscriber的onError里,同时结束整个流程(onCompleted和onError最终只会执行一个)

[code] Observable.from(array).flatMap(new Func1<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> call(Integer integer) {
if (integer > 3) {
return Observable.error(new RuntimeException("larger than 3"));
}
return Observable.just(integer);
}
}).subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
Log.e(TAG,"onCompleted  ");
}
@Override
public void onError(Throwable e) {
Log.e(TAG,"onError  "+e);
}
@Override
public void onNext(Integer integer) {
Log.e(TAG,"onNext  "+integer);
}
});

输出结果是

[code]E/Main: onNext  1
E/Main: onNext  2
E/Main: onNext  3
E/Main: onError  java.lang.RuntimeException: larger than 3

defer

[code]public static <T> Observable<T> defer(Func0<Observable<T>> observableFactory)
[code]public final class OnSubscribeDefer<T> implements OnSubscribe<T> {
final Func0<? extends Observable<? extends T>> observableFactory;

public OnSubscribeDefer(Func0<? extends Observable<? extends T>> observableFactory) {
this.observableFactory = observableFactory;
}

@Override
public void call(final Subscriber<? super T> s) {
Observable<? extends T> o;
try {
//开始创建新的Observable对象
//call 返回的就是上面的 Observable<? extends T>
o = observableFactory.call();
} catch (Throwable t) {
Exceptions.throwOrReport(t, s);
return;
}
//开始注册subscriber
o.unsafeSubscribe(Subscribers.wrap(s));
}

}

public interface Func0<R> extends Function, Callable<R> {
@Override
R call();
}

public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
try {
//省略部分代码,类似代码输出是
//和上篇流程基本一致
...
subscriber.onStart();
onSubscribe.call(subscriber);
return subscriber;
} catch (Throwable e) {
...
}
return Subscriptions.unsubscribed();
}
}

我们知道只有当subscribe订阅后才会开始发布数据,Observable一般是开始就创建的;从源码可以看出来,defer这个在订阅之后才开始调用call方法创建Observable对象并开始绑定subscriber,类比的话就是我们常说的懒加载模式,使用方法也和之前的类似,只是把最初的创建挪到这个位置

[code]Observable.defer(new Func0<Observable<Integer>>() {
@Override
public Observable<Integer> call() {
return Observable.just(1,2,3);
}
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.e(TAG,"defRes  "+integer);
}
});

输出结果是

[code]E/Main: defRes  1
E/Main: defRes  2
E/Main: defRes  3

和前面的用法没什么差别

  • 点赞
  • 收藏
  • 分享
  • 文章举报
迷途の知归 发布了20 篇原创文章 · 获赞 1 · 访问量 3010 私信 关注
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: