利用Thrift和zk简单实现服务治理框架中的订阅发布机制
2017-01-06 10:59
756 查看
本文简单介绍下利用Thrift和zk简单实现服务治理框架服务的订阅发布机制,类似于Dubbo的服务治理。这个只是简单版本,只供学习和理解用。
全部代码下载:Github链接:github链接,点击惊喜;写文章不易,欢迎大家采我的文章,以及给出有用的评论,当然大家也可以关注一下我的github;多谢;
1.服务越来越多时,服务URL配置管理变得非常困难。
2.服务间依赖关系变得错踪复杂
3.服务的调用量越来越大,服务的容量问题就暴露出来,这个服务需要多少机器支撑?什么时候该加机器?
4…….等等
为了满足服务线下管控、保障线上高效运行,需要有一个统一的服务治理框架对服务进行统一、有效管控,保障服务的高效、健康运行。服务治理是分布式服务框架的一个可选特性,尽管从服务开发和运行角度看它不是必须的,但是如果没有服务治理功能,分布式服务框架的服务SLA很难得到保障,服务化也很难真正实施成功。
基于以上原因,需要对各个服务做治理,这也是就为什么有了dubbo这类服务治理框架,它与其他RPC框架相比(例如thrift,avro),不仅仅提供了透明的服务调用,而且还提供了服务治理,比如上述的调用统计管理、负载均衡,这样每个业务模块只需专注于自己的内部业务逻辑即可。
简单画了如下图:
2.Thrift提供RPC调用功能。具体介绍请参考我的博客:Apache Thrift入门学习
2.利用Thrift创建服务和启动服务
3.利用Zookeeper去对应目录/Service订阅相应服务获得接口的IP地址和端口号,并注册监听事件,当目录改变时更新接口的IP地址和端口号—订阅服务
2.实现相应的接口。
3.编写服务启动和注册服务类。
4.编写相应的客户端订阅服务。
具体实现原理,见注释:
1.IDL文件和编译:
2.实现相应接口:
3.实现服务启动和注册类
4.编写客户端类:
所有代码下载请见github,上面的链接。
2.启动ThriftClinet类
3.测试结果:
服务端:
客户端:
本文来自伊豚(blog.wpeace.cn)
全部代码下载:Github链接:github链接,点击惊喜;写文章不易,欢迎大家采我的文章,以及给出有用的评论,当然大家也可以关注一下我的github;多谢;
1.什么是服务治理:
1.1微服务简单介绍:
微服务已经成为当下最热门的话题之一。它是一种新的架构风格,涉及组织架构、设计、交付、运维等方面的变革,核心目标是为了解决系统的交付周期,并降低维护成本和研发成本。相比传统的SOA架构或者单块架构,微服务有很多的优势,比如技术的多样性、模块化、独立部署等,但也带来了相应的成本,比如运维成本、服务管理成本等。1.2服务治理的出现
在微服务盛行下,利用RMI或Hessian等工具,简单的暴露和引用远程服务,通过配置服务的URL地址进行调用已经变得越来越不能满足需求。1.服务越来越多时,服务URL配置管理变得非常困难。
2.服务间依赖关系变得错踪复杂
3.服务的调用量越来越大,服务的容量问题就暴露出来,这个服务需要多少机器支撑?什么时候该加机器?
4…….等等
为了满足服务线下管控、保障线上高效运行,需要有一个统一的服务治理框架对服务进行统一、有效管控,保障服务的高效、健康运行。服务治理是分布式服务框架的一个可选特性,尽管从服务开发和运行角度看它不是必须的,但是如果没有服务治理功能,分布式服务框架的服务SLA很难得到保障,服务化也很难真正实施成功。
基于以上原因,需要对各个服务做治理,这也是就为什么有了dubbo这类服务治理框架,它与其他RPC框架相比(例如thrift,avro),不仅仅提供了透明的服务调用,而且还提供了服务治理,比如上述的调用统计管理、负载均衡,这样每个业务模块只需专注于自己的内部业务逻辑即可。
1.3服务治理的几个要素:
服务管理组件:这个组件是“服务治理”的核心组件,您的服务治理框架有多强大,主要取决于您的服务管理组件功能有多强大。它至少具有的功能包括:服务注册管理、访问路由;另外,它还可以具有:服务版本管理、服务优先级管理、访问权限管理、请求数量限制、连通性管理、注册服务集群、节点容错、事件订阅-发布、状态监控,等等功能。 服务提供者(服务生产者):即服务的具体实现,然后按照服务治理框架特定的规范发布到服务管理组件中。这意味着什么呢?这意味着,服务提供者不一定按照RPC调用的方式发布服务,而是按照整个服务治理框架所规定的方式进行发布(如果服务治理框架要求服务提供者以RPC调用的形式进行发布,那么服务提供者就必须以RPC调用的形式进行发布;如果服务治理框架要求服务提供者以Http接口的形式进行发布,那么服务提供者就必须以Http接口的形式进行发布,但后者这种情况一般不会出现)。 服务使用者(服务消费者):即调用这个服务的用户,调用者首先到服务管理组件中查询具体的服务所在的位置;服务管理组件收到查询请求后,将向它返回具体的服务所在位置(视服务管理组件功能的不同,还有可能进行这些计算:判断服务调用者是否有权限进行调用、是否需要生成认证标记、是否需要重新检查服务提供者的状态、让调用者使用哪一个服务版本等等)。服务调用者在收到具体的服务位置后,向服务提供者发起正式请求,并且返回相应的结果。第二次调用时,服务请求者就可以像服务提供者直接发起调用请求了(当然,您可以有一个服务提供期限的设置,使用租约协议就可以很好的实现)。 参考于: http://blog.csdn.net/yinwenjie/article/details/49869535
简单画了如下图:
1.4服务的订阅发布机制
它的核心理念是实现服务消费者和服务提供者的解耦,让服务消费者能够像使用本地接口一样消费远端的服务提供者,而不需要关心服务提供者的位置信息,实现透明化调用。常用的服务注册中心有Zookeeper、ETCD,以及基于数据库的配置中心。2.设计一个服务治理框架中的订阅发布机制
2.1使用的技术:
1.Zookeeper作为注册中心并进行管控。具体介绍请参考我的博客: Hadoop集群之 ZooKeeper和Hbase环境搭建2.Thrift提供RPC调用功能。具体介绍请参考我的博客:Apache Thrift入门学习
2.2设计思路
1.利用Zookeeper建立/Service根目录,在该目录下建立相应的服务接口子目录存放该接口的IP地址和端口号—注册服务2.利用Thrift创建服务和启动服务
3.利用Zookeeper去对应目录/Service订阅相应服务获得接口的IP地址和端口号,并注册监听事件,当目录改变时更新接口的IP地址和端口号—订阅服务
3.实现订阅发布机制
3.1实现步骤:
1.编写Thrift的IDL并编译出相应的接口类。2.实现相应的接口。
3.编写服务启动和注册服务类。
4.编写相应的客户端订阅服务。
3.2代码实现
工程为maven工程,假如不建立maven工程,请下载对应的lib包。具体实现原理,见注释:
1.IDL文件和编译:
//命名空间定义:java包 namespace java cn.wpeace.thrift //结构体定义:转化java中的实体类 struct Request{ 1:required string userName; 2:required string password; } //定义返回类型 struct Student{ 1:required string naem; 2:required i32 age; } struct People{ 1:required string naem; 2:required i32 age; 3:required string sex; } //异常描述定义 exception HelloException{ 1:required string msg; } //服务定义,生成接口用 service StudentService{ list<Student> getAllStudent(1:Request request)throws (1:HelloException e); } //服务定义,生成接口用 service PeopleService{ list<People> getAllPeople(1:Request request)throws (1:HelloException e); } //thrift -gen java ./zk.thrift
2.实现相应接口:
public class StudentServiceImpl implements Iface {// 实现的是StudentService类下面的接口 @Override public List<Student> getAllStudent(Request request) throws HelloException, TException { System.out.println("调用studentService"); System.out.println(request.getUserName()); System.out.println(request.getPassword()); List<Student> students = new ArrayList<>(); for (int i = 0; i < 5; i++) { Student student = new Student(); student.setNaem("peace" + i); student.setAge(22 + i); students.add(student); } return students; } } public class PeopleServiceImpl implements Iface{ @Override public List<People> getAllPeople(Request request) throws HelloException, TException { System.out.println("调用PeopleService"); System.out.println(request.getUserName()); System.out.println(request.getPassword()); List<People>peoples=new ArrayList<>(); for(int i=0;i<5;i++) { People people=new People("wpeace", 22+i, "男"); peoples.add(people); } return peoples; } }
3.实现服务启动和注册类
package cn.wpeace.thriftService; import java.io.IOException; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.log4j.BasicConfigurator; import org.apache.thrift.TProcessor; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.protocol.TProtocol; import org.apache.thrift.server.ServerContext; import org.apache.thrift.server.TNonblockingServer; import org.apache.thrift.server.TServerEventHandler; import org.apache.thrift.transport.TNonblockingServerSocket; import org.apache.thrift.transport.TTransport; import org.apache.thrift.transport.TTransportException; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.data.Stat; import cn.wpeace.thrift.PeopleService; import cn.wpeace.thrift.StudentService; import net.sf.json.JSONObject; public class ServiceSatrt implements Watcher{ //初始化log4j static{ BasicConfigurator.configure(); } private static final Log LOGGER=LogFactory.getLog(ServiceSatrt.class); private static final Integer[] PORTS={8081,8082}; public static final String serviceNames[]={"studentService","peopleService"}; private static final String SERVICE_IP="192.168.1.118"; private CountDownLatch connectedSignal=new CountDownLatch(1);//用于建立连接 private ZooKeeper zk ; /** * thrift服务启动标记 */ private Integer isThriftStart=0; /** * 启动所有服务 */ private void startServer(){ ServiceSatrt.LOGGER.info("启动Thrift线程"); // 创建启动线程: StartServerThread studenThread = new StartServerThread(PORTS[0], new StudentService.Processor<StudentService.Iface>(new StudentServiceImpl())); StartServerThread peopleThread = new StartServerThread(PORTS[1], new PeopleService.Processor<PeopleService.Iface>(new PeopleServiceImpl())); ExecutorService pool = Executors.newFixedThreadPool(2); pool.submit(studenThread); pool.submit(peopleThread); //关闭线程池:线程仍然在运行 pool.shutdown(); } private class StartServerThread implements Runnable{ private Integer port; private TProcessor processor; public StartServerThread(Integer port,TProcessor processor) { this.port=port; this.processor=processor; } @Override public void run() { ServiceSatrt.LOGGER.info("thrift服务正在准备启动"); try { // 非阻塞式 TNonblockingServerSocket serverSocket=new TNonblockingServerSocket(port); // 为服务器设置对应的IO网络模型 TNonblockingServer.Args tArgs = new TNonblockingServer.Args(serverSocket); // 设置控制器 tArgs.processor(processor); // 设置消息封装格式 tArgs.protocolFactory(new TBinaryProtocol.Factory());//Thrift特有的一种二进制描述格式 // 启动Thrift服务 TNonblockingServer server = new TNonblockingServer(tArgs); server.setServerEventHandler(new StartServerEventHander()); server.serve();//启动后,程序就停在这里了。 } catch (TTransportException e) { e.printStackTrace(); } } } private class StartServerEventHander implements TServerEventHandler{ @Override public void preServe() { synchronized (isThriftStart) { isThriftStart++;//当全部服务启动成功才连接zk if(isThriftStart==2){ synchronized (ServiceSatrt.this) { ServiceSatrt.LOGGER.info("thrift服务启动完成"); ServiceSatrt.this.notify(); } } } } @Override public ServerContext createContext(TProtocol arg0, TProtocol arg1) { return null; } @Override public void deleteContext(ServerContext arg0, TProtocol arg1, TProtocol arg2) { } @Override public void processContext(ServerContext arg0, TTransport arg1, TTransport arg2) { } } private void connectZk() throws KeeperException, InterruptedException, IOException{ // 连接到zk服务器集群,添加默认的watcher监听 zk= new ZooKeeper("192.168.1.127:2181", 120000, this); connectedSignal.await(); // 创建一个父级节点Service Stat pathStat = null; try { pathStat = zk.exists("/Service", false); // 如果条件成立,说明节点不存在(只需要判断一个节点的存在性即可) // 创建的这个节点是一个“永久状态”的节点 if (pathStat == null) { zk.create("/Service", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } catch (Exception e) { System.exit(-1); } // 开始添加子级节点,每一个子级节点都表示一个这个服务提供者提供的业务服务 for (int i = 0; i < 2; i++) { JSONObject nodeData = new JSONObject(); nodeData.put("ip", SERVICE_IP); nodeData.put("port", PORTS[i]); zk.create("/Service/" + serviceNames[i], nodeData.toString().getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); } // 执行到这里,说明所有的service都启动完成了 ServiceSatrt.LOGGER.info("===================所有service都启动完成了,主线程开始启动==================="); } @Override public void process(WatchedEvent event) { //建立连接用 if(event.getState()==KeeperState.SyncConnected){ connectedSignal.countDown(); return; } //暂在这里不做处理,正常情况下需要处理。 } public static void main(String[] args) { //启动服务 ServiceSatrt serviceSatrt=new ServiceSatrt(); serviceSatrt.startServer(); //等待服务启动完成 synchronized (serviceSatrt) { try { while (serviceSatrt.isThriftStart<2) { serviceSatrt.wait(); } } catch (Exception e) { ServiceSatrt.LOGGER.error(e); System.out.println(-1); } } //启动连接 try { serviceSatrt.connectZk(); } catch (Exception e) { ServiceSatrt.LOGGER.error(e); System.out.println(-1); } } }
4.编写客户端类:
package cn.wpeace.thriftClinet; import java.io.IOException; import java.util.List; import java.util.concurrent.CountDownLatch; import javax.sound.midi.VoiceStatus; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.log4j.BasicConfigurator; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.transport.TFramedTransport; import org.apache.thrift.transport.TSocket; import org.apache.thrift.transport.TTransport; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.data.Stat; import cn.wpeace.thrift.People; import cn.wpeace.thrift.PeopleService; import cn.wpeace.thrift.Request; import cn.wpeace.thrift.Student; import cn.wpeace.thrift.StudentService; import cn.wpeace.thriftService.ServiceSatrt; import net.sf.json.JSONObject; public class ThriftClinet implements Watcher{ static{ BasicConfigurator.configure(); } private static final Log LOGGER=LogFactory.getLog(ThriftClinet.class); private String serverIp; private String serverPort; private String servername; private CountDownLatch connectedSignal=new CountDownLatch(1);//用于建立连接 private ZooKeeper zk; private void init(String servername) throws IOException, KeeperException, InterruptedException{ // 连接到zk服务器集群,添加默认的watcher监听 this.zk = new ZooKeeper("192.168.1.127:2181", 120000, this); connectedSignal.await(); this.servername=servername; updateServer(); ThriftClinet.LOGGER.info("初始化完成"); } /** * 从zk上获取Service中的节点数据:包括IP和端口 * @throws KeeperException * @throws InterruptedException */ private void updateServer() throws KeeperException, InterruptedException { this.serverIp=null; this.serverPort=null; /* * * 判断服务根节点是否存在 */ Stat pathStat = null; try { pathStat = this.zk.exists("/Service", false); // 如果条件成立,说明节点不存在 // 创建的这个节点是一个“永久状态”的节点 if (pathStat == null) { ThriftClinet.LOGGER.info("客户端创立Service"); this.zk.create("/Service", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); return; } } catch (Exception e) { ThriftClinet.LOGGER.error(e); System.exit(-1); } // 获取服务列表 List<String> serviceList = this.zk.getChildren("/Service", false); if (serviceList == null || serviceList.isEmpty()) { ThriftClinet.LOGGER.info("未发现相关服务,客户端退出"); return; } // 查找所需的服务是否存在 boolean isFound = false; byte[] data;// 获取节点数据 for (String name : serviceList) { if (StringUtils.equals(name, this.servername)) { isFound = true; break;// 找到一个就退出 } } // 获得数据 if (isFound) { data = this.zk.getData("/Service/" + this.servername, false, null); } else { ThriftClinet.LOGGER.info("未发现相关服务,客户端退出"); return; } if (data == null || data.length == 0) { ThriftClinet.LOGGER.info("没有发现有效数据,客户端退出"); return; } JSONObject fromObject = JSONObject.fromObject(new String(data)); this.serverIp = fromObject.getString("ip"); this.serverPort = fromObject.getString("port"); } @Override public void process(WatchedEvent event) { //建立连接用 if(event.getState()==KeeperState.SyncConnected){ connectedSignal.countDown(); return; } //如果发生 Service下的节点变换,就更新ip和端口 if (event.getType() == EventType.NodeChildrenChanged && "/Service".equals(event.getPath())) { try { updateServer(); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) { ThriftClinet studentClinet=new ThriftClinet(); ThriftClinet peopleClinet=new ThriftClinet(); /** * studnetService 测试 */ try { studentClinet.init(ServiceSatrt.serviceNames[0]); if(studentClinet.serverIp==null||studentClinet.serverPort==null){ ThriftClinet.LOGGER.info("没有发现有效数据,客户端退出"); } //如果是非阻塞型 需要使用 TTransport tSocket = new TFramedTransport(new TSocket(studentClinet.serverIp, Integer.parseInt(studentClinet.serverPort), 30000)); //设置封装协议 TBinaryProtocol protocol = new TBinaryProtocol(tSocket); //建立调用client StudentService.Client client=new StudentService.Client(protocol); //设置调用参数: Request request=new Request().setUserName("peace").setPassword("123456"); //准备传输 tSocket.open(); //正式调用接口 List<Student> allStudent = client.getAllStudent(request); //请求结束,断开连接 tSocket.close(); for(Student student:allStudent) { System.out.println(student.getNaem()+":"+student.getAge()); } } catch (Exception e) { ThriftClinet.LOGGER.info("出现异常,客户端退出"); } /** * PeopleService测试 */ try { peopleClinet.init(ServiceSatrt.serviceNames[1]); if(peopleClinet.serverIp==null||peopleClinet.serverPort==null){ ThriftClinet.LOGGER.info("没有发现有效数据,客户端退出"); } //如果是非阻塞型 需要使用 TTransport tSocket = new TFramedTransport(new TSocket(peopleClinet.serverIp, Integer.parseInt(peopleClinet.serverPort), 30000)); //设置封装协议 TBinaryProtocol protocol = new TBinaryProtocol(tSocket); //建立调用client PeopleService.Client client=new PeopleService.Client(protocol); //设置调用参数: Request request=new Request().setUserName("peace").setPassword("123456"); //准备传输 tSocket.open(); //正式调用接口 List<People> allPeople = client.getAllPeople(request); //请求结束,断开连接 tSocket.close(); for(People people:allPeople) { System.out.println(people.getNaem()+":"+people.getAge()+"性别"+people.getSex()); } } catch (Exception e) { ThriftClinet.LOGGER.info("出现异常,客户端退出"); } } }
所有代码下载请见github,上面的链接。
3.3测试步骤:
1.启动ServiceSatrt类2.启动ThriftClinet类
3.测试结果:
服务端:
客户端:
本文来自伊豚(blog.wpeace.cn)
相关文章推荐
- 4.RPC框架的简单实现(服务发布-ServiceBean实现)
- RPC服务的发布订阅实现Thrift
- Thrift 个人实战--RPC服务的发布订阅实现(基于Zookeeper服务)
- 利用redis简单实现消息订阅和发布
- 【远程调用框架】如何实现一个简单的RPC框架(三)优化一:利用动态代理改变用户服务调用方式
- 6.RPC框架的简单实现(服务发布-rmi协议)
- 8.RPC框架的简单实现(服务发布-Ldubbo协议)
- redis利用pipline实现发布订阅机制
- 简单的订阅发布机制实现(Golang)
- Thrift 个人实战--RPC服务的发布订阅实现(基于Zookeeper服务)
- 使用thrift实现订阅服务和发布服务
- .NET消息发布和订阅机制的实现案例
- Service Broker实现发布-订阅(Publish-Subscribe)框架(1)
- 构建插件式的应用程序框架(八)-视图服务的简单实现
- 简单的WCF发布-订阅(Pub/Sub)服务(转)
- Service Broker实现发布-订阅(Publish-Subscribe)框架
- Java实现的简单的WebService服务发布和Client调用例子
- Thrift初探:简单实现C#通讯服务程序
- 构建插件式的应用程序框架(八)----视图服务的简单实现
- Service Broker实现发布-订阅(Publish-Subscribe)框架