Reactive Programming with RxJava-Chapter5:Reactive from Top to Bottom(2)
2017-03-16 20:37
183 查看
Relational Database Access
There are reasons for JDBC to remain blocking.1. Query parsing (CPU-bound) translates a String containing a query into a parse tree.
2. Query optimizer (CPU-bound) evaluates the query against various rules and statisics,try to build an execution plan
3. Query executor (I/O-bound) traverses database storage and finds appropriate tuples to return.
4. Result set (network-bound) is serialized and pushed back to the client
The advice for interacting with relational databases is to actually have a dedicated,well-tuned thread pool and isolate the blocking code there.
CompletableFuture and Streams
CompletableFuture class significantly improves the Future interface known since Java 5.import static java.util.function.Function.identity List<TravelAgency> agencies = //... CompletableFuture<User> user = findByIdAsync(id); CompletableFuture<GeoLocation> location = locateAsync(); CompletableFuture<Ticket> ticketFuture = user .thenCombine(location,(User us,GeoLocation loc)) -> agencies .stream() .map(agency -> agency.searchAsync(us,loc)) .reduce((f1,f2) -> f1.applyToEither(f2,identity()) ) .get() .thenCompose(identity()) .thenCompose(this::bookAsync);
CompletableFuture shares a lot of similarities with Observable.
CompletableFuture | Observable |
---|---|
thenApply() | map() |
thenCombine() | zip() |
anyOf(),applyToEither() | amb() |
thenCompose() | flatMap() |
Interoperability with CompletableFuture
Semantically,you can treat CompletableFuture like an Observable that has the following characteristics:It is hot. The computation behind CompletableFuture starts eagerly,regardless of whether someone registered any callbacks like thenApply() or not.
It is cached. The background computation behind CompletableFuture is triggered once eagerly and the result is forwarded to alll registered callbacks.Moreover,iif a callback is registered after completion,it is immediately with completed value (or execption)
It emits exactly one element or exception. In principle,Future completes exactly once (or never) with a value of type T or an exception.This matches the contract of Observable.
Turning CompletableFuture into Observable with Single item
class Util{ static <T> Observable<T> observe(CompletableFuture<T> future){ return Observable.create( subscriber -> { future.whenComplete((value,exception) -> { if(exception !=null){ subscriber.onError(exception); }else{ subscriber.onNext(value); subscriber.onCompleted(); } }); }); } }
Remember that CompletableFuture is hot and cached using Rx terminology.It begins computation immediately,whereas Observable will not start computation until someone actually subscribes.
Observable<User> rxFindById(long id){ return Util.observe(findByIdAsync(id)); } Observable<GeoLoaction> rxLocate(){ return Util.observe(LocateAsync()); } Observable<Ticket> rxBook(Flight flight){ return Util.observe(bookAsync(flight)); } Observable<TravelAgency> agencies = agencies(); Observable<User> user = rxFindById(id); Observable<GeoLocation> location = rxLocate(); Observable<Ticket> ticket = user .zipWith(location,(us,loc)) -> agencies .flatMap(agency -> agency.rxSearch(us,loc)) .first() ) .flatMap(x -> x) .flatMap(this::rxBook);
From Observable to CompletableFuture
static <T> CompletableFuture<T> toFuture(Observable<T> observable) { CompletableFuture<T> promise = new CompletableFuture<T>(); observable .single() .subscribe( promise::complete, promise::completeExceptionally ); return promise; } static <T> Completable<<List<T>>> toFutureList(Observable<T> observable) { return toFuture(observable.toList()); }
Observable versus Single
Creating and Consuming Single
AsyncHttpClient asyncHttpClient = new AsyncHttpClient() Single<Response> fetch(String address){ return Single.create(subscriber -> asyncHttpClient .prepareGet(address) .execute(handler(subscriber))); } AsyncCompletionHandler handler(SingleSubscriber<? super Response> subscriber){ return new AsyncCompletionHandler() { public Response onComplete(Response response){ subscriber.onSuccess(response); return response; } public void onThrowable(Throwable t){ subscriber.onError(t); } } }
Combining Responses Using zip,merge,and concat
Interoperablility with Observable and CompletableFuture
However there are two situations for which conversion between Observable and Single makes sense:- When we use Single as an Observable that emits one value and completion notification (or error notitication)
- When Single is missing certain operators available in Observable.
When to Use Single?
You should use Single in the following scenarios:- An operation must complete with some particular value or an exception.
- There is no such thing as a stream in your problem domain;using Observable would be misleading and an overkill;
- Observable is too heavyweight and you measured tha Single is faster in your paticular problem
You should prefer Observable for these circumstances:
- You model some sort of events (messages,GUI events) which are by definition occurring several times,possibly infinite.
- Or entirely the opposite,you expert the value to occur or not before completion.
最后,安利一款自己写的基于MVP的Android开发框架
https://github.com/sxenon/Pure
欢迎大家拍砖,如果觉得好,麻烦多多star
相关文章推荐
- Reactive Programming with RxJava-Chapter5:Reactive from Top to Bottom(1)
- Reactive Programming with RxJava-Chapter4:Applying Reactive Programming to Existing Application(2)
- Reactive Programming with RxJava-Chapter4:Applying Reactive Programming to Existing Application(1)
- Moving From Top To Bottom in Detailed Block in Oracle Forms
- FromBottomToTop第十二周项目博客
- Reactive Programming with RxJava-Chapter3:Operators and Transformations(2)
- Reactive Programming with RxJava-Chapter3:Operators and Transformations(1)
- Reactive Programming with RxJava-Chapter7:Test and Troubleshooting(1)
- android音量设置from top to bottom
- The Portable Executable File Format from Top to Bottom
- The Portable Executable File Format from Top to Bottom
- PrintFromTopToBottom
- FromBottomToTop第十一周项目博客
- Reactive Programming with Rxjava-Chapter6:Flow Control and Backpressure(1)
- android音量设置from top to bottom
- Beginning Object-Oriented Programming with VB 2005: From Novice to Professional
- Reactive Programming with RxJava-Chapter(9):Future Directions
- Reactive Programming with RxJava-Chapter6:Flow Control and Backpressure(2)