一个简单的rpc框架实现(一)
2016-07-19 11:28
746 查看
系统长大后为了保证可维护性,拆分系统是一个比较常见的解决方案。系统拆分后,原来的接口直接调用方法不再可行,需要被替换成为远程调用过程。远程调用可以直接使用http协议post 一个请求到服务提供端,然后服务提供端返回一个结果给调用者。这种方案将原本数据service层的DO操作过程上升成为了web服务,我个人并不反感。第二种方案就是使用rmi 实现,但是rmi client 和server的地址耦合到一起,一旦server更换地址client端需要同步修改。
第三种方案是直接读对方的数据库,当然便利性和可维护性更差,需要把server 端的底层DAO业务冗余到client。 最后一种就是rpc 远程调用框架。
rpc框架需要很多组件:调用者,提供者,服务注册中心, 通信总线 和通信协议。 其中每一个组件都有很多技术点要谈。本文在这里只讨论一个最简单的原型:简单分隔符的通信协议,使用socket 实现通信总线。 CS直接耦合绑定在一起,后期再考虑注册中心和服务暴漏的问题。 Socket通信方式也采用最简单的直连接方式,不使用nio, 也不维护连接池。
首先先看一个简单场景:
打印接口:
接口实现:
业务调用方:
其中图1能够很好的描述这个场景, 虚线A表示业务方并不知道实现者是谁,只知道接口的存在。
图1
现在业务系统升级了,负责打印的bean复杂到独立成了一个系统,要从业务系统中剥离出去,该怎么办?我们选择使用socket实现远程调用。 由于控制反转的设计,业务方几乎不需要改动任何代码就能实现升级。远程调用方案如图2所示:
图2
接口PrintText的实现被代理成为socket client, 将调用请求通过socket发送出去。 服务提供者接受到请求后,解析完毕,调用具体的实现类SystemPrintText, 然后将返回值发送回socket client。 最后返回给业务方。
1.通讯协议:
使用|##|隔开字段(并不推荐这种方式,后面系列进一步讲协议的设计)
2. Client客户端:
动态代理,截获业务方的接口调用后将调用参数组装成为上述协议,发送给服务端,并将服务端的返回结果返回给业务方。代码实现如下:
对于动态代理InvocationHander的介绍这里不再论述。
3. 服务端实现:
服务端监听端口,获取客户端访问。 校验数据有效性:是否为空,加密参数,版本号,调用接口,调用方法。 然后通过spring容器找到最终的实现bean,通过反射的方式调用对应方法。 最后将返回值发送给客户端。代码如下:
4.中转bean:
通过spring的FactoryBean实现,通过getObject方法能够中转任何接口的调用请求。具体细节请参考spring AOP和动态代理实现方案。Demo版本就非常简单,就是将业务的请求发送给2中的动态代理。 代码如下:
5.RemoteDataSource:
远程数据源,耦合了客户端和服务端的ip地址、加密信息以及版本号。代码如下:
6.remotionFactory:
一个简单工厂,生成以PrintText为接口,2中的SocketConsumerProxy的为实现的代理对象给业务方。具体代码如下:
7.服务端:
需要在spring容器加载bean时提供服务,代码如下:
8.Spring中bean配置:
客户端:
服务端:
9.业务方代码:
一个简单的rpc框架就实现完毕
服务方未启动:socket 调用异常
服务方启动:
服务方日志:
客户端日志:用时27ms, 很多地方需要优化。
第三种方案是直接读对方的数据库,当然便利性和可维护性更差,需要把server 端的底层DAO业务冗余到client。 最后一种就是rpc 远程调用框架。
rpc框架需要很多组件:调用者,提供者,服务注册中心, 通信总线 和通信协议。 其中每一个组件都有很多技术点要谈。本文在这里只讨论一个最简单的原型:简单分隔符的通信协议,使用socket 实现通信总线。 CS直接耦合绑定在一起,后期再考虑注册中心和服务暴漏的问题。 Socket通信方式也采用最简单的直连接方式,不使用nio, 也不维护连接池。
首先先看一个简单场景:
打印接口:
public interface PrintText { String print(String text); }
接口实现:
public class SystemPrint implements PrintText { public String print(String text) { System.out.println(text); return "系统已打印:" + text; } }
业务调用方:
public class SpringClient { public static void main(String[] args){ BeanFactory apx = new ClassPathXmlApplicationContext("classpath:spring-config.xml"); PrintText pt = (PrintText) apx.getBean("printText"); System.out.println(pt.print("springClient")); } }
其中图1能够很好的描述这个场景, 虚线A表示业务方并不知道实现者是谁,只知道接口的存在。
图1
现在业务系统升级了,负责打印的bean复杂到独立成了一个系统,要从业务系统中剥离出去,该怎么办?我们选择使用socket实现远程调用。 由于控制反转的设计,业务方几乎不需要改动任何代码就能实现升级。远程调用方案如图2所示:
图2
接口PrintText的实现被代理成为socket client, 将调用请求通过socket发送出去。 服务提供者接受到请求后,解析完毕,调用具体的实现类SystemPrintText, 然后将返回值发送回socket client。 最后返回给业务方。
下面详细讲一下各个组件的实现方案:
1.通讯协议:使用|##|隔开字段(并不推荐这种方式,后面系列进一步讲协议的设计)
协议设计: version:版本号|##|cypher:加密串|##| interfaceName:接口bean名称|##| interface:接口全路径|##|method:调用方法|##|params:参数1的类+参数1对象,单数2类+参数2对象, Demo: `version:1.0.0|##|cypher:default|##|interfaceName:proxyPrintText|##|interface:com.tmall.beans.PrintText|##|method:print|##|params:java.lang.String+演示远程调用`
2. Client客户端:
动态代理,截获业务方的接口调用后将调用参数组装成为上述协议,发送给服务端,并将服务端的返回结果返回给业务方。代码实现如下:
public class SocketConsumerProxy implements InvocationHandler { private Object target; private RemoteDataSource dataSource; public SocketConsumerProxy(RemoteDataSource dataSource){ this.dataSource = dataSource; } public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { StringBuffer stream = this.buildRpcRequest(proxy, method, args); Object object = null; try{ long startTime = System.currentTimeMillis(); System.out.println("New rpc client send " + stream.toString() + " time:" + startTime); // socket connect Socket socket=new Socket(dataSource.getIp(), dataSource.getPort()); // request PrintWriter os = new PrintWriter(socket.getOutputStream()); os.println(stream.toString()); os.flush(); // read response BufferedReader br = new BufferedReader( new InputStreamReader(socket.getInputStream())); object = br.readLine(); long endTime = System.currentTimeMillis(); System.out.println("client read from service:" + object + " time:" + (endTime - startTime)); }catch (IOException e){ e.printStackTrace(); object = e.toString(); } return object; } private StringBuffer buildRpcRequest(Object proxy, Method method, Object[] args) { StringBuffer buffer = new StringBuffer(); buffer.append(String.format("version:%s|##|cypher:%s|##|interfaceName:%s|##|interface:%s|##|method:%s|##|params:" , dataSource.getVersion(), dataSource.getCypher(), dataSource.getInterfaceName(), dataSource.getInterfaces(), method.getName())); for (Object obj : args){ buffer.append(obj.getClass().getName() + "+" + obj.toString() + ","); } return buffer; } }
对于动态代理InvocationHander的介绍这里不再论述。
3. 服务端实现:
服务端监听端口,获取客户端访问。 校验数据有效性:是否为空,加密参数,版本号,调用接口,调用方法。 然后通过spring容器找到最终的实现bean,通过反射的方式调用对应方法。 最后将返回值发送给客户端。代码如下:
public class SocketProvider{ private RemoteDataSource dataSource; public SocketProvider(RemoteDataSource dataSource) { this.dataSource = dataSource; } public Object provide() throws Throwable { ServerSocket serverSocket = new ServerSocket(dataSource.getPort()); while (true) { Socket socket = null; try { //接收客户连接,只要客户进行了连接,就会触发accept();从而建立连接 socket = serverSocket.accept(); this.getRpcRequest(socket); } catch (Exception e) { e.printStackTrace(); } } } private void getRpcRequest(Socket socket) { try { System.out.println("rpc client accepted " + socket.getInetAddress() + ":" + socket.getPort() + " time:" + System.currentTimeMillis()); // 接收服务器的反馈 BufferedReader br = new BufferedReader( new InputStreamReader(socket.getInputStream())); String msg = br.readLine(); System.out.println("读到远程调用请求:" + msg); Object obj = this.parseRpcClientRequest(msg); // 接收服务器的反馈 PrintWriter os = new PrintWriter(socket.getOutputStream()); os.println(obj.toString()); os.flush(); System.out.println("rpc server return " + obj + ":" + socket.getPort() + " time:" + System.currentTimeMillis()); } catch (IOException e) { e.printStackTrace(); } } public Object parseRpcClientRequest(String msg) { Object result = ""; if (StringUtils.isEmpty(msg)){ return result; } String[] infos = msg.split("\\|##\\|"); Map<String, String> infoMap = new HashMap<String, String>(); for(String info : infos){ String[] pair = info.split(":"); infoMap.put(pair[0], pair[1]); } if (infoMap.isEmpty()){ return "无调用参数"; } if (!dataSource.getCypher().equals(infoMap.get("cypher"))){ return "加密串不对"; } if (!dataSource.getVersion().equals(infoMap.get("version"))){ return "服务版本号不对"; } String interfaces = infoMap.get("interface"); String interfaceName = infoMap.get("interfaceName"); String methodName = infoMap.get("method"); String params = infoMap.get("params"); if(StringUtils.isEmpty(interfaces) || StringUtils.isEmpty(methodName) || StringUtils.isEmpty(interfaceName)){ return "无调用接口或者方法 或者接口beanName"; } // bean Object obj = RemoteDataSource.beanFactory.getBean(interfaceName); if(obj == null){ return "未找到对应的服务"; } // bean 和 interface对应关系 boolean isInterfaceRight = false; Class<?>[] clazzArray = obj.getClass().getInterfaces(); for (Class<?> clazz : clazzArray){ if (clazz.getName().equals(interfaces)){ isInterfaceRight = true; break; } } if (isInterfaceRight == false){ return "错误的bean Name 和 interface对应关系"; } // 参数对应的类和对象 String[] paramsArray = params.split(","); Class<?>[] paramsClazzArray = new Class<?>[paramsArray.length]; Object[] paramObjArray = new Object[paramsArray.length]; try{ for (int i = 0; i < paramsArray.length; i ++){ String paramInfo = paramsArray[i]; if (StringUtils.isEmpty(paramInfo)) { // 过滤掉多余的逗号 continue; } String[] paramsInfos = paramInfo.split("\\+"); Class c = Class.forName(paramsInfos[0]); paramsClazzArray[i] = c; paramObjArray[i] = paramsInfos[1]; } }catch (ClassNotFoundException e){ e.printStackTrace(); return "未找到参数对应的类名:" +e.toString(); } try { Method method = obj.getClass().getMethod(methodName, paramsClazzArray); result = method.invoke(obj, paramObjArray); } catch (NoSuchMethodException e) { e.printStackTrace(); return "未找到参数对应的方法名称:" +e.toString(); } catch (IllegalAccessException e) { e.printStackTrace(); return "无效的调用:" +e.toString(); } catch (InvocationTargetException e) { e.printStackTrace(); return "无效的目标地址:" +e.toString(); } result = "我是服务器代理aop, result=" + result; return result; } }
4.中转bean:
通过spring的FactoryBean实现,通过getObject方法能够中转任何接口的调用请求。具体细节请参考spring AOP和动态代理实现方案。Demo版本就非常简单,就是将业务的请求发送给2中的动态代理。 代码如下:
public class CaishengRemoteSimpleConsumerBean implements FactoryBean, InitializingBean{ private RemoteDataSource remoteDataSource = new RemoteDataSource(); // 远程数据源 public void afterPropertiesSet() throws Exception { remoteDataSource.init(); } public Object getObject() throws Exception { return remoteDataSource.getClientObject(); } public Class<?> getObjectType() { return remoteDataSource.getClientObject().getClass(); } ……//一些不重要的get,set操作 }
5.RemoteDataSource:
远程数据源,耦合了客户端和服务端的ip地址、加密信息以及版本号。代码如下:
public class RemoteDataSource { private String ip = "127.0.0.1"; private int port = 9090; // 端口 private String interfaces; // 接口全路径 private String interfaceName; // 接口名 private String version; private String cypher ="default"; // 密码 private Class<?> interfaceClass; // 类文件 private SocketRemotionFactory remotionFactory; public void init(){ remotionFactory = new SocketRemotionFactory(this); try { interfaceClass = Class.forName(interfaces); } catch (ClassNotFoundException e) { } } public Object getClientObject(){ return remotionFactory.getRemoteClientProxy(); } ……//一些不重要的get,set操作 }
6.remotionFactory:
一个简单工厂,生成以PrintText为接口,2中的SocketConsumerProxy的为实现的代理对象给业务方。具体代码如下:
public class SocketRemotionFactory { private RemoteDataSource dataSource; public SocketRemotionFactory(RemoteDataSource dataSource) { this.dataSource = dataSource; } public Object getRemoteClientProxy(){ Object result = null; Class<?> clazz = dataSource.getInterfaceClass(); if (clazz == null){ return "错误的client 代理,无对应的class"; } Class<?>[] clazzArray = new Class[1]; clazzArray[0] = clazz; try{ result = Proxy.newProxyInstance(clazz.getClassLoader(), clazzArray, new SocketConsumerProxy(dataSource)); }catch (Exception e){ e.printStackTrace(); } return result; } }
7.服务端:
需要在spring容器加载bean时提供服务,代码如下:
public class CaishengRemoteSimpleProviderBean implements FactoryBean, InitializingBean{ private RemoteDataSource remoteDataSource = new RemoteDataSource(); // 远程数据源 private SocketProvider provider; public void init() throws Exception { this.afterPropertiesSet(); } public void afterPropertiesSet() throws Exception { remoteDataSource.init(); provider = new SocketProvider(remoteDataSource); } public Object getObject() throws Exception { try { provider.provide(); } catch (Throwable throwable) { throwable.printStackTrace(); } return null; } ……//一些不重要的get,set操作 }
8.Spring中bean配置:
客户端:
<bean id="caishengSimpleRemoteProxyPrintText" class="com.tmall.proxy.remotesimple.CaishengRemoteSimpleConsumerBean"> <property name="interfaces"> <value>com.tmall.beans.PrintText</value> </property> <property name="interfaceName"> <value>proxyPrintText</value> </property> <property name="version"> <value>1.0.0</value> </property> </bean>
服务端:
<bean id="printText" class="com.tmall.beans.impl.SystemPrint"></bean> <bean id="caishengSimpleRemoteProvider" class="com.tmall.proxy.remotesimple.CaishengRemoteSimpleProviderBean" init-method="init"> <property name="interfaces"> <value>com.tmall.beans.PrintText</value> </property> <property name="interfaceName"> <value>proxyPrintText</value> </property> <property name="version"> <value>1.0.0</value> </property> </bean>
9.业务方代码:
public class Client { public static void main(String[] args){ BeanFactory apx = new ClassPathXmlApplicationContext("classpath:spring-config.xml"); PrintText pt = (PrintText) apx.getBean("caishengSimpleRemoteProxyPrintText"); System.out.println(pt.print("远程调用演示")); } }
一个简单的rpc框架就实现完毕
服务方未启动:socket 调用异常
New rpc client send version:1.0.0|##|cypher:default|##|interfaceName:proxyPrintText|##|interface:com.tmall.beans.PrintText|##|method:print|##|params:java.lang.String+远程调用演示, time:1464108758488 java.net.ConnectException: Connection refused: connect java.net.ConnectException: Connection refused: connect at java.net.DualStackPlainSocketImpl.connect0(Native Method) at java.net.DualStackPlainSocketImpl.socketConnect(DualStackPlainSocketImpl.java:69) at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339) at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200) at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182) at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:157) at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:391) at java.net.Socket.connect(Socket.java:579) at java.net.Socket.connect(Socket.java:528) at java.net.Socket.<init>(Socket.java:425) at java.net.Socket.<init>(Socket.java:208) at com.tmall.proxy.remotesimple.socket.SocketConsumerProxy.invoke(SocketConsumerProxy.java:33) at com.sun.proxy.$Proxy0.print(Unknown Source) at com.tmall.client.simpleRemote.Client.main(Client.java:16) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:601) at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140) Process finished with exit code 0
服务方启动:
rpc client accepted /127.0.0.1:54730 time:1464108803359 读到远程调用请求:version:1.0.0|##|cypher:default|##|interfaceName:proxyPrintText|##|interface:com.tmall.beans.PrintText|##|method:print|##|params:java.lang.String+远程调用演示, 远程调用演示 rpc server return 我是服务器代理aop, result=系统已打印:远程调用演示:54730 time:1464108803380
服务方日志:
New rpc client send version:1.0.0|##|cypher:default|##|interfaceName:proxyPrintText|##|interface:com.tmall.beans.Print Text|##|method:print|##|params:java.lang.String+远程调用演示, time:1464108803353 client read from service:我是服务器代理aop, result=系统已打印:远程调用演示 time:27 我是服务器代理aop, result=系统已打印:远程调用演示 Process finished with exit code 0
客户端日志:用时27ms, 很多地方需要优化。
相关文章推荐
- Android studio value 2 (DexIndexOverflowException,OutOfMemoryError,NoClassDefFoundError错误)
- iOS开发之打包测试包
- 教你如何迅速秒杀掉:99%的海量数据处理面试题
- 二叉树非递归遍历
- Java初始化顺序
- Android 6.0 运行时权限处理完全解析
- String 学习
- ClassCastException Log4jLoggerFactory LoggerContex
- 那海蓝蓝 微博
- android studio 插件开发(自动生成框架代码插件)
- <meta http-equiv="pragma" content="no-cache"/>是什么意思?
- iOS 动画队列-仿映客刷礼物效果
- Oozie安装总结
- 使用Glide加载gif
- kafka本地单机安装部署
- Android静态安全检测 -> PendingIntent误用风险
- 混淆
- 《linux学习》之NFS的测试
- 白菜之hashCode()方法重写及不同数据类型调用hashCode的方法
- android的通过context对象读取私有文件