您的位置:首页 > 编程语言 > Java开发

RxJava里doOnNext的使用和线程处理

2016-03-01 15:08 477 查看

doOnNext的使用

我对doOnNext的使用是存在疑惑的,按照官方文档

The doOnNext operator is much like doOnEach(Action1) except that the Action that you pass it as a parameter does not accept a Notification but instead simply accepts the emitted item.

更多的文档里说明do系列的作用是side effect,当onNext发生时,它被调用,不改变数据流。我做的测试代码:

2017.9.28增加修正:

首先要理解什么是副作用:方法执行的时候,产生了外部可以观察到的变化就是产生了副作用。

那些文档的意思是说,不要在doOnNext做能发生副作用的方法,也就是不要去改变数据流

Observable.create(new Observable.OnSubscribe<Person>() {
@Override
public void call(Subscriber<? super Person> subscriber) {
Person person = new Person(201);
subscriber.onNext(person);
}
}).doOnNext(new Action1<Person>() {
@Override
public void call(Person person) {
person.age = 301;
}
}).subscribe(new Action1<Person>() {
@Override
public void call(Person person) {
Log.d(TAG, "call: " + person.age);//输出301
}
});


可见,doOnNext是改变了流里的数据的,所以并不明白不改变数据流是什么意思。

从github上很多项目这篇文章来看,do系列的作用

使用doOnNext()来调试

在flatMap()里使用doOnError()作为错误处理。

使用doOnNext()去保存/缓存网络结果

按照我的测试

final SimpleDateFormat sDateFormat    =   new   SimpleDateFormat("yyyy-MM-dd    hh:mm:ss");
Observable.create(new Observable.OnSubscribe<Person>() {
@Override
public void call(Subscriber<? super Person> subscriber) {
String    date    =    sDateFormat.format(new    Date());
System.out.println(date + " call " + Thread.currentThread().getName());
Person person = new Person(201);
subscriber.onNext(person);
}
}).subscribeOn(Schedulers.io()) //指定耗时进程
.observeOn(Schedulers.newThread()) //指定doOnNext执行线程是新线程
.doOnNext(new Action1<Person>() {
@Override
public void call(Person person) {
String    date    =    sDateFormat.format(new    Date());
System.out.println(date + " call " + Thread.currentThread().getName());
person.age = 301;
try {
Thread.sleep(2000);
} catch (Exception e) {
e.printStackTrace();
}
}
}).observeOn(AndroidSchedulers.mainThread())//指定最后观察者在主线程
.subscribe(new Action1<Person>() {
@Override
public void call(Person person) {
String    date    =    sDateFormat.format(new    Date());
System.out.println(date + " call " + Thread.currentThread().getName());
Log.d(TAG, "call: " + person.age);
}
});


执行结果

03-01 14:49:29.897 23442-24145/com.example.myrxlearn I/System.out: 2016-03-01    02:49:29 call RxCachedThreadScheduler-2
03-01 14:49:29.907 23442-24144/com.example.myrxlearn I/System.out: 2016-03-01    02:49:29 call RxNewThreadScheduler-2
03-01 14:49:31.907 23442-23442/com.example.myrxlearn I/System.out: 2016-03-01    02:49:31 call main


也就是说直到doOnNext里的方法在新线程执行完毕,subscribe里的call才有机会在主线程执行。

一直没看到有合适的方法解决这个问题,因为缓存的时间不应该去阻碍主线程里数据的显示。

今天做回顾时看到了这篇文章

非阻塞I/O操作

现在我们可以使用Schedulers.io()创建非阻塞的版本:

public static void storeBitmap(Context context, Bitmap bitmap,

String filename) {

Schedulers.io().createWorker().schedule(() -> {

blockingStoreBitmap(context, bitmap, filename);

});  }


然后想起来一直存在的这个问题。只需要把上面的代码改成

.doOnNext(new Action1<Person>() {
@Override
public void call(Person person) {
String    date    =    sDateFormat.format(new    Date());
System.out.println(date + " call " + Thread.currentThread().getName());
person.age = 301;
Schedulers.io().createWorker().schedule(new Action0() {
@Override
public void call() {
try {
Thread.sleep(2000);
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
})


不需要在用observeOn指定在新线程就可以实现

03-01 14:55:02.307 30368-30406/com.example.myrxlearn I/System.out: 2016-03-01    02:55:02 call RxCachedThreadScheduler-1
03-01 14:55:02.307 30368-30406/com.example.myrxlearn I/System.out: 2016-03-01    02:55:02 call RxCachedThreadScheduler-1
03-01 14:55:02.347 30368-30368/com.example.myrxlearn I/System.out: 2016-03-01    02:55:02 call main


这样doOnNext可以方便的用来调试,用来缓存。

奈何,我还是没能明白副作用,期待指点。

doOnNext is for side-effects: you want to react (eg. log) to item

emissions in an intermediate step of your stream, for example before

the stream is filtered, for transverse behavior like logging, but you

still want the value to propagate down the stream.

onNext is more final, it consumes the value.
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: