您的位置:首页 > 编程语言 > Java开发

RxJava实例(三)

2016-04-02 17:33 459 查看
问题:

如何在http请求出错的时候重试?进一步,如何动态添加重试次数和间隔时间?

如何实现定时轮询?

方式有很多,用RxJava如何优雅地实现?不知道,请看这篇文章。

(From:http://www.jianshu.com/p/023a5f60e6d0)

retry/retryWhen/repeat/repeatWhen

当.repeat()接收到.onCompleted()事件后触发重订阅。

当.retry()接收到.onError()事件后触发重订阅

retryWhen,字面理解就是出现错误重试,出错的时候把错误发给你,由你决定是否重试

repeatWhen,完成的时候触发是否重试

区别:

.repeatWhen()与.retryWhen()非常相似,只不过不再响应onError作为重试条件,而是onCompleted。因为onCompleted没有类型,所有输入变为Observable。

OK,看例子。

例子1:

执行请求的时候出错,如果是网络错误则重试,否则结束

source.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
@Override public Observable<?> call(Observable<? extends Throwable> errors) {

return errors.flatMap(new Func1<Throwable, Observable<?>>() {
@Override public Observable<?> call(Throwable error) {

// For IOExceptions, we  retry
if (error instanceof IOException) {
return Observable.just(null);
}

// For anything else, don't retry
return Observable.error(error);
}
});
}
})


解释:

只有在Throwable是IOException的情况下请求重试,否则结束。

source每次一调用onError(Throwable),Observable都会被作为输入传入到Func1中的call方法中。Func1的签名:

retryWhen(Func1<? super Observable<? extends java.lang.Throwable>,? extends Observable<?>> notificationHandler)


Func1像个工厂类,用来实现你自己的重试逻辑。输入的是一个Observable。输出的是一个Observable

例子2:

使用.repeatWhen() + .delay()定期轮询数据:

source.repeatWhen(new Func1<Observable<? extends Void>, Observable<?>>() {
@Override public Observable<?> call(Observable<? extends Void> completed) {

return completed.delay(5, TimeUnit.SECONDS);
}
})、


解释:

直到notificationHandler(Func1)发送onNext()才会重订阅到source,这里在发送onNext()之前delay了5秒,

使用.flatMap() + .timer()/interval()实现延迟重订阅,也就是出错延迟重试:

source.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
@Override public Observable<?> call(Observable<? extends Throwable> errors) {

return errors.flatMap(new Func1<Throwable, Observable<?>>() {
@Override public Observable<?> call(Throwable error) {

return Observable.timer(5, TimeUnit.SECONDS);
}
});
}
})


例子3:

使用.flatMap() + .timer()实现有限次数的重订阅

source.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
@Override public Observable<?> call(Observable<? extends Throwable> errors) {

return errors.zipWith(Observable.range(1, 3), new Func2<Throwable, Integer, Integer>() {
@Override public Integer call(Throwable throwable, Integer i) {

return i;
}
});
}
})


解释:

当第四次error出现的时候,range(1,3)中的数字已经耗尽了,所以它隐式调用了onCompleted(),从而导致整个zip的结束。防止了进一步的重试。

例子4:

将可变延迟策略与次数限制的重试机制结合起来

source.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
@Override public Observable<?> call(Observable<? extends Throwable> errors) {

return errors.zipWith(Observable.range(1, 3), new Func2<Throwable, Integer, Integer>() {
@Override public Integer call(Throwable throwable, Integer i) {

return i;
}
}).flatMap(new Func1<Integer, Observable<? extends Long>>() {
@Override public Observable<? extends Long> call(Integer retryCount) {

return Observable.timer((long) Math.pow(5, retryCount), TimeUnit.SECONDS);
}
});
}
})


解释:

通过重试次数来修改延迟时间。重试三次,并且每一次的重试时间都是5 ^ retryCount,仅仅通过一些操作符的组合就帮助我们实现了指数退避算法。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: