rpc系列-rpc02
2019-06-10 18:03
1796 查看
版权声明:本文为博主(李孟lm)原创文章,未经博主允许不得转载。 https://blog.csdn.net/qq_19968255/article/details/82896520
承接rpc系列-rpc01:https://blog.csdn.net/qq_19968255/article/details/82894381
示例
1.结构
2.代码
客户端:
rpc-client
[code]/** * 框架的RPC 客户端(用于发送 RPC 请求) */ public class RpcClient extends SimpleChannelInboundHandler<RpcResponse> { private static final Logger logger = LoggerFactory .getLogger(RpcClient.class); private String host; private int port; private RpcResponse response; private final Object obj = new Object(); public RpcClient(String host, int port) { this.host = host; this.port = port; } /** * 链接服务端,发送消息 */ public RpcResponse send(RpcRequest request) throws Exception { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group).channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel channel) throws Exception { // 向pipeline中添加编码、解码、业务处理的handler channel.pipeline() .addLast(new RpcEncoder(RpcRequest.class)) //out-1 .addLast(new RpcDecoder(RpcResponse.class)) //in-1 .addLast(RpcClient.this); //in-2 } }).option(ChannelOption.SO_KEEPALIVE, true); // 链接服务器 ChannelFuture future = bootstrap.connect(host, port).sync(); //将request对象写入outbundle处理后发出(即RpcEncoder编码器) future.channel().writeAndFlush(request).sync(); // 用线程等待的方式决定是否关闭连接 // 其意义是:先在此阻塞,等待获取到服务端的返回后,被唤醒,从而关闭网络连接 synchronized (obj) { obj.wait(); } if (response != null) { future.channel().closeFuture().sync(); } return response; } finally { group.shutdownGracefully(); } } /** * 读取服务端的返回结果 */ @Override public void channelRead0(ChannelHandlerContext ctx, RpcResponse response) throws Exception { this.response = response; synchronized (obj) { obj.notifyAll(); } } /** * 异常处理 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { logger.error("client caught exception", cause); ctx.close(); } }
[code]/** * RPC 代理(用于创建 RPC 服务代理) */ public class RpcProxy { //服务地址 private String serverAddress; //自动加载,查找服务 private ServiceDiscovery serviceDiscovery; public RpcProxy(String serverAddress) { this.serverAddress = serverAddress; } public RpcProxy(ServiceDiscovery serviceDiscovery) { this.serviceDiscovery = serviceDiscovery; } /** * 创建代理 * * @param interfaceClass * @return */ @SuppressWarnings("unchecked") public <T> T create(Class<?> interfaceClass) { return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class<?>[] { interfaceClass }, new InvocationHandler() { public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { //创建RpcRequest,封装被代理类的属性 RpcRequest request = new RpcRequest(); request.setRequestId(UUID.randomUUID().toString()); //拿到声明这个方法的业务接口名称 request.setClassName(method.getDeclaringClass() .getName()); request.setMethodName(method.getName()); request.setParameterTypes(method.getParameterTypes()); request.setParameters(args); //查找服务 if (serviceDiscovery != null) { serverAddress = serviceDiscovery.discover(); } //随机获取服务的地址 String[] array = serverAddress.split(":"); String host = array[0]; int port = Integer.parseInt(array[1]); //创建Netty实现的RpcClient,链接服务端 RpcClient client = new RpcClient(host, port); //通过netty向服务端发送请求 RpcResponse response = client.send(request); //返回信息 if (response.isError()) { throw response.getError(); } else { return response.getResult(); } } }); } }
服务端:
rpc-server
[code]/** * 处理具体的业务调用 * 通过构造时传入的“业务接口及实现”handlerMap,来调用客户端所请求的业务方法 * 并将业务方法返回值封装成response对象写入下一个handler(即编码handler——RpcEncoder) */ public class RpcHandler extends SimpleChannelInboundHandler<RpcRequest> { private static final Logger logger = LoggerFactory .getLogger(RpcHandler.class); private final Map<String, Object> handlerMap; RpcHandler(Map<String, Object> handlerMap) { this.handlerMap = handlerMap; } /** * 接收消息,处理消息,返回结果, */ @Override public void channelRead0(final ChannelHandlerContext ctx, RpcRequest request) throws Exception { RpcResponse response = new RpcResponse(); response.setRequestId(request.getRequestId()); try { //根据request来处理具体的业务调用 Object result = handle(request); response.setResult(result); } catch (Throwable t) { response.setError(t); } //写入 outbundle(即RpcEncoder)进行下一步处理(即编码)后发送到channel中给客户端 ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE); } /** * 根据request来处理具体的业务调用 * 调用是通过反射的方式来完成 */ private Object handle(RpcRequest request) throws Throwable { String className = request.getClassName(); //拿到实现类对象 Object serviceBean = handlerMap.get(className); //拿到要调用的方法名、参数类型、参数值 String methodName = request.getMethodName(); Class<?>[] parameterTypes = request.getParameterTypes(); Object[] parameters = request.getParameters(); //拿到接口类 Class<?> forName = Class.forName(className); System.out.println(serviceBean.toString()+" "+ Arrays.toString(parameters)); //调用实现类对象的指定方法并返回结果 Method method = forName.getMethod(methodName, parameterTypes); return method.invoke(serviceBean, parameters); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { logger.error(cause.getMessage()); ctx.close(); } }
[code]/** * 框架的RPC 服务器(用于将用户系统的业务类发布为 RPC 服务) * 使用时可由用户通过spring-bean的方式注入到用户的业务系统中 * 由于本类实现了ApplicationContextAware InitializingBean * spring构造本对象时会调用setApplicationContext()方法,从而可以在方法中通过自定义注解获得用户的业务接口和实现 * 还会调用afterPropertiesSet()方法,在方法中启动netty服务器 * 顺序 * 1.setApplicationContext 报错会中间截断 * 2.afterPropertiesSet */ public class RpcServer implements ApplicationContextAware, InitializingBean { private static final Logger logger = LoggerFactory .getLogger(RpcServer.class); private String serverAddress; private ServiceRegistry serviceRegistry; //用于存储业务接口和实现类的实例对象(由spring所构造) private Map<String, Object> handlerMap = new HashMap<String, Object>(); public RpcServer(String serverAddress) { this.serverAddress = serverAddress; } //服务器绑定的地址和端口由spring在构造本类时从配置文件中传入 public RpcServer(String serverAddress, ServiceRegistry serviceRegistry) { this.serverAddress = serverAddress; //用于向zookeeper注册名称服务的工具类 this.serviceRegistry = serviceRegistry; } /** * 通过注解,获取标注了rpc服务注解的业务类的----接口及impl对象,将它放到handlerMap中 */ public void setApplicationContext(ApplicationContext ctx) throws BeansException { Map<String, Object> serviceBeanMap = ctx .getBeansWithAnnotation(RpcService.class); if (MapUtils.isNotEmpty(serviceBeanMap)) { for (Object serviceBean : serviceBeanMap.values()) { //从业务实现类上的自定义注解中获取到value,从来获取到业务接口的全名 String interfaceName = serviceBean.getClass() .getAnnotation(RpcService.class).value().getName(); handlerMap.put(interfaceName, serviceBean); } } } /** * 在此启动netty服务,绑定handle流水线: * 1、接收请求数据进行反序列化得到request对象 * 2、根据request中的参数,让RpcHandler从handlerMap中找到对应的业务imple,调用指定方法,获取返回结果 * 3、将业务调用结果封装到response并序列化后发往客户端 */ public void afterPropertiesSet() throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap .group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel channel) throws Exception { channel.pipeline() .addLast(new RpcDecoder(RpcRequest.class))// in-1 .addLast(new RpcEncoder(RpcResponse.class))// out-1 .addLast(new RpcHandler(handlerMap));// in-2 } }).option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true); String[] array = serverAddress.split(":"); String host = array[0]; int port = Integer.parseInt(array[1]); ChannelFuture future = bootstrap.bind(host, port).sync(); logger.debug("server started on port {}", port); if (serviceRegistry != null) { serviceRegistry.register(serverAddress); } future.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } }
[code]/** * RPC 请求注解(标注在服务实现类上) */ @Target({ ElementType.TYPE })//注解用在接口上 @Retention(RetentionPolicy.RUNTIME)//VM将在运行期也保留注释,因此可以通过反射机制读取注解的信息 @Component public @interface RpcService { Class<?> value(); }
运行结果:
zkCli数据:
服务端
sample—server调用服务端接口实现:
Hello! World
客户端
服务端返回结果:
Hello! World
代码运行路径:
代码下载
地址:https://download.csdn.net/download/qq_19968255/10696211
相关文章推荐
- RPC/XDR/NFS系列之----远程过程调用
- thrift系列 - harpc 基于thrift的轻量级rpc框架
- RPC框架系列——Avro
- 发布Exchange的RPC以及RPC Over HTTPS:ISA2006系列之十九
- 高并发架构系列:如何从0到1设计一个类Dubbo的RPC框架
- RPC框架系列——Avro
- 自己动手写Rpc框架系列
- 新浪微博新兵训练营系列课程——平台RPC框架介绍
- [转]新兵训练营系列课程——平台RPC框架介绍
- Hadoop HDFS编程 API入门系列之RPC版本2(九)
- Dubbo系列之RPC分析(七)
- RPC框架系列——Avro
- RPC框架原理及从零实现系列文章(四):支持zookeeper注册中心与负载均衡
- RPC/XDR/NFS系列之----远程过程调用
- Dubbo系列(2)_RPC介绍
- java RPC系列之一 rmi
- HadoopRPC机制分析系列之一: 动态代理
- Windows 安全系列04-AD域的RPC安全问题
- dubbo系列----rpc初探
- 发布Exchange的RPC以及RPC Over HTTPS:ISA2006系列之十九