您的位置:首页 > 编程语言 > Java开发

Spring WebFlux之HttpHandler的探索

2019-07-16 15:55 1516 查看

这是本人正在写的《Java 编程方法论:响应式Reactor3、Reactor-Netty和Spring WebFlux》一书的文章节选,它是《Java编程方法论:响应式RxJava与代码设计实战》的续篇,也可作为独立的一本来读

这是此节上半段的节选内容

HttpHandler的探索

通过前面的章节,我们已经接触了

Reactor-Netty
整个流程的设计实现细节,同时也涉及到了
reactor.netty.http.server.HttpServer#handle
,准确得说,它是一个
SPI(Service Provider Interface)
接口,对外提供
BiFunction<? super HttpServerRequest, ? super HttpServerResponse, ? extends Publisher<Void>> handler
,这样,我们可以针对该
handler
依据自身环境进行相应实现。
Spring WebFlux
Reactor-Netty
都有一套属于自己的实现,只不过前者为了适应
Spring Web
的一些习惯做了大量的适配设计,整个过程比较复杂,后者提供了一套简单而灵活的实现。那么本章我们就从
Reactor-Netty
内对它的实现开始,正式向
Spring WebFlux
进行过渡。

HttpServerRoutes设定

往往我们在给后台服务器提交

HTTP
请求的时候,往往会涉及到
get
head
post
put
这几种类型,还会包括请求地址,服务端会根据请求类型和请求地址提供对应的服务,然后才是具体的处理,那么我们是不是可以将寻找服务的这个过程抽取出来,形成服务路由查找。

于是,在

Reactor-Netty
中,设计了一个
HttpServerRoutes
接口,该接口继承了
BiFunction<HttpServerRequest, HttpServerResponse, Publisher<Void>>
,用来路由请求,当请求来临时,对我们所设计的路由规则按顺序依次查找,直到第一个匹配,然后调用对应的处理
handler
HttpServerRoutes
接口内针对于我们常用的
get
head
post
put
delete
等请求设计了对应的路由规则(具体请看下面源码)。

我们在使用的时候首先会调用

HttpServerRoutes#newRoutes
得到一个
DefaultHttpServerRoutes
实例,然后加入我们设计的路由规则,关于路由规则的设计,其实就是将一条条规则通过一个集合管理起来,然后在需要时进行遍历匹配即可,这里它的核心组织方法就是
reactor.netty.http.server.HttpServerRoutes#route
,在规则设计完后,我们就可以设计对应每一条规则的
BiFunction<HttpServerRequest, HttpServerResponse, Publisher<Void>>
函数式实现,最后,当请求路由匹配成功,就可以调用我们的
BiFunction
实现,对请求进行处理。

//reactor.netty.http.server.HttpServerRoutes
public interface HttpServerRoutes extends
BiFunction<HttpServerRequest, HttpServerResponse, Publisher<Void>> {

static HttpServerRoutes newRoutes() {
return new DefaultHttpServerRoutes();
}

default HttpServerRoutes delete(String path,
BiFunction<? super HttpServerRequest, ? super HttpServerResponse, ? extends Publisher<Void>> handler) {
return route(HttpPredicate.delete(path), handler);
}

...

default HttpServerRoutes get(String path,
BiFunction<? super HttpServerRequest, ? super HttpServerResponse, ? extends Publisher<Void>> handler) {
return route(HttpPredicate.get(path), handler);
}

default HttpServerRoutes head(String path,
BiFunction<? super HttpServerRequest, ? super HttpServerResponse, ? extends Publisher<Void>> handler) {
return route(HttpPredicate.head(path), handler);
}

default HttpServerRoutes index(final BiFunction<? super HttpServerRequest, ? super HttpServerResponse, ? extends Publisher<Void>> handler) {
return route(INDEX_PREDICATE, handler);
}

default HttpServerRoutes options(String path,
BiFunction<? super HttpServerRequest, ? super HttpServerResponse, ? extends Publisher<Void>> handler) {
return route(HttpPredicate.options(path), handler);
}

default HttpServerRoutes post(String path,
BiFunction<? super HttpServerRequest, ? super HttpServerResponse, ? extends Publisher<Void>> handler) {
return route(HttpPredicate.post(path), handler);
}

default HttpServerRoutes put(String path,
BiFunction<? super HttpServerRequest, ? super HttpServerResponse, ? extends Publisher<Void>> handler) {
return route(HttpPredicate.put(path), handler);
}

HttpServerRoutes route(Predicate<? super HttpServerRequest> condition,
BiFunction<? super HttpServerRequest, ? super HttpServerResponse, ? extends Publisher<Void>> handler);

...

}

关于路由规则的设计,结合前面所讲,我们可以在

HttpServerRoutes
的实现类中设计一个
List
用来存储一条条的规则,接下来要做的就是将制定的规则一条条放入其中即可,因为这是一个添加过程,并不需要返回值,我们可以使用
Consumer<? super HttpServerRoutes>
来代表这个过程。对于请求的匹配,往往都是对请求的条件判断,那我们可以使用
Predicate<? super HttpServerRequest>
来代表这个判断逻辑,由于单条路由规则匹配对应的
BiFunction<HttpServerRequest, HttpServerResponse, Publisher<Void>>
处理,那么我们是不是可以将这两者耦合到一起,于是
reactor.netty.http.server.DefaultHttpServerRoutes.HttpRouteHandler
就设计出来了:

//reactor.netty.http.server.DefaultHttpServerRoutes.HttpRouteHandler
static final class HttpRouteHandler
implements BiFunction<HttpServerRequest, HttpServerResponse, Publisher<Void>>,
Predicate<HttpServerRequest> {

final Predicate<? super HttpServerRequest>          condition;
final BiFunction<? super HttpServerRequest, ? super HttpServerResponse, ? extends Publisher<Void>>
handler;
final Function<? super String, Map<String, String>> resolver;

HttpRouteHandler(Predicate<? super HttpServerRequest> condition,
BiFunction<? super HttpServerRequest, ? super HttpServerResponse, ? extends Publisher<Void>> handler,
@Nullable Function<? super String, Map<String, String>> resolver) {
this.condition = Objects.requireNonNull(condition, "condition");
this.handler = Objects.requireNonNull(handler, "handler");
this.resolver = resolver;
}

@Override
public Publisher<Void> apply(HttpServerRequest request,
HttpServerResponse response) {
return handler.apply(request.paramsResolver(resolver), response);
}

@Override
public boolean test(HttpServerRequest o) {
return condition.test(o);
}
}

这里可能需要对

request
中的参数进行解析,所以对外提供了一个可供我们自定义的参数解析器实现接口:
Function<? super String, Map<String, String>>
,剩下的
condition
resolver
就可以按照我们前面说的逻辑进行。

此时,

HttpRouteHandler
属于一个真正的请求校验者和请求业务处理者,我们现在要将它们的功能通过一系列逻辑串联形成一个处理流程,那么这里可以通过一个代理模式进行,我们在
HttpServerRoutes
的实现类中通过一个
List
集合管理了数量不等的
HttpRouteHandler
实例,对外,我们在使用
reactor.netty.http.server.HttpServer#handle
时只会看到一个
BiFunction<HttpServerRequest, HttpServerResponse, Publisher<Void>>
实现,那么,所有的逻辑流程处理都应该在这个
BiFunction
apply(...)
实现中进行,于是,我们就有下面的
reactor.netty.http.server.DefaultHttpServerRoutes
实现:

//reactor.netty.http.server.DefaultHttpServerRoutes
final class DefaultHttpServerRoutes implements HttpServerRoutes {

private final CopyOnWriteArrayList<HttpRouteHandler> handlers =
new CopyOnWriteArrayList<>();
...
@Override
public HttpServerRoutes route(Predicate<? super HttpServerRequest> condition,
BiFunction<? super HttpServerRequest, ? super HttpServerResponse, ? extends Publisher<Void>> handler) {
Objects.requireNonNull(condition, "condition");
Objects.requireNonNull(handler, "handler");

if (condition instanceof HttpPredicate) {
handlers.add(new HttpRouteHandler(condition,
handler,
(HttpPredicate) condition));
}
else {
handlers.add(new HttpRouteHandler(condition, handler, null));
}
return this;
}

@Override
public Publisher<Void> apply(HttpServerRequest request, HttpServerResponse response) {
final Iterator<HttpRouteHandler> iterator = handlers.iterator();
HttpRouteHandler cursor;

try {
while (iterator.hasNext()) {
cursor = iterator.next();
if (cursor.test(request)) {
return cursor.apply(request, response);
}
}
}
catch (Throwable t) {
Exceptions.throwIfJvmFatal(t);
return Mono.error(t); //500
}

return response.sendNotFound();
}
...
}

可以看到

route(...)
方法只是做了
HttpRouteHandler
实例的构建并交由
handlers
这个
list
进行管理,通过上面的
apply
实现将前面的内容在流程逻辑中进行组合。于是,我们就可以在
reactor.netty.http.server.HttpServer
中设计一个
route
方法,对外提供一个
SPI
接口,将我们所提到的整个过程定义在这个方法中(得到一个
HttpServerRoutes
实例,然后通过它的
route
方法构建规则,构建过程在前面提到的
Consumer<? super HttpServerRoutes>
中进行,最后将组合成功的
HttpServerRoutes
BiFunction<HttpServerRequest, HttpServerResponse, Publisher<Void>>
的角色作为参数交由
HttpServer#handle
)。

另外,我们在这里要特别注意下,在上面

DefaultHttpServerRoutes
实现的
apply
方法中,可以看出,一旦请求匹配,处理完后就直接返回结果,不再继续遍历匹配,也就是说每次新来的请求,只调用所声明匹配规则顺序的第一个匹配。

//reactor.netty.http.server.HttpServer#route
public final HttpServer route(Consumer<? super HttpServerRoutes> routesBuilder) {
Objects.requireNonNull(routesBuilder, "routeBuilder");
HttpServerRoutes routes = HttpServerRoutes.newRoutes();
routesBuilder.accept(routes);
return handle(routes);
}

于是,我们就可以通过下面的

Demo
来应用上面的设计:

import reactor.core.publisher.Mono;
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;

public class Application {

public static void main(String[] args) {
DisposableServer server =
HttpServer.create()
.route(routes ->
routes.get("/hello",        <1>
(request, response) -> response.sendString(Mono.just("Hello World!")))
.post("/echo",        <2>
(request, response) -> response.send(request.receive().retain()))
.get("/path/{param}", <3>
(request, response) -> response.sendString(Mono.just(request.param("param")))))
.bindNow();

server.onDispose()
.block();
}
}

<1>
处,当我们发出一个
GET
请求去访问
/hello
时就会得到一个字符串
Hello World!

<2>
处,当我们发出一个
POST
请求去访问
/echo
时就会将请求体作为响应内容返回。

<3>
处,当我们发出一个
GET
请求去访问
/path/{param}

时就会得到一个请求路径参数
param
的值。

关于

SSE
在这里的使用,我们可以看下面这个Demo,具体的代码细节就不详述了,看对应注释即可:

import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;
import reactor.netty.http.server.HttpServerRequest;
import reactor.netty.http.server.HttpServerResponse;

import java.io.ByteArrayOutputStream;
import java.nio.charset.Charset;
import java.time.Duration;
import java.util.function.BiFunction;

public class Application {

public static void main(String[] args) {
DisposableServer server =
HttpServer.create()
.route(routes -> routes.get("/sse", serveSse()))
.bindNow();

server.onDispose()
.block();
}

/**
* 准备 SSE response
* 参考 reactor.netty.http.server.HttpServerResponse#sse可以知道它的"Content-Type"
* 是"text/event-stream"
* flush策略为通过所提供的Publisher来每下发一个元素就flush一次
*/
private static BiFunction<HttpServerRequest, HttpServerResponse, Publisher<Void>> serveSse() {
Flux<Long> flux = Flux.interval(Duration.ofSeconds(10));
return (request, response) ->
response.sse()
.send(flux.map(Application::toByteBuf), b -> true);
}

/**
* 将发元素按照按照给定的格式由Object转换为ByteBuf。
*/
private static ByteBuf toByteBuf(Object any) {
ByteArrayOutputStream out = new ByteArrayOutputStream();
try {
out.write("data: ".getBytes(Charset.defaultCharset()));
MAPPER.writeValue(out, any);
out.write("\n\n".getBytes(Charset.defaultCharset()));
}
catch (Exception e) {
throw new RuntimeException(e);
}
return ByteBufAllocator.DEFAULT
.buffer()
.writeBytes(out.toByteArray());
}

private static final ObjectMapper MAPPER = new ObjectMapper();
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐