您的位置:首页 > 编程语言 > Java开发

RxJava的Observable创建和发布流程原理简单分析

2020-04-23 09:15 1361 查看

Rx框架从发布以来已经被越来越多的人所认可,尤其是其类型转换和线程自由切换可以大大减少代码的回调层数,使代码可读性大大提高

我们先看一个例子

[code]  Observable.just("1")
.map(new Func1<String, Integer>() {
@Override
public Integer call(String s) {
System.out.println("first ->>> " + Thread.currentThread().getId());
return new Integer(s);
}
})
.flatMap(new Func1<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> call(Integer integer) {
System.out.println("second ->>> " + Thread.currentThread().getId());
return Observable.just(integer.intValue() * 2);
}
})
.observeOn(Schedulers.newThread())
.map(new Func1<Integer, String>() {
@Override
public String call(Integer integer) {
System.out.println("third ->>> " + Thread.currentThread().getId());
return "No." + integer;
}
})
.observeOn(Schedulers.newThread())
.subscribe(new Action1<String>() {

@Override
public void call(String s) {
System.out.println("forth ->>> " + Thread.currentThread().getId());

System.out.println("result ->>> " + s);
}
});

我们把字符串转int后乘以2再加一个前缀返回,同时打印了执行操作的相应线程id,输出结果是

[code]first ->>> 1
second ->>> 1
third ->>> 15
forth ->>> 14
result ->>> No.2

可以发现前两个是同一个发布线程,而后面两个我都设置了新的回调处理线程,所以后面两个分别在不同的新线程,是不是很方便,具体线程是怎么切换的我会在下一篇里分析

我们先看一下Observable的创建方法

[code]public static <T> Observable<T> create(OnSubscribe<T> f) {
return new Observable<T>(hook.onCreate(f));
}

Observable会创建一个OnSubscribe实例,而这个实例会被存放在当前Observable的成员变量中,注意存放的这个变量是final类型;一般我们并不会去自定义一个这个,用Rx提供的api就足够了

接下来我们用常用方法分析一下

[code] Observable.just(1,2,3).subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
}

@Override
public void onError(Throwable e) {
}

@Override
public void onNext(Integer integer) {
System.out.println("->>>> " + integer);
}
});

 

 这就是一个简单的案列,会依次发布1-3的数据,我们看一下just都做了啥

[code] public static <T> Observable<T> just(T t1, T t2, T t3) {
return from((T[])new Object[] { t1, t2, t3 });
}

public static <T> Observable<T> from(T[] array) {
int n = array.length;
if (n == 0) {
return empty();
} else
if (n == 1) {
return just(array[0]);
}
return create(new OnSubscribeFromArray<T>(array));
}

我们可以看出,just会把数据包装成一个数组再调用from方法,这两个常用方法其实可以看成一个

其中OnSubscribeFromArray实现了OnSubscribe接口

调用just会把要发布的数据组装成功一个数据,然后把这个数据交付给OnSubscribeFromArray,然后把这个实例化的对象作为OnSubscribe从而创建一个新的Observable对象,所以这里只会返回一个Observable对象,那什么时候才会发布数据呢?

然后是subscribe方法的调用

[code] public final Subscription subscribe(Subscriber<? super T> subscriber) {
return Observable.subscribe(subscriber, this);
}

private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
//忽略部分代码,正常流程的大致意思
subscriber.onStart();

if (!(subscriber instanceof SafeSubscriber)) {
// assign to `observer` so we return the protected version
subscriber = new SafeSubscriber<T>(subscriber);
}

observable.onSubscribe.call(subscriber);
return subscriber;
}

这里会先执行subscriber的onStart方法,然后调用上面保存的OnSubscribe对象的call方法,把subscriber作为参数传进去,最后把subscribe作为Subscription返回,其中subscribe本身实现了Subscription接口

这里有个一SafeSubscriber,这个是SubScriber的子类,非这个类的都会被强制包装成这个类型,至于为什么

[code]private final SubscriptionList subscriptions;
private final Subscriber<?> subscriber;

protected Subscriber() {
this(null, false);
}

protected Subscriber(Subscriber<?> subscriber, boolean shareSubscriptions) {
this.subscriber = subscriber;
this.subscriptions = shareSubscriptions && subscriber != null ? subscriber.subscriptions : new SubscriptionList();
}

public class SafeSubscriber<T> extends Subscriber<T> {
private final Subscriber<? super T> actual;

public SafeSubscriber(Subscriber<? super T> actual) {
super(actual);
this.actual = actual;
}
}

Subscribe中有两个成员终型成员变量,我们默认传空参数是不会给这两个赋值的,而这两个变量对后续发布会有作用

Subscription只提供两个方法,主要是解除注册和相应的判断,后续发布判断都会判断这个依据,如果被解除注册则后续的onNext, onError和onComplete都不会再通知

[code]public interface Subscription {
void unsubscribe();
boolean isUnsubscribed();
}

那重点就回到OnSubscribe的call方法上

[code]@Override
public void call(Subscriber<? super T> child) {
child.setProducer(new FromArrayProducer<T>(child, array));
}

public void setProducer(Producer p) {
long toRequest;
boolean passToSubscriber = false;
synchronized (this) {
toRequest = requested;
producer = p;
if (subscriber != null) {
// middle operator ... we pass thru unless a request has been made
if (toRequest == NOT_SET) {
// we pass-thru to the next producer as nothing has been requested
passToSubscriber = true;
}
}
}
// do after releasing lock
if (passToSubscriber) {
subscriber.setProducer(producer);
} else {
// we execute the request with whatever has been requested (or Long.MAX_VALUE)
if (toRequest == NOT_SET) {
producer.request(Long.MAX_VALUE);
} else {
producer.request(toRequest);
}
}
}

call会调用上面创建的subscribe的setProducer方法,这个subscribe就是上面SafeSubscriber包装的内部对象

而SafeSubscriber这个subscribe不为空,其父类Subscriber的subscribe是空 的,requested则是默认初始值NOT_SET,所以最后这个会连调两次最终到父类的方法,走到producer.request(Long.MAX_VALUE)

就是上面的new FromArrayProducer(...).request(Long.MAX_VALUE)

再看下这个类

[code] static final class FromArrayProducer<T> extends AtomicLong implements Producer {
final Subscriber<? super T> child;
final T[] array;
public FromArrayProducer(Subscriber<? super T> child, T[] array) {
this.child = child;
this.array = array;
}
...
}

参数一个是当前的subscribe,一个是初始化传入的泛型数组

[code]@Override
public void request(long n) {
if (n < 0) {
throw new IllegalArgumentException("n >= 0 required but it was " + n);
}
if (n == Long.MAX_VALUE) {
if (BackpressureUtils.getAndAddRequest(this, n) == 0) {
fastPath();
}
} else if (n != 0) {
if (BackpressureUtils.getAndAddRequest(this, n) == 0) {
slowPath(n);
}
}
}

最终会执行到这里的fastPath

[code]void fastPath() {
final Subscriber<? super T> child = this.child;

for (T t : array) {
if (child.isUnsubscribed()) {
return;
}
child.onNext(t);
}

if (child.isUnsubscribed()) {
return;
}
child.onCompleted();
}

这里会依次调用child也就是我们注册的Subscriber的onNext(),数组值取完后会调用onCompleted();而且onNext和onComplete前都会检测Subscriber的注册状态,如果已经被解除则后续的都不会给回调回来

注:这里的child是被封装后的SafeSubscriber了,具体方法是

[code]@Override
public void onCompleted() {
if (!done) {
done = true;
try {
actual.onCompleted();
} catch (Throwable e) {
...
}
}
}

@Override
public void onNext(T args) {
try {
if (!done) {
actual.onNext(args);
}
} catch (Throwable e) {
...
}
}

也就是最终会回到我们最初注册的那个subscriber内

总结:

1. Observable初始化添加数组会创建一个OnSubscriber实例存储数组,不做具体操作

2. subscribe方法需要传入我们自定义的Subscriber实现类我们叫做mySubscriber,就是接收到数据的处理方法;调用后会马上调用mySubscriber的onStart()方法,然后把这个用SafeSubscriber包装后,调用最开始创建的OnSubscriber的call方法,传入这个包后的对象,并把这个对象当做Subscription操作类返回,这个操作类可以控制事件的解注册判断

3.OnSubscriber对象的call方法最终会遍历数组内的泛型数据,并通过把这些数据原封不动的依次分发到我们注册的方法内

  • 点赞
  • 收藏
  • 分享
  • 文章举报
迷途の知归 发布了20 篇原创文章 · 获赞 1 · 访问量 3011 私信 关注
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: