从零开始实现简单 RPC 框架 8:网络通信之 Request-Response 模型
2021-09-06 08:00
1021 查看
Netty 在服务端与客户端的网络通信中,使用的是异步双向通信(双工)的方式,即客户端和服务端可以相互主动发请求给对方,发消息后不会同步等响应。这样就会有一下问题:
- 如何识别消息是请求还是响应?
- 请求如何正确对应到响应?
1. 如何识别消息是请求还是响应
为了识别消息类型是请求或者响应,我们在消息中加入了
messageType的属性,在上文我们也提到,这个消息类型在自定义协议的头部,他有几种类型:请求、响应、心跳,我们先来说说请求、响应。
public enum MessageType { /** * 普通请求 */ REQUEST((byte) 1), /** * 普通响应 */ RESPONSE((byte) 2), /** * 心跳 */ HEARTBEAT((byte) 3), ; private final byte value; }
请求(Request)的核心字段如下:
public class RpcRequest { /** * 接口名 */ private String interfaceName; /** * 方法名 */ private String methodName; /** * 参数列表 */ private Object[] params; /** * 参数类型列表 */ private Class<?>[] paramTypes; /** * 接口版本 */ private String version; }
响应(Response)的核心字段如下:
public class RpcResponse<T> { /** * 请求id */ private long requestId; /** * 响应码 */ private Integer code; /** * 提示消息 */ private String message; /** * 响应数据 */ private T data; }
发送消息的时候,按照消息类型和结构体,将数据组装好,写到 channel 即可。接收消息则要先解码,从消息头拿到消息类型,根据消息类型来反序列化到对应的结构体。
2. 请求如何正确对应到响应
流程图如下: 有几个关键点:
- 客户端请求之后拿到 Future
- 有一个
Map
存放未响应的请求,Key: RequestId,Value: Future
- 服务端响应的数据中,包含了客户端的 RequestId,这是对应的关键
- 响应的结果会被
NettyClientHandler.channelRead0
监听到,根据响应的 RequestId 取出对应的 Future - 将结果写到对应的 Future 中
- 客户端通过
future.get()
获取到数据
1) 客户端发请求
代码如下:
public class NettyInvoker extends AbstractInvoker { private final NettyClient nettyClient = NettyClient.getInstance(); @Override protected RpcResult doInvoke(RpcRequest request, URL selected) throws RpcException { // 获取 Channel Channel channel = nettyClient.getChannel(socketAddress); // 构造一个空 Future CompletableFuture<RpcResponse<?>> resultFuture = new CompletableFuture<>(); // 构建 RPC 消息,此处会构建 requestId RpcMessage rpcMessage = buildRpcMessage(request); // 将 request 和 Future 对应放到 Map 中 UnprocessedRequests.put(rpcMessage.getRequestId(), resultFuture); // 发出请求 channel.writeAndFlush(rpcMessage); // 返回结果 return new AsyncResult(resultFuture); } // ... }
返回的
AsyncResult只是
future的包装。
public class AsyncResult implements RpcResult { private final CompletableFuture<?> future; public AsyncResult(CompletableFuture<?> future) { this.future = future; } }
2) 请求暂存
这个存储未响应的请求在
ccx-rpc中是
UnprocessedRequests类在管理:
public class UnprocessedRequests { private static final Map<Long, CompletableFuture<RpcResponse<?>>> FUTURE_MAP = new ConcurrentHashMap<>(); public static void put(long requestId, CompletableFuture<RpcResponse<?>> future) { FUTURE_MAP.put(requestId, future); } }
3) 服务端响应数据监听
使用 Netty 的 Handler 监听服务端响应的数据,当有数据响应,则调用
UnprocessedRequests.complete写入。
public class NettyClientHandler extends SimpleChannelInboundHandler<RpcMessage> { @Override protected void channelRead0(ChannelHandlerContext context, RpcMessage requestMsg) { RpcResponse<?> response = (RpcResponse<?>) requestMsg.getData(); UnprocessedRequests.complete(response); } }
UnprocessedRequests.complete实际上就是找出并删除对应的请求,然后将数据写入:
future.complete(rpcResponse)
public class UnprocessedRequests { private static final Map<Long, CompletableFuture<RpcResponse<?>>> FUTURE_MAP = new ConcurrentHashMap<>(); /** * 完成响应 * * @param rpcResponse 响应内容 */ public static void complete(RpcResponse<?> rpcResponse) { CompletableFuture<RpcResponse<?>> future = FUTURE_MAP.remove(rpcResponse.getRequestId()); if (future != null) { future.complete(rpcResponse); } else { throw new IllegalStateException("future is null. rpcResponse=" + JSONUtil.toJsonStr(rpcResponse)); } } }
最后通过
AsyncResult.getData可以获取到数据。
public class AsyncResult implements RpcResult { private final CompletableFuture<?> future; public AsyncResult(CompletableFuture<?> future) { this.future = future; } @Override public Object getData() { try { return future.get(); } catch (InterruptedException | ExecutionException e) { log.error("getData error.", e); } return null; } }
总结
Netty 网络通信是异步双工的,我们需要用正确 Request-Response 模型让客户端和服务端正确交互。
- 如何区分请求或响应? 在消息中,可以加入 messageType 字段用来区分是请求或者响应。
- 如何把请求和响应对应? 发出的请求需要用 RequestId 标记并用 Map 存起来。服务端收到请求之后,将 RequestId 原封不动写到响应结果中。客户端收到响应结果后,拿出 RequestId 找到对应的 Future 并写入结果。
ccx-rpc 代码已经开源 Github:https://github.com/chenchuxin/ccx-rpc Gitee:https://gitee.com/imccx/ccx-rpc
相关文章推荐
- 从零开始实现简单 RPC 框架 9:网络通信之心跳与重连机制
- 用AKKA实现简单的RPC通信模型2
- IOS网络通信 之第三方框架ASIHttpRequest的简单使用
- 从零开始实现简单 RPC 框架 1:RPC 框架的结构和设计
- Android-Volley网络通信框架(自定义Request 请求:实现 GsonRequest)
- 基于Netty的RPC简单框架实现(四):Netty实现网络传输
- 【远程调用框架】如何实现一个简单的RPC框架(四)优化二:改变底层通信框架
- 从零开始实现简单 RPC 框架 2:扩展利器 SPI
- 优雅设计封装基于Okhttp3的网络框架(六):HttpHeader接口设计实现 及 Response、Request封装实现
- 从零开始实现简单 RPC 框架 3:配置总线 URL
- go实现一个简单的游戏服务器框架(lotou)网络通信
- 一个非常简单的.net网络通信框架XNetFramework(符源码与测试Demo)
- C#网络编程简单实现通信小例子-2
- 网络游戏客户端通信模块简单实现
- 用libev的c语言版本实现简单的网络通信服务器
- 基于Netty的RPC简单框架实现(三):Kryo实现序列化
- 用Java socket (TCP通信模型)实现一个简单的web 服务器
- 基于Netty网络通信框架的电子白板,可实现同屏互动功能
- HttpRequest网络编程的实现框架
- Linux套接字实现简单的客户/服务器网络通信示例