一个分布式rpc框架的实现方案(二)
2016-07-19 11:29
597 查看
之前实现了一个点对点的rpc功能框架,使用简单的协议将调用接口和参数传给服务端。由于最近在看zk相关的内容,于是准备引入zk进行分布式管理。
先看一下前一个简单版本的实现原理:
这是一个简单的点对点的远程调用实现方案, client绑定了server的ip地址。那么如果有多个server,并且server的变化情况要及时通知client应该怎么实现? 引入了zookeeper后这个问题就比较容易解决了。以下两张图表达了基于zk的分布式rpc设计思想。
分布式服务的改造比较简单,将前一个版本的中写死的ip地址改成从zk中查询服务地址,根据负载均衡策略选择一个server的ip, 请求这个ip对应的服务就ok了。使用zookeeper管理server集群的方式如下:
服务方在zookeeper上创建一个临时节点,节点的名称是服务器的ip。客户端读zookeeper同目录下面的子节点列表,拿到server的ip地址,再选择一个ip作为请求方调用服务。如果服务器挂掉了,由于创建的是临时节点,因此zookeeper也会同步删掉这个ZNode,所以客户端就不会再看到对应的服务ip了。
具体代码如下:
1. 实现一个zk 的监控者
服务端注册服务,先创建/rpcServer目录
服务provider在init时将自己ip注册到zk集群上
rpc客户端查询zk中的服务节点,选择一个作为请求地址
socket client实现
其他的代码详见之前版本。
我们看一下调用效果:启动3虚拟机+2个本地zk 服务端,部署分布式zk环境。 将rpc provider代码打成jar包传到3个虚拟机中, 运营服务程序。
服务1
服务2
服务3
调出zkclient,查看一下zk集群的node情况。
OK,服务全部注册成功。 本地运行rpc client,
Client: 调用到ip 130的机器上
130 server:
再次运行Client: 这次调用到了ip 128的机器上。
128 server:
未完待续,下次将解决更加细节的问题,例如线程池的维护和NIO。
先看一下前一个简单版本的实现原理:
这是一个简单的点对点的远程调用实现方案, client绑定了server的ip地址。那么如果有多个server,并且server的变化情况要及时通知client应该怎么实现? 引入了zookeeper后这个问题就比较容易解决了。以下两张图表达了基于zk的分布式rpc设计思想。
分布式服务的改造比较简单,将前一个版本的中写死的ip地址改成从zk中查询服务地址,根据负载均衡策略选择一个server的ip, 请求这个ip对应的服务就ok了。使用zookeeper管理server集群的方式如下:
服务方在zookeeper上创建一个临时节点,节点的名称是服务器的ip。客户端读zookeeper同目录下面的子节点列表,拿到server的ip地址,再选择一个ip作为请求方调用服务。如果服务器挂掉了,由于创建的是临时节点,因此zookeeper也会同步删掉这个ZNode,所以客户端就不会再看到对应的服务ip了。
具体代码如下:
1. 实现一个zk 的监控者
public class ZKWatcher implements Watcher { protected static final int SESSION_TIME = 2000; protected ZooKeeper zooKeeper; protected String SERVER_PATH = "/rpcServer"; public void init() throws IOException, KeeperException, InterruptedException{} public ZKWatcher(){ try { init(); } catch (IOException e) { e.printStackTrace(); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } // 防止链接未完成就创建节点 protected CountDownLatch connectedSemaphore = new CountDownLatch(1); public void process(WatchedEvent event) { if (event.getState() == Event.KeeperState.SyncConnected) { // 链接成功 connectedSemaphore.countDown(); } } SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss"); public boolean createEphemeralFile(String file){ boolean result = false; try { String f = SERVER_PATH + "/" + file; Stat stat = zooKeeper.exists(f, false); if (stat == null) { zNode = zooKeeper.create(f, ("time:" + df.format(new Date()) + " " + getLocalIp()).getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); }else{ zooKeeper.setData(f, ("time:" + df.format(new Date()) + " " + getLocalIp()).getBytes(), -1); } result = true; }catch (Exception e){ System.out.println("error to create zk server file"); e.printStackTrace(); } return result; } }
服务端注册服务,先创建/rpcServer目录
public class ZKServer extends ZKWatcher { /** * 注册服务端 * @param ip * @return */ public boolean registServer(String ip) { boolean result = false; try{ if(StringUtils.isEmpty(ip)){ ip = getLocalIp(); } result = this.createEphemeralFile(ip); }catch (Exception e){ } if (result){ System.out.println("success to regist rpc server:" + ip); }else{ System.out.println("fail to regist rpc server:" + ip); } return result; } public void init() throws IOException, KeeperException, InterruptedException { if (zooKeeper == null){ zooKeeper = new ZooKeeper(getLocalIp(), SESSION_TIME, this); connectedSemaphore.await(); } Stat stat = zooKeeper.exists(SERVER_PATH, false); if (stat != null) { return; } String zNode = zooKeeper.create(SERVER_PATH, ("init:" + getLocalIp()).getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); System.out.println("create server path:" + zNode); } }
服务provider在init时将自己ip注册到zk集群上
public class SocketProviderV2{ …… public Object provide() throws Throwable { ServerSocket serverSocket = new ServerSocket(dataSource.getPort()); // 注册zk服务器 zkServer.registServer(null); while (true) { Socket socket = null; try { // open socket socket = serverSocket.accept(); this.getRpcRequest(socket); // 拿到请求后续处理 …… } catch (Exception e) { e.printStackTrace(); } } }
rpc客户端查询zk中的服务节点,选择一个作为请求地址
public class ZKClient extends ZKWatcher { public List<String> getServerIps() throws KeeperException, InterruptedException { List<String> ips = new ArrayList<String>(); try{ ips = zooKeeper.getChildren(SERVER_PATH, false); }catch (Exception e){ e.printStackTrace(); } return ips; } // 随机数实现负载均衡 Random random = new Random(1000); public String getBalanceIp() throws KeeperException, InterruptedException { List<String> ips = this.getServerIps(); int size = ips.size(); int index = (int)(Math.random() * size); return ips.get(index); } public void init() throws IOException, KeeperException, InterruptedException { if (zooKeeper == null){ zooKeeper = new ZooKeeper(getLocalIp(), SESSION_TIME, this); connectedSemaphore.await(); } System.out.println("I finished init"); } }
socket client实现
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { StringBuffer stream = this.buildRpcRequest(proxy, method, args); Object object = null; try{ long startTime = System.currentTimeMillis(); System.out.println("New rpc client send " + stream.toString() + " time:" + startTime); String serverIp = zkClient.getBalanceIp(); System.out.println("server ip is " + serverIp); // 调用远程服务器数据 …… }catch (IOException e){ e.printStackTrace(); object = e.toString(); } return object; }
其他的代码详见之前版本。
我们看一下调用效果:启动3虚拟机+2个本地zk 服务端,部署分布式zk环境。 将rpc provider代码打成jar包传到3个虚拟机中, 运营服务程序。
服务1
root@master:/home/caisheng# java -jar lengren-rpc.jar begin start service v2 success to regist rpc server:192.168.226.128
服务2
root@slave01:/home/caisheng# java -jar lengren-rpc.jar begin start service v2 success to regist rpc server:192.168.226.129
服务3
root@slave02:/home/caisheng# java -jar lengren-rpc.jar begin start service v2 success to regist rpc server:192.168.226.130
调出zkclient,查看一下zk集群的node情况。
OK,服务全部注册成功。 本地运行rpc client,
Client: 调用到ip 130的机器上
I finished init New rpc client send version:1.0.0|##|cypher:default|##|interfaceName:proxyPrintText|##|interface:proxyLearn.beans.PrintText|##|method:print|##|params:java.lang.String+test rpc, time:1467282617593 server ip is 192.168.226.130 client read from service:success aop, result=i am real printtest rpc time:14 success aop, result=i am real printtest rpc
130 server:
再次运行Client: 这次调用到了ip 128的机器上。
I finished init New rpc client send version:1.0.0|##|cypher:default|##|interfaceName:proxyPrintText|##|interface:proxyLearn.beans.PrintText|##|method:print|##|params:java.lang.String+test rpc, time:1467280709114 server ip is 192.168.226.128 client read from service:success aop, result=i am real printtest rpc time:17 success aop, result=i am real printtest rpc
128 server:
未完待续,下次将解决更加细节的问题,例如线程池的维护和NIO。
相关文章推荐
- 最好的规划就是天衣无缝,让人无机可乘
- jms之activeMQ与spring集成进阶-实现一种负载均衡
- word里的字体大小 怎样 转换成 css 中的 字体 像素
- Xcode buildsettings 设置相关的问题
- HTTP请求头大全
- Spring4.x官方参考文档中文版——第21章 Web MVC框架(21)
- Linux SVN 创建项目
- 五种服务器网络模型
- 一个简单的rpc框架实现(一)
- Android studio value 2 (DexIndexOverflowException,OutOfMemoryError,NoClassDefFoundError错误)
- iOS开发之打包测试包
- 教你如何迅速秒杀掉:99%的海量数据处理面试题
- 二叉树非递归遍历
- Java初始化顺序
- Android 6.0 运行时权限处理完全解析
- String 学习
- ClassCastException Log4jLoggerFactory LoggerContex
- 那海蓝蓝 微博
- android studio 插件开发(自动生成框架代码插件)
- <meta http-equiv="pragma" content="no-cache"/>是什么意思?