
Reactive Programming with RxJava - Chapter 5: Reactive from Top to Bottom (2)

2017-03-16 20:37

Relational Database Access

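There are legitimate reasons for JDBC to remain blocking. For each client query, a database goes through several stages, most of which are bound by the database's own CPU or I/O rather than by the client connection: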

1. Query parsing (CPU-bound) translates a String containing a query into a parse tree.

2. Query optimizer (CPU-bound) evaluates the query against various rules and statistics, trying to build an execution plan.

3. Query executor (I/O-bound) traverses database storage and finds appropriate tuples to return.

4. Result set (network-bound) is serialized and pushed back to the client.

The advice for interacting with relational databases is therefore to have a dedicated, well-tuned thread pool and to isolate the blocking code there.
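The advice above can be sketched with the standard library alone. This is a minimal illustration, not the book's code: `blockingQuery` is a hypothetical stand-in for a blocking JDBC call, the pool size of 4 is an arbitrary assumption, and `findByIdAsync` is simplified to return a String.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class BlockingIsolation {
    // Dedicated pool for blocking database work. The size (4) is an assumption;
    // in practice it would be tuned, for example to the connection pool size.
    static final ExecutorService DB_POOL = Executors.newFixedThreadPool(4, runnable -> {
        Thread t = new Thread(runnable, "db-pool");
        t.setDaemon(true); // do not keep the JVM alive because of idle workers
        return t;
    });

    // Hypothetical stand-in for a blocking JDBC query; no real database is involved.
    static String blockingQuery(long id) {
        try {
            Thread.sleep(50); // simulates waiting for the database
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return "user-" + id + " loaded on " + Thread.currentThread().getName();
    }

    // The caller gets a future immediately; the blocking part runs on DB_POOL only.
    static CompletableFuture<String> findByIdAsync(long id) {
        return CompletableFuture.supplyAsync(() -> blockingQuery(id), DB_POOL);
    }

    public static void main(String[] args) {
        System.out.println(findByIdAsync(42).join());
    }
}
```

The calling thread never executes the blocking call itself, so the rest of the application can stay non-blocking while the number of concurrent queries is capped by the pool size.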

CompletableFuture and Streams

The CompletableFuture class, added in Java 8, significantly improves on the Future interface that has been available since Java 5.

import static java.util.function.Function.identity;

List<TravelAgency> agencies = //...
CompletableFuture<User> user = findByIdAsync(id);
CompletableFuture<GeoLocation> location = locateAsync();

CompletableFuture<Ticket> ticketFuture = user
    // Wait for both the user and the location, then query every agency
    .thenCombine(location, (User us, GeoLocation loc) -> agencies
        .stream()
        .map(agency -> agency.searchAsync(us, loc))
        // Keep whichever search completes first
        .reduce((f1, f2) ->
            f1.applyToEither(f2, identity())
        )
        .get()
    )
    // Flatten the nested future returned by thenCombine
    .thenCompose(identity())
    .thenCompose(this::bookAsync);
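The reduce() step with applyToEither() is the least obvious part of the snippet above, so here is a small standalone sketch of just that idea. The helper name `firstOf` is made up for this illustration, and one future is deliberately never completed to keep the result deterministic.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

class FirstResponse {
    // Hypothetical helper: folds a non-empty list of futures into a single future
    // that completes with the result of whichever input finishes first.
    static <T> CompletableFuture<T> firstOf(List<CompletableFuture<T>> futures) {
        return futures.stream()
                .reduce((f1, f2) -> f1.applyToEither(f2, Function.identity()))
                .orElseThrow(() -> new IllegalArgumentException("no futures given"));
    }

    public static void main(String[] args) {
        CompletableFuture<String> slow = new CompletableFuture<>(); // never completed here
        CompletableFuture<String> fast = CompletableFuture.completedFuture("fast");

        // Prints "fast": the pending future does not hold up the result.
        System.out.println(firstOf(Arrays.asList(slow, fast)).join());
    }
}
```

Unlike the original snippet, this sketch replaces get() on the Optional with orElseThrow() so that an empty list fails with a descriptive exception.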


CompletableFuture shares a lot of similarities with Observable.

CompletableFuture             Observable
thenApply()                   map()
thenCombine()                 zip()
anyOf(), applyToEither()      amb()
thenCompose()                 flatMap()
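To make the correspondence a little more concrete, the sketch below exercises three of the CompletableFuture operators using only the standard library. The names `priceAsync` and `summary` and the sample values are assumptions invented for this example.

```java
import java.util.concurrent.CompletableFuture;

class FutureOperators {
    // Hypothetical asynchronous lookup; it completes immediately to keep the sketch deterministic.
    static CompletableFuture<Integer> priceAsync(String item) {
        return CompletableFuture.completedFuture(item.length() * 10);
    }

    static String summary() {
        CompletableFuture<String> name = CompletableFuture.completedFuture("ticket");
        CompletableFuture<Integer> quantity = CompletableFuture.completedFuture(3);

        // thenApply() plays the role of map(): transform the value once it arrives.
        CompletableFuture<String> upper = name.thenApply(String::toUpperCase);

        // thenCombine() plays the role of zip(): pair up two independent results.
        CompletableFuture<String> order = name.thenCombine(quantity, (n, q) -> q + " x " + n);

        // thenCompose() plays the role of flatMap(): chain another asynchronous step.
        CompletableFuture<Integer> price = name.thenCompose(FutureOperators::priceAsync);

        return upper.join() + " | " + order.join() + " | " + price.join();
    }

    public static void main(String[] args) {
        System.out.println(summary()); // TICKET | 3 x ticket | 60
    }
}
```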

Interoperability with CompletableFuture

Semantically, you can treat CompletableFuture like an Observable that has the following characteristics:

It is hot. The computation behind CompletableFuture starts eagerly, regardless of whether anyone has registered callbacks such as thenApply().

It is cached. The background computation behind CompletableFuture is triggered once, eagerly, and the result is forwarded to all registered callbacks. Moreover, if a callback is registered after completion, it is immediately invoked with the completed value (or exception).

It emits exactly one element or exception. In principle, a Future completes exactly once (or never) with a value of type T or an exception. This matches the contract of Observable.
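The "hot" and "cached" properties can be observed directly with plain CompletableFuture. The following is a small sketch under stated assumptions: the class and method names are made up, and it relies on the usual behaviour that a non-async callback registered on an already completed future runs right away in the registering thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

class HotAndCached {
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        AtomicInteger computations = new AtomicInteger();

        // Hot: the supplier is scheduled immediately, before any callback exists.
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            computations.incrementAndGet();
            return 42;
        });
        future.join(); // wait for completion; still no callback registered
        log.add("computations before any callback: " + computations.get());

        // Cached: callbacks registered after completion receive the stored value
        // without triggering the computation again.
        future.thenAccept(v -> log.add("first callback got " + v));
        future.thenAccept(v -> log.add("second callback got " + v));
        log.add("computations after two callbacks: " + computations.get());
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

The counter stays at 1 throughout, which is the behaviour to keep in mind when wrapping such a future in an Observable: subscribing does not start the work, and subscribing again does not repeat it.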

Turning CompletableFuture into an Observable with a single item

class Util {
    static <T> Observable<T> observe(CompletableFuture<T> future) {
        return Observable.create(subscriber -> {
            // Forward the outcome of the future to the subscriber
            future.whenComplete((value, exception) -> {
                if (exception != null) {
                    subscriber.onError(exception);
                } else {
                    // Exactly one item, followed by completion
                    subscriber.onNext(value);
                    subscriber.onCompleted();
                }
            });
        });
    }
}


Remember that, in Rx terminology, CompletableFuture is hot and cached. It begins computation immediately, whereas a typical (cold) Observable does not start computation until someone actually subscribes.

Observable<User> rxFindById(long id) {
    return Util.observe(findByIdAsync(id));
}

Observable<GeoLocation> rxLocate() {
    return Util.observe(locateAsync());
}

Observable<Ticket> rxBook(Flight flight) {
    return Util.observe(bookAsync(flight));
}

Observable<TravelAgency> agencies = agencies();
Observable<User> user = rxFindById(id);
Observable<GeoLocation> location = rxLocate();

Observable<Ticket> ticket = user
    // Pair the user with the location, then search all agencies
    .zipWith(location, (us, loc) ->
        agencies
            .flatMap(agency -> agency.rxSearch(us, loc))
            // Take the first flight found by any agency
            .first()
    )
    // Flatten the nested Observable
    .flatMap(x -> x)
    .flatMap(this::rxBook);


From Observable to CompletableFuture

static <T> CompletableFuture<T> toFuture(Observable<T> observable) {
    CompletableFuture<T> promise = new CompletableFuture<>();
    observable
        // Fail unless the Observable emits exactly one item
        .single()
        .subscribe(
            promise::complete,
            promise::completeExceptionally
        );
    return promise;
}

static <T> CompletableFuture<List<T>> toFutureList(Observable<T> observable) {
    // toList() collects all items into a single List emission
    return toFuture(observable.toList());
}


Observable versus Single

Creating and Consuming Single

AsyncHttpClient asyncHttpClient = new AsyncHttpClient();

Single<Response> fetch(String address) {
    return Single.create(subscriber ->
        asyncHttpClient
            .prepareGet(address)
            .execute(handler(subscriber)));
}

AsyncCompletionHandler<Response> handler(SingleSubscriber<? super Response> subscriber) {
    return new AsyncCompletionHandler<Response>() {
        @Override
        public Response onCompleted(Response response) {
            // Success: deliver the single value
            subscriber.onSuccess(response);
            return response;
        }

        @Override
        public void onThrowable(Throwable t) {
            // Failure: deliver the error instead
            subscriber.onError(t);
        }
    };
}


Combining Responses Using zip, merge, and concat

Interoperability with Observable and CompletableFuture

There are two situations in which conversion between Observable and Single makes sense:

- When we use Single as an Observable that emits one value followed by a completion notification (or an error notification).

- When Single is missing certain operators available in Observable.

When to Use Single?

You should use Single in the following scenarios:

- An operation must complete with some particular value or an exception.

- There is no such thing as a stream in your problem domain; using Observable would be misleading and overkill.

- Observable is too heavyweight and you have measured that Single is faster in your particular problem.

You should prefer Observable for these circumstances:

- You model some sort of events (messages, GUI events) that by definition occur several times, possibly infinitely many.

- Or, entirely the opposite, you expect that the value may or may not occur before completion.

Finally, a plug for an MVP-based Android development framework that I wrote:

https://github.com/sxenon/Pure

Criticism is welcome, and if you find it useful, please give it a star.