您的位置:首页 > 编程语言 > Java开发

Rxjava操作符源码分析

2017-05-17 11:40 357 查看
just():

//这里的变换 a 的值时为了说明下面的 defer() 操作符
a = 10;
Observable<Integer> justObservable = Observable.just(a);
a = 12;
justObservable.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.i("TAG", "accept: " + integer);
}
});


输出:

accept: 10

分析:

public static <T> Observable<T> just(T item) {
ObjectHelper.requireNonNull(item, "The item is null");
return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));
}


第一行是判空 , 第二行的RxjavaPlugins是为了实现hook , 下面都会忽略这两个 .

返回的是
new ObservableJust<T>(item)


public final class ObservableJust<T> extends Observable<T> implements ScalarCallable<T> {
private final T value;
public ObservableJust(final T value) {
this.value = value;
}

@Override
protected void subscribeActual(Observer<? super T> s) {
ScalarDisposable<T> sd = new ScalarDisposable<T>(s, value);
s.onSubscribe(sd);
sd.run();
}
}


这里跳过了很多细节 , 为的只是将操作符的实现 . 具体可以自己去看源码 .

ObservableJust将我们传过来的 a 存在 ObservableJust对象当中 . 所以 just() 操作符返回的是一个 持有我们传过去的 a 的ObservableJust对象 . 当订阅之后会执行相应的方法 .

defer():

a = 10;
Observable<Integer> deferObservable = Observable.defer(new Callable<ObservableSource<? extends Integer>>() {
@Override
public ObservableSource<? extends Integer> call() throws Exception {
return Observable.just(a);
}
});
a = 12;
deferObservable.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.i("TAG", "accept: " + integer);
}
});


输出:

accept: 12

分析:

defer() 中传过去的是一个callable对象 .

public static <T> Observable<T> defer(Callable<? extends ObservableSource<? extends T>> supplier) {
ObjectHelper.requireNonNull(supplier, "supplier is null");
return RxJavaPlugins.onAssembly(new ObservableDefer<T>(supplier));
}


public final class ObservableDefer<T> extends Observable<T> {
final Callable<? extends ObservableSource<? extends T>> supplier;
public ObservableDefer(Callable<? extends ObservableSource<? extends T>> supplier) {
this.supplier = supplier;
}
}


返回了ObservableDefer对象 , 存入了callable .

@Override
public void subscribeActual(Observer<? super T> s) {
ObservableSource<? extends T> pub;
try {
pub = ObjectHelper.requireNonNull(supplier.call(), "null publisher supplied");
} catch (Throwable t) {
Exceptions.throwIfFatal(t);
EmptyDisposable.error(t, s);
return;
}

pub.subscribe(s);
}


当订阅方法执行的时候 , 会调用 callable的call() 方法 获取我们在call()方法中返回的 ObservableSource , 接着调用 ObservableSource 的 subscribe()方法 , 相继调用onNext()等方法 , defer()方法是懒加载创建被观察者 , 所以在 a 变换之后 , 返回的也是变换过后的 a .

fromArray():

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> fromArray(T... items) {
ObjectHelper.requireNonNull(items, "items is null");
if (items.length == 0) {
return empty();
} else
if (items.length == 1) {
return just(items[0]);
}
return RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items));
}


fromArray()方法 , 如果是空的 , 返回一个空的被观察者 , 如果只有一个 , 那么调用 Observable.just()方法 , 如果长度 > 1 返回一个ObservableFromArray对象 .

public final class ObservableFromArray<T> extends Observable<T> {
final T[] array;
public ObservableFromArray(T[] array) {
this.array = array;
}
}


ObservableFromArray 引用者数组 .

public void subscribeActual(Observer<? super T> s) {
FromArrayDisposable<T> d = new FromArrayDisposable<T>(s, array);

s.onSubscribe(d);

if (d.fusionMode) {
return;
}

d.run();
}


订阅之后 , 会调用FromArrayDisposable的 run()方法

void run() {
//我们传过来的数组
T[] a = array;
int n = a.length;

for (int i = 0; i < n && !isDisposed(); i++) {
T value = a[i];
if (value == null) {
actual.onError(new NullPointerException("The " + i + "th element is null"));
return;
}
actual.onNext(value);
}
if (!isDisposed()) {
actual.onComplete();
}
}


这里利用 for 循环 , 获取值 , 并且调用onNext()方法 , 如果值为null , 停止循环并且调用onError()方法 .

fromIterable() 也一样 , 是利用 Iterator 来遍历 .

interval():

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler) {
ObjectHelper.requireNonNull(unit, "unit is null");
ObjectHelper.requireNonNull(scheduler, "scheduler is null");

return RxJavaPlugins.onAssembly(new ObservableInterval(Math.max(0L, initialDelay), Math.max(0L, period), unit, scheduler));
}


interval()有很多重载 , 最终调用上面这个方法 . 返回的是ObservableInterval对象 , 默认传过去的Schedulars.computation线程 ,

@Override
public void subscribeActual(Observer<? super Long> s) {
IntervalObserver is = new IntervalObserver(s);
s.onSubscribe(is);

Disposable d = scheduler.schedulePeriodicallyDirect(is, initialDelay, period, unit);

is.setResource(d);
}


订阅之后会调用 , scheduler的计时器方法 .

@Override
public void run() {
if (get() != DisposableHelper.DISPOSED) {
actual.onNext(count++);
}
}


会按照时间 , 去调用这个方法 .

range():

Observable.range(1 , 4)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.i("TAG", "accept: " + integer);
}
});


输出:

accept: 1

accept: 2

accept: 3

accept: 4

分析:

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static Observable<Integer> range(final int start, final int count) {
if (count < 0) {
throw new IllegalArgumentException("count >= 0 required but it was " + count);
}
if (count == 0) {
return empty();
}
if (count == 1) {
return just(start);
}
if ((long)start + (count - 1) > Integer.MAX_VALUE) {
throw new IllegalArgumentException("Integer overflow");
}
return RxJavaPlugins.onAssembly(new ObservableRange(start, count));
}


数量为0 , 返回空的被观察者 , 等于1 , 返回just() , 当大于1时 , 返回ObservableRange()对象 .

@Override
protected void subscribeActual(Observer<? super Integer> o) {
RangeDisposable parent = new RangeDisposable(o, start, end);
o.onSubscribe(parent);
parent.run();
}


订阅之后调用RangeDisposable 的run()方法 .

void run() {
if (fused) {
return;
}
Observer<? super Integer> actual = this.actual;
//index为start , end是 start+count
long e = end;
for (long i = index; i != e && get() == 0; i++) {
actual.onNext((int)i);
}
if (get() == 0) {
lazySet(1);
actual.onComplete();
}
}


这里也是利用 for 循环来调用onNext()方法 .

repeat():

RxJava将这个操作符实现为repeat方法。它不是创建一个Observable,而是重复发射原始Observable的数据序列,这个序列或者是无限的,或者通过repeat(n)指定重复次数。

Observable.just("111","222" , "333").repeat(2)
.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
Log.i("TAG", "accept: " + s);
}
});


输出:

accept: 111

accept: 222

accept: 333

accept: 111

accept: 222

accept: 333

分析:

public final Observable<T> repeat(long times) {
if (times < 0) {
throw new IllegalArgumentException("times >= 0 required but it was " + times);
}
if (times == 0) {
return empty();
}
return RxJavaPlugins.onAssembly(new ObservableRepeat<T>(this, times));
}


这里要了解
new ObservableRepeat<T>(this, times)
中的 this , 实际指的是他的第一个上游 , 也就是Observable.just()返回过来的被观察者 .

public ObservableRepeat(Observable<T> source, long count) {
super(source);
this.count = count;
}


这里调用父类构造方法 , 就是把上面传过来的this 存到了ObservableRepeat对象当中 .

@Override
public void subscribeActual(Observer<? super T> s) {
SequentialDisposable sd = new SequentialDisposable();
s.onSubscribe(sd);

RepeatObserver<T> rs = new RepeatObserver<T>(s, count != Long.MAX_VALUE ? count - 1 : Long.MAX_VALUE, sd, source);
rs.subscribeNext();
}


当订阅之后调用 RepeatObserver的 subscribeNext() 方法 .

void subscribeNext() {
if (getAndIncrement() == 0) {
int missed = 1;
for (;;) {
if (sd.isDisposed()) {
return;
}
//①
source.subscribe(this);

missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
}


在①处的source就是第一个上游 , Observable.just()对象(简称just对象) , 调用了它的subscribe()方法 , 并且传过去this , 表明 该类订阅了just对象 . 当just对象 , 完成所有任务后 , 他会调用订阅者的(该类)的onComplete()方法 .

@Override
public void onComplete() {
//①
long r = remaining;
if (r != Long.MAX_VALUE) {
remaining = r - 1;
}
//②
if (r != 0L) {
subscribeNext();
} else {
actual.onComplete();
}
}


① : remaining 是 repeat()传过来的次数 . 每次执行onComplete时 , 将remaining减一 .

② : 当 r 不等于0时 接着调用subscribeNext()方法 , 等于 0 时在调用onComplete()方法 .

startWith():

Observable.just("111" , "222" , "333" , "444")
.startWith("321")
.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
Log.i("TAG", "accept: " + s);
}
});


输出:

accept: 321

accept: 111

accept: 222

accept: 333

accept: 444

分析:

public final Observable<T> startWith(T item) {
ObjectHelper.requireNonNull(item, "item is null");
return concatArray(just(item), this);
}


this 指的是ObservableFromArray对象 , 调用了concatArray() 方法 , 将 ObservableFromArray 对象和 ObservableJust 对象传过去 . 从方法名上看 , 是将两个Array结合 .

public static <T> Observable<T> concatArray(ObservableSource<? extends T>... sources) {
if (sources.length == 0) {
return empty();
} else
if (sources.length == 1) {
return wrap((ObservableSource<T>)sources[0]);
}
return RxJavaPlugins.onAssembly(new ObservableConcatMap(fromArray(sources), Functions.identity(), bufferSize(), ErrorMode.BOUNDARY));
}


返回一个ObservableConcatMap对象

public final class ObservableConcatMap<T, U> extends AbstractObservableWithUpstream<T, U> {

public ObservableConcatMap(ObservableSource<T> source, Function<? super T, ? extends ObservableSource<? extends U>> mapper,
int bufferSize, ErrorMode delayErrors) {
super(source);
this.mapper = mapper;
this.delayErrors = delayErrors;
this.bufferSize = Math.max(8, bufferSize);
}
}


执行订阅方法();

@Override
public void subscribeActual(Observer<? super U> s) {

if (ObservableScalarXMap.tryScalarXMapSubscribe(source, s, mapper)) {
return;
}

if (delayErrors == ErrorMode.IMMEDIATE) {
SerializedObserver<U> serial = new SerializedObserver<U>(s);
source.subscribe(new SourceObserver<T, U>(serial, mapper, bufferSize));
} else {
source.subscribe(new ConcatMapDelayErrorObserver<T, U>(s, mapper, bufferSize, delayErrors == ErrorMode.END));
}
}


source.subscribe(new ConcatMapDelayErrorObserver<T, U>(s, mapper, bufferSize, delayErrors == ErrorMode.END));


这里走第二个分支 , source实际为
ObservableFromArray
对象 , 首先调用它的onSubscribe()方法 .

@Override
public void onSubscribe(Disposable s) {
if (DisposableHelper.validate(this.s, s)) {
this.s = s;
//①
if (s instanceof QueueDisposable) {
@SuppressWarnings("unchecked")
QueueDisposable<T> qd = (QueueDisposable<T>) s;

//②
int m = qd.requestFusion(QueueDisposable.ANY);
if (m == QueueDisposable.SYNC) {
fusionMode = m;
queue = qd;
done = true;

actual.onSubscribe(this);
//③
drain();
return;
}

if (m == QueueDisposable.ASYNC) {
fusionMode = m;
queue = qd;

actual.onSubscribe(this);

return;
}
}

queue = new SpscLinkedArrayQueue<T>(bufferSize);

actual.onSubscribe(this);
}
}


① : s 实际为
FromArrayDisposable
继承与
QueueDisposable
.

② : 接着调用它的 requestFusion() 合成模式为ANY , 具体看下面

③ : drain() 方法为核心 , 这里遍历了所有元素

@Override
public int requestFusion(int mode) {
if ((mode & SYNC) != 0) {
fusionMode = true;
return SYNC;
}
return NONE;
}


上面是FromArrayDisposable的requestFusion()方法 , 返回SYNC . 并且在 fusionMode 置为true .

@Override
public void subscribeActual(Observer<? super T> s) {
FromArrayDisposable<T> d = new FromArrayDisposable<T>(s, array);

s.onSubscribe(d);

//①
if (d.fusionMode) {
return;
}

d.run();
}


① : 因为他是合成的 , 所以直接return , 不走run() 方法 , 所以可以知道合成之后 , 并不是在这里执行onNext()等方法 .

void drain() {
if (getAndIncrement() != 0) {
return;
}

Observer<? super R> actual = this.actual;
SimpleQueue<T> queue = this.queue;
AtomicThrowable error = this.error;

for (;;) {

if (!active) {

if (cancelled) {
queue.clear();
return;
}

if (!tillTheEnd) {
Throwable ex = error.get();
if (ex != null) {
queue.clear();

actual.onError(error.terminate());
return;
}
}

boolean d = done;

T v;

try {
//①
v = queue.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
this.d.dispose();
error.addThrowable(ex);
actual.onError(error.terminate());
return;
}

boolean empty = v == null;

//②
if (d && empty) {
Throwable ex = error.terminate();
if (ex != null) {
actual.onError(ex);
} else {
actual.onComplete();
}
return;
}

if (!empty) {

ObservableSource<? extends R> o;

try {
//③
o = ObjectHelper.requireNonNull(mapper.apply(v), "The mapper returned a null ObservableSource");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
this.d.dispose();
queue.clear();
error.addThrowable(ex);
actual.onError(error.terminate());
return;
}

if (o instanceof Callable) {
R w;

try {
w = ((Callable<R>)o).call();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
error.addThrowable(ex);
continue;
}

if (w != null && !cancelled) {
//④
actual.onNext(w);
}
continue;
} else {
active = true;
//⑤
o.subscribe(observer);
}
}
}

if (decrementAndGet() == 0) {
break;
}
}
}


① : 从队列中拉取一个元素 ,
FromArrayDisposable
调用的是这个对象的方法()

② : 判断从队列中拉取的对象是否为空 , 如果为空则执行onComplete()方法 .

③ : 转换类型 , 这个例子中返回的是当前数据

④ , ⑤ : 调用拉取元素的onNext() , 或者 订阅 () , 方法 .

这里之所以上一层是
ObservableFromArray
对象 , 是因为他操作了在创建
ObservableConcatMap
对象之前 , 调用fromArray() 方法 , 将just()对象以及()fromArray()对象融合在一起 .

变换操作符

map():

public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}


用上游的被观察者 , 以及mapper(变换规则) , 创建ObservableMap对象 .

public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}


@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}


map的订阅方法就一行 , 用MapObserver对象订阅了上游的被观察者 .

@Override
public void onNext(T t) {
if (done) {
return;
}

if (sourceMode != NONE) {
actual.onNext(null);
return;
}

U v;

try {
//①
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
actual.onNext(v);
}


这里利用mapper , 将 t 转变成 v , 并调用下游订阅者的onNext()方法 .

buffer():

Observable.just("111" , "222" , "333" , "444")
.buffer(3)
.subscribe(new Consumer<List<String>>() {
@Override
public void accept(@NonNull List<String> strings) throws Exception {
Log.i("TAG", "accept: " + strings);
}
});


输出:

accept: [111, 222, 333]

accept: [444]

分析:

public final <U extends Collection<? super T>> Observable<U> buffer(int count, int skip, Callable<U> bufferSupplier) {
ObjectHelper.verifyPositive(count, "count");
ObjectHelper.verifyPositive(skip, "skip");
ObjectHelper.requireNonNull(bufferSupplier, "bufferSupplier is null");
return RxJavaPlugins.onAssembly(new ObservableBuffer<T, U>(this, count, skip, bufferSupplier));
}


buffer() 简单的buffer , 调用这个方法 , 当调用一个参数的方法时 , skip = count . bufferSupplier 提供缓冲区功能 , 默认每次创建缓冲区都提供new ArrayList();

public ObservableBuffer(ObservableSource<T> source, int count, int skip, Callable<U> bufferSupplier) {
super(source);
this.count = count;
this.skip = skip;
this.bufferSupplier = bufferSupplier;
}


创建一个ObservableBuffer对象 . 当订阅之后 .

@Override
protected void subscribeActual(Observer<? super U> t) {

if (skip == count) {
//①
BufferExactObserver<T, U> bes = new BufferExactObserver<T, U>(t, count, bufferSupplier);
//②
if (bes.createBuffer()) {
source.subscribe(bes);
}
} else {
source.subscribe(new BufferSkipObserver<T, U>(t, count, skip, bufferSupplier));
}
}


先分析 skip 和 count相等的情况

① : 创建一个BufferExactObserver , 带有缓冲区的观察者 .

② : 利用 bufferSupplier. call() 创建一个 List 实际为 ArrayList . 给buffer赋值

boolean createBuffer() {
U b;
try {
b = ObjectHelper.requireNonNull(bufferSupplier.call(), "Empty buffer supplied");
} catch (Throwable t) {
Exceptions.throwIfFatal(t);
buffer = null;
if (s == null) {
EmptyDisposable.error(t, actual);
} else {
s.dispose();
actual.onError(t);
}
return false;
}

buffer = b;

return true;
}


创建完缓冲区之后 , 订阅了上游的被观察者 .

@Override
public void onNext(T t) {
U b = buffer;
if (b != null) {
b.add(t);

if (++size >= count) {
actual.onNext(b);

size = 0;
createBuffer();
}
}
}


每次上游的订阅者调用当前的onNext()方法 , 会将元素装进缓冲区 , 并且判断当 ++size >= count 时 , 调用下游的onNext()方法 , 将缓冲后的数据传输过去 . 并且将size置为0 , 接着创建缓冲区 , 循环直到结束 . .

当 count > skip

Observable.just("111" , "222" , "333" , "444")
.buffer(3 , 2)
.subscribe(new Consumer<List<String>>() {
@Override
public void accept(@NonNull List<String> strings) throws Exception {
Log.i("TAG", "accept: " + strings);
}
});


输出 :

accept: [111, 222, 333]

accept: [333, 444]

当 count < skip

Observable.just("111" , "222" , "333" , "444")
.buffer(2 , 3)
.subscribe(new Consumer<List<String>>() {
@Override
public void accept(@NonNull List<String> strings) throws Exception {
Log.i("TAG", "accept: " + strings);
}
});


输出 :

accept: [111, 222]

accept: [444]

@Override
protected void subscribeActual(Observer<? super U> t) {

if (skip == count) {
BufferExactObserver<T, U> bes = new BufferExactObserver<T, U>(t, count, bufferSupplier);
if (bes.createBuffer()) {
source.subscribe(bes);
}
} else { //①
source.subscribe(new BufferSkipObserver<T, U>(t, count, skip, bufferSupplier));
}
}


① : 这回看一下当 skip 和 count 不同时的情况 .

BufferSkipObserver观察者订阅了上游的被观察者 .

@Override
public void onNext(T t) {
//①
if (index++ % skip == 0) {
U b;

try {
//②
b = ObjectHelper.requireNonNull(bufferSupplier.call(), "The bufferSupplier returned a null collection. Null values are generally not allowed in 2.x operators and sources.");
} catch (Throwable e) {
buffers.clear();
s.dispose();
actual.onError(e);
return;
}
//③
buffers.offer(b);
}
//④
Iterator<U> it = buffers.iterator();
while (it.hasNext()) {
U b = it.next();
b.add(t);
//⑤
if (count <= b.size()) {
it.remove();
//⑥
actual.onNext(b);
}
}
}


① : index默认为0 , 持续++操作 . 每当过了 skip 的数量后 ,

② : 利用bufferSupplier.call()方法 , new ArrayList() .

③ : buffers是一个ArrayDequeue类型 , 把全新的List插入进buffers当中 .

④ : 遍历buffers , 给每个List添加元素 .

⑤ : 当count <= List的size时 , 将该List移除 ,

⑥ : 调用下游观察者的onNext()方法 , 传入缓存好的list

得出结论 : skip 决定 创建buffer(这里是ArrayList)的时机 , count 决定 下游观察者获取的List大小

scan():

Observable.just("111" , "222" , "333" , "444")
.scan(new BiFunction<String, String, String>() {
@Override
public String apply(@NonNull String s, @NonNull String s2) throws Exception {
return s + s2;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
Log.i("TAG", "accept: " + s);
}
});


输出:

I: accept: 111

I: accept: 111222

I: accept: 111222333

I: accept: 111222333444

其实和map() 差不多 , 不分析啦

过滤操作

debounce():

仅在过了一段指定的时间还没发射数据时才发射一个数据 .

这里会根据 rxbinding 结合解析 .

Observable.just("111" , "222" , "333")
.debounce(2000 , TimeUnit.MILLISECONDS , Schedulers.io())
.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
Log.i("TAG", "accept: " + s);
Log.i("TAG", "accept: " + Thread.currentThread().getName());
}
});


输出:

I: accept: 333

I: accept: main

这个是立即输出的 , 并且是输出在主线程 . 这和我们一般用的RxViews 不大一样 .

分析:

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.COMPUTATION)
public final Observable<T> debounce(long timeout, TimeUnit unit) {
//①
return debounce(timeout, unit, Schedulers.computation());
}


这里调用三个参数的debounce()方法 , 并且传入的是Schedulers.computation() .

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> debounce(long timeout, TimeUnit unit, Scheduler scheduler) {
ObjectHelper.requireNonNull(unit, "unit is null");
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableDebounceTimed<T>(this, timeout, unit, scheduler));
}


返回的是ObservableDebounceTimed对象

@Override
public void subscribeActual(Observer<? super T> t) {
source.subscribe(new DebounceTimedObserver<T>(
new SerializedObserver<T>(t),
timeout, unit, scheduler.createWorker()));
}


new了DebounceTimedObserver观察者订阅了上游被观察者(ObservableFromArray) . 订阅之后上游观察者调用DebounceTimedObserver的onNext() .

@Override
public void onNext(T t) {
if (done) {
return;
}

long idx = index + 1;
index = idx;
//①
Disposable d = timer.get();
if (d != null) {
//②
d.dispose();
}

//③
DebounceEmitter<T> de = new DebounceEmitter<T>(t, idx, this);
//④
if (timer.compareAndSet(d, de)) {
//⑤
d = worker.schedule(de, timeout, unit);

de.setResource(d);
}

}


首先要明确该例子上游的被观察者是
ObservableFromArray
对象 , 它会遍历调用onNext后 , 最后调用onComplete方法 .

① : timer 是 AtomicReference<Disposable>类型 .

② : 如果 d 不为空 , 调用dispose()方法

③ : DebounceEmitter实际作用可以看做是Runnable 对象 , 在该对象中继续调用下游观察者的onNext()等方法

④ : 给timer赋值 , 也就是每次onNext都会将原来的 runnable对象给dispose()掉 .

⑤ : 利用线程池执行延时操作 , 该例子是延时2秒执行
DebounceEmitter
.

在看一下
DebounceEmitter
的run()方法

@Override
public void run() {
//①
if (once.compareAndSet(false, true)) {
//②
parent.emit(idx, value, this);
}
}


① : 重点 once 是 AtomicBoolean类型的成员变量 , 有哪个数据流已经调用过run() 方法了 , 那么不会调用emit() 方法 .

② : parent 实际为
DebounceTimedObserver
对象

void emit(long idx, T t, DebounceEmitter<T> emitter) {
if (idx == index) {
//②
actual.onNext(t);
emitter.dispose();
}
}


① : 利用idx 防止多次执行 下游的onNext()方法 .

到这里可能还有疑惑 , 接着看一下onComplete()方法 .

@Override
public void onComplete() {
if (done) {
return;
}
done = true;
//①
Disposable d = timer.get();
if (d != DisposableHelper.DISPOSED) {
@SuppressWarnings("unchecked")
DebounceEmitter<T> de = (DebounceEmitter<T>)d;
if (de != null) {
//②
de.run();
}
actual.onComplete();
//③
worker.dispose();
}
}


① : 从timer 中获取最新的Disposable .

② : 这里调用了 run()方法 , 并且这是在主线程 .

③ : dispose掉线程 .

他迅速执行完onNext() , 并且执行了onComplete()方法 , 所以他并不会延迟2秒后给我发消息 , 而且是主线程 . 接下来看看rxbinding是如何实现延迟给发送消息的 .

RxView.clicks(findViewById(R.id.create)).debounce(2000 , TimeUnit.MILLISECONDS)
.subscribe(new Consumer<Object>() {
@Override
public void accept(@NonNull Object o) throws Exception {
Log.i("TAG", "accept: " + Thread.currentThread().getName());
}
});


输出 : I: accept: RxComputationThreadPool-1(两秒后)

分析 :

@Override protected void subscribeActual(Observer<? super Object> observer) {
if (!checkMainThread(observer)) {
return;
}
Listener listener = new Listener(view, observer);
observer.onSubscribe(listener);
view.setOnClickListener(listener);
}


Listener 中的onClick()方法 .

@Override public void onClick(View v) {
if (!isDisposed()) {
//①
observer.onNext(Notification.INSTANCE);
}
}


① : observer是下游的观察者 , 也就是
DebounceTimedObserver
对象 , 这里只调用了onNext() , 没有调用onComplete()方法 , 所以他并不会在主线程执行 上面分析的 run()方法 , 而是在子线程延时发送事件 .

distinct():

这个比较简单 , 内部用HashSet()去重 , 也可以根据自定义key来去重 .

skip():

public final Observable<T> skip(long count) {
if (count <= 0) {
return RxJavaPlugins.onAssembly(this);
}
return RxJavaPlugins.onAssembly(new ObservableSkip<T>(this, count));
}


返回一个
ObservableSkip
对象 ,

@Override
public void subscribeActual(Observer<? super T> s) {
source.subscribe(new SkipObserver<T>(s, n));
}


SkipObserver订阅了上游的被观察者 .

@Override
public void onNext(T t) {
//①
if (remaining != 0L) {
remaining--;
} else {
actual.onNext(t);
}
}


①remaining是我们传的skip()参数 , 他这里会调用 – , 并且来跳过元素 . 当remaining == 0 时执行下游观察者的onNext()方法 .

skipLast():

public final Observable<T> skipLast(int count) {
if (count < 0) {
throw new IndexOutOfBoundsException("count >= 0 required but it was " + count);
}
if (count == 0) {
return RxJavaPlugins.onAssembly(this);
}
return RxJavaPlugins.onAssembly(new ObservableSkipLast<T>(this, count));
}


返回
ObservableSkipLast
对象 .

@Override
public void subscribeActual(Observer<? super T> s) {
source.subscribe(new SkipLastObserver<T>(s, skip));
}


SkipLastObserver订阅了上游的被观察者 .

@Override
public void onNext(T t) {
//①
if (skip == size()) {
//②
actual.onNext(poll());
}
//③
offer(t);
}


① : SkipLastObserver继承了ArrayDeque , 是一个队列 , skip()为传过来的参数 .

② : 从队列中抽出第一个元素调用下游观察者的onNext()方法 , 每次抽出元素后size()会 -1

③ : 每次都会给队列插入元素 , 插入后size()+1 .

skip() 和 skipLast() 为相对的作用 , 但处理时skipLast()是用队列来处理 .

reduce():

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Maybe<T> reduce(BiFunction<T, T, T> reducer) {
ObjectHelper.requireNonNull(reducer, "reducer is null");
return RxJavaPlugins.onAssembly(new ObservableReduceMaybe<T>(this, reducer));
}


@Override
protected void subscribeActual(MaybeObserver<? super T> observer) {
source.subscribe(new ReduceObserver<T>(observer, reducer));
}


套路都一样..

@Override
public void onNext(T value) {
if (!done) {
T v = this.value;
//①
if (v == null) {
this.value = value;
} else {
try {
//②
this.value = ObjectHelper.requireNonNull(reducer.apply(v, value), "The reducer returned a null value");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
d.dispose();
onError(ex);
}
}
}
}


① : 判断 this.value 是否为空 , 第一次onNext()肯定是空的 , 所以第一次不走reducer .

② : 从第二次onNext()开始会调用 , 自己实现的reducer() , 并给 this.value 赋值 , 下次的reducer的参数就是这一次的 reducer返回的值 , 就一直这样直到onComplete()

@Override
public void onComplete() {
if (done) {
return;
}
done = true;
T v = value;
value = null;
//①
if (v != null) {
actual.onSuccess(v);
} else {
actual.onComplete();
}
}


① : 在onComplete() 时调用 , 咱们的观察者 , 这里onSuccess和是Rxjava2中新添的观察者
MaybeObserver
, 具体可以看下更新 .

这里scan()和reduce()比较类似 , 但是还是有不一样的地方 , 数据处理的地方 , 还有reduce成功后只掉一次观察者的方法 , scan是每次都调用 .

@Override
public void onComplete() {
if (done) {
return;
}
done = true;
T v = value;
value = null;
if (v != null) {
actual.onSuccess(v);
} else {
actual.onComplete();
}
}


结合操作

merge():

concat():

两个都是合并数据源的操作 , concat保证顺序 , merge不保证顺序 .

zip():

combineLatest():

两个都是将被观察者合并 , zip是逐个合并 , combineLatest()是最后一个元素依次合并(不知道这个说法对不对 , 大家可以去多尝试)

先码到这里 , 待续…
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: