Spring WebFlux之HttpHandler的探索
2019-07-16 15:55
1516 查看
这是本人正在写的《Java 编程方法论:响应式Reactor3、Reactor-Netty和Spring WebFlux》一书的文章节选,它是《Java编程方法论:响应式RxJava与代码设计实战》的续篇,也可作为独立的一本来读
这是此节上半段的节选内容
HttpHandler的探索
通过前面的章节,我们已经接触了
Reactor-Netty整个流程的设计实现细节,同时也涉及到了
reactor.netty.http.server.HttpServer#handle,准确得说,它是一个
SPI(Service Provider Interface)接口,对外提供
BiFunction<? super HttpServerRequest, ? super HttpServerResponse, ? extends Publisher<Void>> handler,这样,我们可以针对该
handler依据自身环境进行相应实现。
Spring WebFlux与
Reactor-Netty都有一套属于自己的实现,只不过前者为了适应
Spring Web的一些习惯做了大量的适配设计,整个过程比较复杂,后者提供了一套简单而灵活的实现。那么本章我们就从
Reactor-Netty内对它的实现开始,正式向
Spring WebFlux进行过渡。
HttpServerRoutes设定
往往我们在给后台服务器提交
HTTP请求的时候,往往会涉及到
get,
head,
post,
put这几种类型,还会包括请求地址,服务端会根据请求类型和请求地址提供对应的服务,然后才是具体的处理,那么我们是不是可以将寻找服务的这个过程抽取出来,形成服务路由查找。
于是,在
Reactor-Netty中,设计了一个
HttpServerRoutes接口,该接口继承了
BiFunction<HttpServerRequest, HttpServerResponse, Publisher<Void>>,用来路由请求,当请求来临时,对我们所设计的路由规则按顺序依次查找,直到第一个匹配,然后调用对应的处理
handler。
HttpServerRoutes接口内针对于我们常用的
get,
head,
post,
put,
delete等请求设计了对应的路由规则(具体请看下面源码)。
我们在使用的时候首先会调用
HttpServerRoutes#newRoutes得到一个
DefaultHttpServerRoutes实例,然后加入我们设计的路由规则,关于路由规则的设计,其实就是将一条条规则通过一个集合管理起来,然后在需要时进行遍历匹配即可,这里它的核心组织方法就是
reactor.netty.http.server.HttpServerRoutes#route,在规则设计完后,我们就可以设计对应每一条规则的
BiFunction<HttpServerRequest, HttpServerResponse, Publisher<Void>>函数式实现,最后,当请求路由匹配成功,就可以调用我们的
BiFunction实现,对请求进行处理。
//reactor.netty.http.server.HttpServerRoutes public interface HttpServerRoutes extends BiFunction<HttpServerRequest, HttpServerResponse, Publisher<Void>> { static HttpServerRoutes newRoutes() { return new DefaultHttpServerRoutes(); } default HttpServerRoutes delete(String path, BiFunction<? super HttpServerRequest, ? super HttpServerResponse, ? extends Publisher<Void>> handler) { return route(HttpPredicate.delete(path), handler); } ... default HttpServerRoutes get(String path, BiFunction<? super HttpServerRequest, ? super HttpServerResponse, ? extends Publisher<Void>> handler) { return route(HttpPredicate.get(path), handler); } default HttpServerRoutes head(String path, BiFunction<? super HttpServerRequest, ? super HttpServerResponse, ? extends Publisher<Void>> handler) { return route(HttpPredicate.head(path), handler); } default HttpServerRoutes index(final BiFunction<? super HttpServerRequest, ? super HttpServerResponse, ? extends Publisher<Void>> handler) { return route(INDEX_PREDICATE, handler); } default HttpServerRoutes options(String path, BiFunction<? super HttpServerRequest, ? super HttpServerResponse, ? extends Publisher<Void>> handler) { return route(HttpPredicate.options(path), handler); } default HttpServerRoutes post(String path, BiFunction<? super HttpServerRequest, ? super HttpServerResponse, ? extends Publisher<Void>> handler) { return route(HttpPredicate.post(path), handler); } default HttpServerRoutes put(String path, BiFunction<? super HttpServerRequest, ? super HttpServerResponse, ? extends Publisher<Void>> handler) { return route(HttpPredicate.put(path), handler); } HttpServerRoutes route(Predicate<? super HttpServerRequest> condition, BiFunction<? super HttpServerRequest, ? super HttpServerResponse, ? extends Publisher<Void>> handler); ... }
关于路由规则的设计,结合前面所讲,我们可以在
HttpServerRoutes的实现类中设计一个
List用来存储一条条的规则,接下来要做的就是将制定的规则一条条放入其中即可,因为这是一个添加过程,并不需要返回值,我们可以使用
Consumer<? super HttpServerRoutes>来代表这个过程。对于请求的匹配,往往都是对请求的条件判断,那我们可以使用
Predicate<? super HttpServerRequest>来代表这个判断逻辑,由于单条路由规则匹配对应的
BiFunction<HttpServerRequest, HttpServerResponse, Publisher<Void>>处理,那么我们是不是可以将这两者耦合到一起,于是
reactor.netty.http.server.DefaultHttpServerRoutes.HttpRouteHandler就设计出来了:
//reactor.netty.http.server.DefaultHttpServerRoutes.HttpRouteHandler static final class HttpRouteHandler implements BiFunction<HttpServerRequest, HttpServerResponse, Publisher<Void>>, Predicate<HttpServerRequest> { final Predicate<? super HttpServerRequest> condition; final BiFunction<? super HttpServerRequest, ? super HttpServerResponse, ? extends Publisher<Void>> handler; final Function<? super String, Map<String, String>> resolver; HttpRouteHandler(Predicate<? super HttpServerRequest> condition, BiFunction<? super HttpServerRequest, ? super HttpServerResponse, ? extends Publisher<Void>> handler, @Nullable Function<? super String, Map<String, String>> resolver) { this.condition = Objects.requireNonNull(condition, "condition"); this.handler = Objects.requireNonNull(handler, "handler"); this.resolver = resolver; } @Override public Publisher<Void> apply(HttpServerRequest request, HttpServerResponse response) { return handler.apply(request.paramsResolver(resolver), response); } @Override public boolean test(HttpServerRequest o) { return condition.test(o); } }
这里可能需要对
request中的参数进行解析,所以对外提供了一个可供我们自定义的参数解析器实现接口:
Function<? super String, Map<String, String>>,剩下的
condition与
resolver就可以按照我们前面说的逻辑进行。
此时,
HttpRouteHandler属于一个真正的请求校验者和请求业务处理者,我们现在要将它们的功能通过一系列逻辑串联形成一个处理流程,那么这里可以通过一个代理模式进行,我们在
HttpServerRoutes的实现类中通过一个
List集合管理了数量不等的
HttpRouteHandler实例,对外,我们在使用
reactor.netty.http.server.HttpServer#handle时只会看到一个
BiFunction<HttpServerRequest, HttpServerResponse, Publisher<Void>>实现,那么,所有的逻辑流程处理都应该在这个
BiFunction的
apply(...)实现中进行,于是,我们就有下面的
reactor.netty.http.server.DefaultHttpServerRoutes实现:
//reactor.netty.http.server.DefaultHttpServerRoutes final class DefaultHttpServerRoutes implements HttpServerRoutes { private final CopyOnWriteArrayList<HttpRouteHandler> handlers = new CopyOnWriteArrayList<>(); ... @Override public HttpServerRoutes route(Predicate<? super HttpServerRequest> condition, BiFunction<? super HttpServerRequest, ? super HttpServerResponse, ? extends Publisher<Void>> handler) { Objects.requireNonNull(condition, "condition"); Objects.requireNonNull(handler, "handler"); if (condition instanceof HttpPredicate) { handlers.add(new HttpRouteHandler(condition, handler, (HttpPredicate) condition)); } else { handlers.add(new HttpRouteHandler(condition, handler, null)); } return this; } @Override public Publisher<Void> apply(HttpServerRequest request, HttpServerResponse response) { final Iterator<HttpRouteHandler> iterator = handlers.iterator(); HttpRouteHandler cursor; try { while (iterator.hasNext()) { cursor = iterator.next(); if (cursor.test(request)) { return cursor.apply(request, response); } } } catch (Throwable t) { Exceptions.throwIfJvmFatal(t); return Mono.error(t); //500 } return response.sendNotFound(); } ... }
可以看到
route(...)方法只是做了
HttpRouteHandler实例的构建并交由
handlers这个
list进行管理,通过上面的
apply实现将前面的内容在流程逻辑中进行组合。于是,我们就可以在
reactor.netty.http.server.HttpServer中设计一个
route方法,对外提供一个
SPI接口,将我们所提到的整个过程定义在这个方法中(得到一个
HttpServerRoutes实例,然后通过它的
route方法构建规则,构建过程在前面提到的
Consumer<? super HttpServerRoutes>中进行,最后将组合成功的
HttpServerRoutes以
BiFunction<HttpServerRequest, HttpServerResponse, Publisher<Void>>的角色作为参数交由
HttpServer#handle)。
另外,我们在这里要特别注意下,在上面
DefaultHttpServerRoutes实现的
apply方法中,可以看出,一旦请求匹配,处理完后就直接返回结果,不再继续遍历匹配,也就是说每次新来的请求,只调用所声明匹配规则顺序的第一个匹配。
//reactor.netty.http.server.HttpServer#route public final HttpServer route(Consumer<? super HttpServerRoutes> routesBuilder) { Objects.requireNonNull(routesBuilder, "routeBuilder"); HttpServerRoutes routes = HttpServerRoutes.newRoutes(); routesBuilder.accept(routes); return handle(routes); }
于是,我们就可以通过下面的
Demo来应用上面的设计:
import reactor.core.publisher.Mono; import reactor.netty.DisposableServer; import reactor.netty.http.server.HttpServer; public class Application { public static void main(String[] args) { DisposableServer server = HttpServer.create() .route(routes -> routes.get("/hello", <1> (request, response) -> response.sendString(Mono.just("Hello World!"))) .post("/echo", <2> (request, response) -> response.send(request.receive().retain())) .get("/path/{param}", <3> (request, response) -> response.sendString(Mono.just(request.param("param"))))) .bindNow(); server.onDispose() .block(); } }
在
<1>处,当我们发出一个
GET请求去访问
/hello时就会得到一个字符串
Hello World!。
在
<2>处,当我们发出一个
POST请求去访问
/echo时就会将请求体作为响应内容返回。
在
<3>处,当我们发出一个
GET请求去访问
/path/{param}
时就会得到一个请求路径参数
param的值。
关于
SSE在这里的使用,我们可以看下面这个Demo,具体的代码细节就不详述了,看对应注释即可:
import com.fasterxml.jackson.databind.ObjectMapper; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.netty.DisposableServer; import reactor.netty.http.server.HttpServer; import reactor.netty.http.server.HttpServerRequest; import reactor.netty.http.server.HttpServerResponse; import java.io.ByteArrayOutputStream; import java.nio.charset.Charset; import java.time.Duration; import java.util.function.BiFunction; public class Application { public static void main(String[] args) { DisposableServer server = HttpServer.create() .route(routes -> routes.get("/sse", serveSse())) .bindNow(); server.onDispose() .block(); } /** * 准备 SSE response * 参考 reactor.netty.http.server.HttpServerResponse#sse可以知道它的"Content-Type" * 是"text/event-stream" * flush策略为通过所提供的Publisher来每下发一个元素就flush一次 */ private static BiFunction<HttpServerRequest, HttpServerResponse, Publisher<Void>> serveSse() { Flux<Long> flux = Flux.interval(Duration.ofSeconds(10)); return (request, response) -> response.sse() .send(flux.map(Application::toByteBuf), b -> true); } /** * 将发元素按照按照给定的格式由Object转换为ByteBuf。 */ private static ByteBuf toByteBuf(Object any) { ByteArrayOutputStream out = new ByteArrayOutputStream(); try { out.write("data: ".getBytes(Charset.defaultCharset())); MAPPER.writeValue(out, any); out.write("\n\n".getBytes(Charset.defaultCharset())); } catch (Exception e) { throw new RuntimeException(e); } return ByteBufAllocator.DEFAULT .buffer() .writeBytes(out.toByteArray()); } private static final ObjectMapper MAPPER = new ObjectMapper(); }
相关文章推荐
- org.springframework.web.servlet.DispatcherServlet noHandlerFound 警告: No mapping found for HTTP reque
- org.springframework.web.se rvlet.DispatcherServlet.noHandlerFound No mapping found for HTTP request
- org.springframework.web.servlet.PageNotFound noHandlerFound 警告: No mapping found for HTTP request w
- 异常:The absolute uri: http://www.springframework.org/security/tags cannot be resolved in either web.xml or the jar files deployed with this application
- Unable to locate Spring NamespaceHandler for XML schema namespace [http://www.springframework.org/sc
- 实战 HTTP 处理程序(HTTP Handler) (4)--与Web程序共享Session
- 用kindeditor 报:org.apache.catalina.connector.RequestFacade cannot be cast to org.springframework.web.multipart.MultipartHttpServletRequest
- HttpClient调用Web工程的Spring @Controller方法
- 使用ControllerAdvice注意事项,Ambiguous @ExceptionHandler method mapped for [class org.springframework.web.bind.MethodArgumentNotValidException]
- Unable to locate Spring NamespaceHandler for XML schema namespace [http://www.springframework.org/sc
- org.springframework.web.servlet.DispatcherServlet noHandlerFound
- (5)Spring WebFlux快速上手——响应式Spring的道法术器
- Unable to locate Spring NamespaceHandler for XML schema namespace [http://ww
- org.springframework.web.HttpMediaTypeNotAcceptableException: Could not find acceptable representatio
- 使用 Spring 3 MVC HttpMessageConverter 功能构建 RESTful web 服务
- 尝试Spring Boot2 WebFlux(启动失败了?不要怕)
- spring webflux文件上传下载
- springMVC:org.springframework.web.servlet.PageNotFound.handleHttpRequestMethodNotSupported Request method 'POST' not supported
- Spring Boot WebFlux 上手教程
- 使用 Spring 3 MVC HttpMessageConverter 功能构建 RESTful web 服务