
RxJava (Combining Operators) - Merge

2016-11-01 12:41
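Merge combines the emissions of multiple Observables into one. Merge may interleave the items emitted by the merged Observables. A similar operator, Concat, does not interleave them: it emits the items of each Observable in order, one Observable after another.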
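To make the difference concrete, here is a minimal sketch that models merge and concat on a virtual timeline using only the Java standard library. It is not RxJava code: the class `MergeVsConcatSketch`, the `Emission` type, and the 800 ms / 600 ms periods are assumptions chosen for illustration.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Toy timeline model of merge vs. concat; not RxJava code. */
public class MergeVsConcatSketch {

    /** One emission: the value and the virtual time (ms) at which its source produces it. */
    static final class Emission {
        final long timeMs;
        final int value;

        Emission(long timeMs, int value) {
            this.timeMs = timeMs;
            this.value = value;
        }
    }

    /** Builds a source that counts down from 'start', emitting 'count' items, one every 'periodMs'. */
    static List<Emission> countdown(int start, int count, long periodMs) {
        List<Emission> out = new ArrayList<>();
        for (int k = 0; k < count; k++) {
            out.add(new Emission(k * periodMs, start - k));
        }
        return out;
    }

    /** merge: all sources run at once, so items come out ordered by emission time. */
    static List<Integer> merge(List<Emission> a, List<Emission> b) {
        List<Emission> all = new ArrayList<>(a);
        all.addAll(b);
        // The sort is stable, so ties keep 'a' before 'b'; a real concurrent run may break ties either way.
        all.sort(Comparator.comparingLong(e -> e.timeMs));
        return values(all);
    }

    /** concat: the second source only starts after the first has finished. */
    static List<Integer> concat(List<Emission> a, List<Emission> b) {
        List<Emission> all = new ArrayList<>(a);
        all.addAll(b);
        return values(all);
    }

    private static List<Integer> values(List<Emission> emissions) {
        List<Integer> out = new ArrayList<>();
        for (Emission e : emissions) {
            out.add(e.value);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Emission> first = countdown(5, 5, 800);
        List<Emission> second = countdown(95, 5, 600);
        System.out.println("merge:  " + merge(first, second));
        System.out.println("concat: " + concat(first, second));
    }
}
```

Running `main` prints `merge:  [5, 95, 94, 4, 93, 3, 92, 2, 91, 1]` and `concat: [5, 4, 3, 2, 1, 95, 94, 93, 92, 91]`. With real threads, items that fall on the same instant can arrive in either order.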



Let's look at two demos.

// RxJava 1.x: rx.Observable, rx.Subscriber, rx.functions.Action1, rx.schedulers.Schedulers.
// JobExecutor is not shown in this post; Schedulers.from requires a java.util.concurrent.Executor.
Observable.merge(
        Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                try {
                    System.out.println("start 1");
                    int i = 5;
                    // emit 5..1, pausing 800 ms after each item
                    while (i > 0) {
                        subscriber.onNext(i);
                        Thread.sleep(800);
                        i--;
                    }
                    System.out.println("onCompleted");
                    subscriber.onCompleted();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).observeOn(Schedulers.from(JobExecutor.getInstance())),
        Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                try {
                    int i = 95;
                    System.out.println("start 2");
                    // emit 95..91, pausing 600 ms after each item
                    while (i > 90) {
                        subscriber.onNext(i);
                        Thread.sleep(600);
                        i--;
                    }
                    System.out.println("onCompleted2");
                    subscriber.onCompleted();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).observeOn(Schedulers.from(JobExecutor.getInstance())))
        .subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                System.out.println(integer);
            }
        });
Output:

start 1
5
4
3
2
1
onCompleted
start 2
95
94
93
92
91
onCompleted2

Demo 2
Now we replace observeOn with subscribeOn:

// Same as demo 1, except that each source uses subscribeOn instead of observeOn.
Observable.merge(
        Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                try {
                    System.out.println("start 1");
                    int i = 5;
                    // emit 5..1, pausing 800 ms after each item
                    while (i > 0) {
                        subscriber.onNext(i);
                        Thread.sleep(800);
                        i--;
                    }
                    System.out.println("onCompleted");
                    subscriber.onCompleted();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).subscribeOn(Schedulers.from(JobExecutor.getInstance())),
        Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                try {
                    int i = 95;
                    System.out.println("start 2");
                    // emit 95..91, pausing 600 ms after each item
                    while (i > 90) {
                        subscriber.onNext(i);
                        Thread.sleep(600);
                        i--;
                    }
                    System.out.println("onCompleted2");
                    subscriber.onCompleted();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).subscribeOn(Schedulers.from(JobExecutor.getInstance())))
        .subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                System.out.println(integer);
            }
        });
Output:
start 2
start 1
95
5
94
4
93
3
92
91
2
onCompleted2
1
onCompleted

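As we can see, when the two merged Observables run one after another on the same thread, the output is in order as well: observeOn only moves delivery downstream, so each OnSubscribe still runs on the subscribing thread and the second source cannot start until the first has completed. When the two Observables can run in parallel, as with subscribeOn, items are printed in the order they are emitted. We could of course also use timer to drive the emissions.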
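The same effect can be reproduced without RxJava. The following is a rough sketch using only the Java standard library: a "source" is modeled as a blocking function that pushes values into a callback. The class `SubscribeThreadSketch` and its method names are hypothetical, and the sleep times are shortened for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Toy model of where a blocking source runs; not RxJava code. */
public class SubscribeThreadSketch {

    /** A blocking source: pushes 'count' values counting down from 'start', sleeping after each one. */
    static Consumer<Consumer<Integer>> countdown(int start, int count, long sleepMs) {
        return sink -> {
            for (int k = 0; k < count; k++) {
                sink.accept(start - k);
                try {
                    Thread.sleep(sleepMs);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        };
    }

    /** Like demo 1: every source is started on the caller's thread, so the first blocks the second. */
    static List<Integer> runOnCallerThread(List<Consumer<Consumer<Integer>>> sources) {
        List<Integer> seen = new ArrayList<>();
        for (Consumer<Consumer<Integer>> source : sources) {
            source.accept(seen::add);
        }
        return seen;
    }

    /** Like demo 2: every source is started on its own pool thread, so emissions can interleave. */
    static List<Integer> runOnPool(List<Consumer<Consumer<Integer>>> sources) {
        List<Integer> seen = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pool = Executors.newFixedThreadPool(sources.size());
        for (Consumer<Consumer<Integer>> source : sources) {
            pool.execute(() -> source.accept(seen::add));
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(seen);
    }

    public static void main(String[] args) {
        List<Consumer<Consumer<Integer>>> sources = new ArrayList<>();
        sources.add(countdown(5, 5, 80));
        sources.add(countdown(95, 5, 60));
        System.out.println("caller thread: " + runOnCallerThread(sources));
        System.out.println("pool threads:  " + runOnPool(sources));
    }
}
```

On the caller thread the result is always `[5, 4, 3, 2, 1, 95, 94, 93, 92, 91]`. On the pool the two countdowns interleave, and the exact order depends on thread timing.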
Let's look at how merge is implemented:

public static <T> Observable<T> merge(Observable<? extends T> t1, Observable<? extends T> t2) {
    return merge(new Observable[] { t1, t2 });
}

public static <T> Observable<T> merge(Observable<? extends T>[] sequences) {
    return merge(from(sequences));
}

First, look at from:
public static <T> Observable<T> from(T[] array) {
    int n = array.length;
    if (n == 0) {
        return empty();
    } else
    if (n == 1) {
        return just(array[0]);
    }
    return create(new OnSubscribeFromArray<T>(array));
}

With our two Observables in the array, this creates an OnSubscribeFromArray. Continuing with merge:

public static <T> Observable<T> merge(Observable<? extends Observable<? extends T>> source) {
    if (source.getClass() == ScalarSynchronousObservable.class) {
        return ((ScalarSynchronousObservable<T>)source).scalarFlatMap((Func1)UtilityFunctions.identity());
    }
    return source.lift(OperatorMerge.<T>instance(false));
}

This calls lift with OperatorMerge as the operator; source is the Observable created above (the one backed by OnSubscribeFromArray).
When we subscribe, OnSubscribeLift's call is invoked:

public void call(Subscriber<? super R> o) {
    try {
        Subscriber<? super T> st = RxJavaHooks.onObservableLift(operator).call(o);
        try {
            // new Subscriber created and being subscribed with so 'onStart' it
            st.onStart();
            parent.call(st);
        } catch (Throwable e) {
            // localized capture of errors rather than it skipping all operators
            // and ending up in the try/catch of the subscribe method which then
            // prevents onErrorResumeNext and other similar approaches to error handling
            Exceptions.throwIfFatal(e);
            st.onError(e);
        }
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        // if the lift function failed all we can do is pass the error to the final Subscriber
        // as we don't have the operator available to us
        o.onError(e);
    }
}
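The core of this method is two steps: the operator turns the downstream subscriber into an upstream one, and the parent source is then called with that upstream subscriber. A minimal standard-library sketch of that shape follows. `Source`, `Operator`, and `times` are simplified stand-ins invented for illustration, and a subscriber is reduced to a bare onNext callback.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

/** Toy model of lift; the types here are simplified stand-ins, not RxJava's. */
public class LiftSketch {

    /** Stand-in for OnSubscribe: given a subscriber (here only an onNext callback), start emitting. */
    interface Source<T> {
        void call(Consumer<T> subscriber);
    }

    /** Stand-in for Operator: takes the downstream subscriber and returns the upstream one. */
    interface Operator<R, T> extends Function<Consumer<R>, Consumer<T>> { }

    /** Same two steps as the call method above: st = operator(o), then parent.call(st). */
    static <T, R> Source<R> lift(Source<T> parent, Operator<R, T> operator) {
        return downstream -> {
            Consumer<T> upstream = operator.apply(downstream);
            parent.call(upstream);
        };
    }

    /** A tiny operator that multiplies every item by 'factor' before passing it on. */
    static Operator<Integer, Integer> times(int factor) {
        return downstream -> item -> downstream.accept(item * factor);
    }

    /** Lifts a three-item source through times(10) and collects what the final subscriber sees. */
    static List<Integer> demo() {
        Source<Integer> parent = subscriber -> {
            subscriber.accept(1);
            subscriber.accept(2);
            subscriber.accept(3);
        };
        List<Integer> seen = new ArrayList<>();
        lift(parent, times(10)).call(seen::add);
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [10, 20, 30]
    }
}
```

The sketch leaves out onStart, error handling, and the hooks that the real method has. It only shows why the parent ends up talking to the Subscriber returned by the operator rather than to the final one.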

The operator here is OperatorMerge:

public Subscriber<Observable<? extends T>> call(final Subscriber<? super T> child) {
    MergeSubscriber<T> subscriber = new MergeSubscriber<T>(child, delayErrors, maxConcurrent);
    MergeProducer<T> producer = new MergeProducer<T>(subscriber);
    subscriber.producer = producer;

    child.add(subscriber);
    child.setProducer(producer);

    return subscriber;
}

It returns a MergeSubscriber and also creates a MergeProducer.
Back in the earlier call, parent is the OnSubscribeFromArray:

public void call(Subscriber<? super T> child) {
    child.setProducer(new FromArrayProducer<T>(child, array));
}

This creates a FromArrayProducer; setProducer then calls its request method:
@Override
public void request(long n) {
    if (n < 0) {
        throw new IllegalArgumentException("n >= 0 required but it was " + n);
    }
    if (n == Long.MAX_VALUE) {
        if (BackpressureUtils.getAndAddRequest(this, n) == 0) {
            fastPath();
        }
    } else
    if (n != 0) {
        if (BackpressureUtils.getAndAddRequest(this, n) == 0) {
            slowPath(n);
        }
    }
}

This calls fastPath:
void fastPath() {
    final Subscriber<? super T> child = this.child;

    for (T t : array) {
        if (child.isUnsubscribed()) {
            return;
        }

        child.onNext(t);
    }

    if (child.isUnsubscribed()) {
        return;
    }
    child.onCompleted();
}

The array holds the Observables we passed to merge.
child here is the MergeSubscriber created earlier.
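The request method relies on one idea: demand is accumulated in a counter, and only the caller that moves the counter away from zero starts the emission loop. The following is a simplified standard-library sketch of that accounting. `ArrayProducerSketch` is a hypothetical class, not RxJava's FromArrayProducer. It folds the fast and slow paths into a single drain loop and omits completion and unsubscription.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

/** Simplified model of an array producer with request accounting; not RxJava's actual class. */
public class ArrayProducerSketch<T> {
    private final T[] array;
    private final Consumer<T> child;
    private final AtomicLong requested = new AtomicLong();
    private int index;

    public ArrayProducerSketch(T[] array, Consumer<T> child) {
        this.array = array;
        this.child = child;
    }

    /** Adds n to the outstanding demand; only the caller that finds it at 0 starts draining. */
    public void request(long n) {
        if (n < 0) {
            throw new IllegalArgumentException("n >= 0 required but it was " + n);
        }
        if (n != 0 && getAndAddCapped(n) == 0) {
            drain();
        }
    }

    /** Adds n to 'requested', capping at Long.MAX_VALUE, and returns the previous value. */
    private long getAndAddCapped(long n) {
        for (;;) {
            long current = requested.get();
            long next = current + n;
            if (next < 0) {
                next = Long.MAX_VALUE; // overflow: treat as unbounded demand
            }
            if (requested.compareAndSet(current, next)) {
                return current;
            }
        }
    }

    /** Emits while there is both demand and data; demand added during the loop is picked up too. */
    private void drain() {
        long emitted = 0;
        for (;;) {
            long demand = requested.get();
            while (emitted != demand && index < array.length) {
                child.accept(array[index++]);
                emitted++;
            }
            if (index == array.length) {
                return; // nothing left to emit
            }
            // give back what was consumed; stop if no demand remains
            if (requested.addAndGet(-emitted) == 0) {
                return;
            }
            emitted = 0;
        }
    }

    public static void main(String[] args) {
        List<Integer> seen = new ArrayList<>();
        ArrayProducerSketch<Integer> producer =
                new ArrayProducerSketch<Integer>(new Integer[] {1, 2, 3, 4}, seen::add);
        producer.request(2);
        System.out.println(seen); // prints [1, 2]
        producer.request(Long.MAX_VALUE);
        System.out.println(seen); // prints [1, 2, 3, 4]
    }
}
```

A request of Long.MAX_VALUE means unbounded demand, so the sketch simply emits everything that is left.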

MergeSubscriber's onNext is called for each of them:

public void onNext(Observable<? extends T> t) {
    if (t == null) {
        return;
    }
    if (t == Observable.empty()) {
        emitEmpty();
    } else
    if (t instanceof ScalarSynchronousObservable) {
        tryEmit(((ScalarSynchronousObservable<? extends T>)t).get());
    } else {
        InnerSubscriber<T> inner = new InnerSubscriber<T>(this, uniqueId++);
        addInner(inner);
        t.unsafeSubscribe(inner);
        emit();
    }
}

In our case this creates an InnerSubscriber.
unsafeSubscribe is then called, which ends up in the call method of t's OnSubscribe (OperatorSubscribeOn):

public void call(final Subscriber<? super T> subscriber) {
    final Worker inner = scheduler.createWorker();
    subscriber.add(inner);

    inner.schedule(new Action0() {
        @Override
        public void call() {
            final Thread t = Thread.currentThread();

            Subscriber<T> s = new Subscriber<T>(subscriber) {
                @Override
                public void onNext(T t) {
                    subscriber.onNext(t);
                }

                @Override
                public void onError(Throwable e) {
                    try {
                        subscriber.onError(e);
                    } finally {
                        inner.unsubscribe();
                    }
                }

                @Override
                public void onCompleted() {
                    try {
                        subscriber.onCompleted();
                    } finally {
                        inner.unsubscribe();
                    }
                }

                @Override
                public void setProducer(final Producer p) {
                    subscriber.setProducer(new Producer() {
                        @Override
                        public void request(final long n) {
                            if (t == Thread.currentThread()) {
                                p.request(n);
                            } else {
                                inner.schedule(new Action0() {
                                    @Override
                                    public void call() {
                                        p.request(n);
                                    }
                                });
                            }
                        }
                    });
                }
            };

            source.unsafeSubscribe(s);
        }
    });
}

schedule is called here, which eventually runs the Action0's call on a scheduler thread. Meanwhile control returns to fastPath, which moves on to the next Observable passed to merge.

Once the scheduled thread is running, the Action0's call executes.

It defines another Subscriber and subscribes it to source. Here source is the Observable we passed to merge as it was before subscribeOn was applied, so this ends up in the OnSubscribe from our demo:

public void call(Subscriber<? super Integer> subscriber) {
    try {
        System.out.println("thread ID:" + Thread.currentThread());
        int i = 5;
        while (i > 0) {
            subscriber.onNext(i);
            Thread.sleep(8000);
            i--;
        }
        System.out.println("onCompleted");

        subscriber.onCompleted();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

The subscriber passed in is s, the Subscriber just created inside the scheduled action. Its onNext is called:

public void onNext(T t) {
    subscriber.onNext(t);
}

The subscriber here is the InnerSubscriber; its onNext is called:
@Override
public void onNext(T t) {
    parent.tryEmit(this, t);
}

parent here is the MergeSubscriber; its tryEmit is called:
void tryEmit(InnerSubscriber<T> subscriber, T value) {
    boolean success = false;
    long r = producer.get();
    if (r != 0L) {
        synchronized (this) {
            // if nobody is emitting and child has available requests
            r = producer.get();
            if (!emitting && r != 0L) {
                emitting = true;
                success = true;
            }
        }
    }
    if (success) {
        RxRingBuffer subscriberQueue = subscriber.queue;
        if (subscriberQueue == null || subscriberQueue.isEmpty()) {
            emitScalar(subscriber, value, r);
        } else {
            queueScalar(subscriber, value);
            emitLoop();
        }
    } else {
        queueScalar(subscriber, value);
        emit();
    }
}

This calls emitScalar:
protected void emitScalar(InnerSubscriber<T> subscriber, T value, long r) {
    boolean skipFinal = false;
    try {
        try {
            child.onNext(value);
        } catch (Throwable t) {
            if (!delayErrors) {
                Exceptions.throwIfFatal(t);
                skipFinal = true;
                subscriber.unsubscribe();
                subscriber.onError(t);
                return;
            }
            getOrCreateErrorQueue().offer(t);
        }
        if (r != Long.MAX_VALUE) {
            producer.produced(1);
        }
        subscriber.requestMore(1);
        // check if some state changed while emitting
        synchronized (this) {
            skipFinal = true;
            if (!missed) {
                emitting = false;
                return;
            }
            missed = false;
        }
    } finally {
        if (!skipFinal) {
            synchronized (this) {
                emitting = false;
            }
        }
    }
    /*
     * In the synchronized block below request(1) we check
     * if there was a concurrent emission attempt and if there was,
     * we stay in emission mode and enter the emission loop
     * which will take care all the queued up state and
     * emission possibilities.
     */
    emitLoop();
}

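child.onNext(value); eventually reaches our own subscriber.
Likewise, when the other thread starts, its call is invoked too and ends up in the same callback we passed to subscribe. That is why the two Observables given to merge may emit interleaved.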
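Because both threads end up in the same downstream callback, tryEmit has to make sure only one of them is emitting at a time. The sketch below models that idea with the standard library only: whoever finds the emitter free delivers directly and then drains what others queued in the meantime. `SerializedEmitterSketch` is a hypothetical class. It assumes non-null values and leaves out backpressure, the per-inner queues, the missed flag, and error handling.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

/** Simplified model of merge's "emitting" serialization; not RxJava's actual class. */
public class SerializedEmitterSketch<T> {
    private final Consumer<T> child;
    private final Queue<T> queue = new ArrayDeque<>(); // guarded by 'this'
    private boolean emitting;                          // guarded by 'this'

    public SerializedEmitterSketch(Consumer<T> child) {
        this.child = child;
    }

    /** May be called from any thread; child.accept is never invoked concurrently. */
    public void tryEmit(T value) {
        synchronized (this) {
            if (emitting) {
                // another thread is emitting: hand the value over and leave
                queue.offer(value);
                return;
            }
            emitting = true;
        }
        // fast path: this thread owns the emitter and the queue is empty, so deliver directly
        child.accept(value);
        // then drain whatever other threads queued in the meantime
        for (;;) {
            T next;
            synchronized (this) {
                next = queue.poll();
                if (next == null) {
                    emitting = false;
                    return;
                }
            }
            child.accept(next);
        }
    }

    /** Emits from several threads into a plain ArrayList and returns how many items arrived. */
    static int demo(int threads, int perThread) {
        List<Integer> seen = new ArrayList<>(); // deliberately not thread-safe
        SerializedEmitterSketch<Integer> emitter = new SerializedEmitterSketch<Integer>(seen::add);
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            final int base = t * perThread;
            workers[t] = new Thread(() -> {
                for (int k = 0; k < perThread; k++) {
                    emitter.tryEmit(base + k);
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return seen.size();
    }

    public static void main(String[] args) {
        System.out.println(demo(4, 1000)); // prints 4000
    }
}
```

Values from different threads can arrive in any relative order, but each one is delivered exactly once and the downstream list is never touched by two threads at the same time.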