您的位置:首页 > 其它

利用Thrift和zk简单实现服务治理框架中的订阅发布机制

2017-01-06 10:59 756 查看
本文简单介绍下利用Thrift和zk简单实现服务治理框架服务的订阅发布机制,类似于Dubbo的服务治理。这个只是简单版本,只供学习和理解用。

全部代码下载:Github链接:github链接,点击惊喜;写文章不易,欢迎大家采我的文章,以及给出有用的评论,当然大家也可以关注一下我的github;多谢;

1.什么是服务治理:

1.1微服务简单介绍:

微服务已经成为当下最热门的话题之一。它是一种新的架构风格,涉及组织架构、设计、交付、运维等方面的变革,核心目标是为了解决系统的交付周期,并降低维护成本和研发成本。相比传统的SOA架构或者单块架构,微服务有很多的优势,比如技术的多样性、模块化、独立部署等,但也带来了相应的成本,比如运维成本、服务管理成本等。

1.2服务治理的出现

在微服务盛行下,利用RMI或Hessian等工具,简单的暴露和引用远程服务,通过配置服务的URL地址进行调用已经变得越来越不能满足需求。

1.服务越来越多时,服务URL配置管理变得非常困难。

2.服务间依赖关系变得错踪复杂

3.服务的调用量越来越大,服务的容量问题就暴露出来,这个服务需要多少机器支撑?什么时候该加机器?

4…….等等

为了满足服务线下管控、保障线上高效运行,需要有一个统一的服务治理框架对服务进行统一、有效管控,保障服务的高效、健康运行。服务治理是分布式服务框架的一个可选特性,尽管从服务开发和运行角度看它不是必须的,但是如果没有服务治理功能,分布式服务框架的服务SLA很难得到保障,服务化也很难真正实施成功。

基于以上原因,需要对各个服务做治理,这也是就为什么有了dubbo这类服务治理框架,它与其他RPC框架相比(例如thrift,avro),不仅仅提供了透明的服务调用,而且还提供了服务治理,比如上述的调用统计管理、负载均衡,这样每个业务模块只需专注于自己的内部业务逻辑即可。

1.3服务治理的几个要素:

服务管理组件:这个组件是“服务治理”的核心组件,您的服务治理框架有多强大,主要取决于您的服务管理组件功能有多强大。它至少具有的功能包括:服务注册管理、访问路由;另外,它还可以具有:服务版本管理、服务优先级管理、访问权限管理、请求数量限制、连通性管理、注册服务集群、节点容错、事件订阅-发布、状态监控,等等功能。

服务提供者(服务生产者):即服务的具体实现,然后按照服务治理框架特定的规范发布到服务管理组件中。这意味着什么呢?这意味着,服务提供者不一定按照RPC调用的方式发布服务,而是按照整个服务治理框架所规定的方式进行发布(如果服务治理框架要求服务提供者以RPC调用的形式进行发布,那么服务提供者就必须以RPC调用的形式进行发布;如果服务治理框架要求服务提供者以Http接口的形式进行发布,那么服务提供者就必须以Http接口的形式进行发布,但后者这种情况一般不会出现)。

服务使用者(服务消费者):即调用这个服务的用户,调用者首先到服务管理组件中查询具体的服务所在的位置;服务管理组件收到查询请求后,将向它返回具体的服务所在位置(视服务管理组件功能的不同,还有可能进行这些计算:判断服务调用者是否有权限进行调用、是否需要生成认证标记、是否需要重新检查服务提供者的状态、让调用者使用哪一个服务版本等等)。服务调用者在收到具体的服务位置后,向服务提供者发起正式请求,并且返回相应的结果。第二次调用时,服务请求者就可以像服务提供者直接发起调用请求了(当然,您可以有一个服务提供期限的设置,使用租约协议就可以很好的实现)。

参考于: http://blog.csdn.net/yinwenjie/article/details/49869535


简单画了如下图:



1.4服务的订阅发布机制

它的核心理念是实现服务消费者和服务提供者的解耦,让服务消费者能够像使用本地接口一样消费远端的服务提供者,而不需要关心服务提供者的位置信息,实现透明化调用。常用的服务注册中心有Zookeeper、ETCD,以及基于数据库的配置中心。

2.设计一个服务治理框架中的订阅发布机制

2.1使用的技术:

1.Zookeeper作为注册中心并进行管控。具体介绍请参考我的博客: Hadoop集群之 ZooKeeper和Hbase环境搭建

2.Thrift提供RPC调用功能。具体介绍请参考我的博客:Apache Thrift入门学习

2.2设计思路

1.利用Zookeeper建立/Service根目录,在该目录下建立相应的服务接口子目录存放该接口的IP地址和端口号—注册服务

2.利用Thrift创建服务和启动服务

3.利用Zookeeper去对应目录/Service订阅相应服务获得接口的IP地址和端口号,并注册监听事件,当目录改变时更新接口的IP地址和端口号—订阅服务

3.实现订阅发布机制

3.1实现步骤:

1.编写Thrift的IDL并编译出相应的接口类。

2.实现相应的接口。

3.编写服务启动和注册服务类。

4.编写相应的客户端订阅服务。

3.2代码实现

工程为maven工程,假如不建立maven工程,请下载对应的lib包。

具体实现原理,见注释:

1.IDL文件和编译:

//命名空间定义:java包
namespace java cn.wpeace.thrift
//结构体定义:转化java中的实体类
struct Request{
1:required string userName;
2:required string password;
}
//定义返回类型
struct Student{
1:required string naem;
2:required i32 age;
}
struct People{
1:required string naem;
2:required i32 age;
3:required string sex;
}
//异常描述定义
exception HelloException{
1:required string msg;
}
//服务定义,生成接口用
service StudentService{
list<Student> getAllStudent(1:Request request)throws (1:HelloException e);
}
//服务定义,生成接口用
service PeopleService{
list<People> getAllPeople(1:Request request)throws (1:HelloException e);
}
//thrift -gen java ./zk.thrift


2.实现相应接口:

public class StudentServiceImpl implements Iface {// 实现的是StudentService类下面的接口
@Override
public List<Student> getAllStudent(Request request) throws HelloException, TException {
System.out.println("调用studentService");
System.out.println(request.getUserName());
System.out.println(request.getPassword());
List<Student> students = new ArrayList<>();
for (int i = 0; i < 5; i++) {
Student student = new Student();
student.setNaem("peace" + i);
student.setAge(22 + i);
students.add(student);
}
return students;
}
}
public class PeopleServiceImpl implements Iface{
@Override
public List<People> getAllPeople(Request request) throws HelloException, TException {
System.out.println("调用PeopleService");
System.out.println(request.getUserName());
System.out.println(request.getPassword());
List<People>peoples=new ArrayList<>();
for(int i=0;i<5;i++)
{
People people=new People("wpeace", 22+i, "男");
peoples.add(people);
}
return peoples;
}
}


3.实现服务启动和注册类

package cn.wpeace.thriftService;
import java.io.IOException;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.log4j.BasicConfigurator;
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.server.ServerContext;
import org.apache.thrift.server.TNonblockingServer;
import org.apache.thrift.server.TServerEventHandler;
import org.apache.thrift.transport.TNonblockingServerSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
import cn.wpeace.thrift.PeopleService;
import cn.wpeace.thrift.StudentService;
import net.sf.json.JSONObject;
public class ServiceSatrt implements Watcher{
//初始化log4j
static{
BasicConfigurator.configure();
}
private static final Log LOGGER=LogFactory.getLog(ServiceSatrt.class);
private static final Integer[] PORTS={8081,8082};
public static final String serviceNames[]={"studentService","peopleService"};
private static final String SERVICE_IP="192.168.1.118";
private CountDownLatch connectedSignal=new CountDownLatch(1);//用于建立连接
private ZooKeeper zk ;
/**
* thrift服务启动标记
*/
private Integer isThriftStart=0;

/**
* 启动所有服务
*/
private void startServer(){
ServiceSatrt.LOGGER.info("启动Thrift线程");
// 创建启动线程:
StartServerThread studenThread = new StartServerThread(PORTS[0],
new StudentService.Processor<StudentService.Iface>(new StudentServiceImpl()));
StartServerThread peopleThread = new StartServerThread(PORTS[1],
new PeopleService.Processor<PeopleService.Iface>(new PeopleServiceImpl()));
ExecutorService pool = Executors.newFixedThreadPool(2);

pool.submit(studenThread);
pool.submit(peopleThread);
//关闭线程池:线程仍然在运行
pool.shutdown();
}
private class StartServerThread implements Runnable{
private Integer port;
private TProcessor processor;
public StartServerThread(Integer port,TProcessor processor) {
this.port=port;
this.processor=processor;
}
@Override
public void run() {
ServiceSatrt.LOGGER.info("thrift服务正在准备启动");
try {
// 非阻塞式
TNonblockingServerSocket serverSocket=new TNonblockingServerSocket(port);
// 为服务器设置对应的IO网络模型
TNonblockingServer.Args tArgs = new TNonblockingServer.Args(serverSocket);
// 设置控制器
tArgs.processor(processor);
// 设置消息封装格式
tArgs.protocolFactory(new TBinaryProtocol.Factory());//Thrift特有的一种二进制描述格式
// 启动Thrift服务
TNonblockingServer server = new TNonblockingServer(tArgs);
server.setServerEventHandler(new StartServerEventHander());
server.serve();//启动后,程序就停在这里了。
} catch (TTransportException e) {
e.printStackTrace();
}

}

}
private class StartServerEventHander implements TServerEventHandler{

@Override
public void preServe() {
synchronized (isThriftStart) {
isThriftStart++;//当全部服务启动成功才连接zk
if(isThriftStart==2){
synchronized (ServiceSatrt.this) {
ServiceSatrt.LOGGER.info("thrift服务启动完成");
ServiceSatrt.this.notify();
}
}
}
}
@Override
public ServerContext createContext(TProtocol arg0, TProtocol arg1) {
return null;
}
@Override
public void deleteContext(ServerContext arg0, TProtocol arg1, TProtocol arg2) {
}
@Override
public void processContext(ServerContext arg0, TTransport arg1, TTransport arg2) {
}
}
private void connectZk() throws KeeperException, InterruptedException, IOException{
// 连接到zk服务器集群,添加默认的watcher监听
zk= new ZooKeeper("192.168.1.127:2181", 120000, this);
connectedSignal.await();
// 创建一个父级节点Service
Stat pathStat = null;
try {
pathStat = zk.exists("/Service", false);
// 如果条件成立,说明节点不存在(只需要判断一个节点的存在性即可)
// 创建的这个节点是一个“永久状态”的节点
if (pathStat == null) {
zk.create("/Service", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
} catch (Exception e) {
System.exit(-1);
}
// 开始添加子级节点,每一个子级节点都表示一个这个服务提供者提供的业务服务
for (int i = 0; i < 2; i++) {
JSONObject nodeData = new JSONObject();
nodeData.put("ip", SERVICE_IP);
nodeData.put("port", PORTS[i]);
zk.create("/Service/" + serviceNames[i], nodeData.toString().getBytes(), Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL);
}
// 执行到这里,说明所有的service都启动完成了
ServiceSatrt.LOGGER.info("===================所有service都启动完成了,主线程开始启动===================");
}
@Override
public void process(WatchedEvent event) {
//建立连接用
if(event.getState()==KeeperState.SyncConnected){
connectedSignal.countDown();
return;
}
//暂在这里不做处理,正常情况下需要处理。

}
public static void main(String[] args) {
//启动服务
ServiceSatrt serviceSatrt=new ServiceSatrt();
serviceSatrt.startServer();
//等待服务启动完成
synchronized (serviceSatrt) {
try {
while (serviceSatrt.isThriftStart<2) {
serviceSatrt.wait();
}
} catch (Exception e) {
ServiceSatrt.LOGGER.error(e);
System.out.println(-1);
}
}
//启动连接
try {
serviceSatrt.connectZk();
} catch (Exception e) {
ServiceSatrt.LOGGER.error(e);
System.out.println(-1);
}
}
}


4.编写客户端类:

package cn.wpeace.thriftClinet;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import javax.sound.midi.VoiceStatus;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.log4j.BasicConfigurator;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;

import cn.wpeace.thrift.People;
import cn.wpeace.thrift.PeopleService;
import cn.wpeace.thrift.Request;
import cn.wpeace.thrift.Student;
import cn.wpeace.thrift.StudentService;
import cn.wpeace.thriftService.ServiceSatrt;
import net.sf.json.JSONObject;

public class ThriftClinet implements Watcher{
static{
BasicConfigurator.configure();
}
private static final Log LOGGER=LogFactory.getLog(ThriftClinet.class);
private String serverIp;
private String serverPort;
private String servername;
private CountDownLatch connectedSignal=new CountDownLatch(1);//用于建立连接
private ZooKeeper zk;
private  void init(String servername) throws IOException, KeeperException, InterruptedException{
// 连接到zk服务器集群,添加默认的watcher监听
this.zk = new ZooKeeper("192.168.1.127:2181", 120000, this);
connectedSignal.await();
this.servername=servername;
updateServer();
ThriftClinet.LOGGER.info("初始化完成");
}
/**
* 从zk上获取Service中的节点数据:包括IP和端口
* @throws KeeperException
* @throws InterruptedException
*/
private void updateServer() throws KeeperException, InterruptedException {
this.serverIp=null;
this.serverPort=null;
/*
*
* 判断服务根节点是否存在
*/
Stat pathStat = null;
try {
pathStat = this.zk.exists("/Service", false);
// 如果条件成立,说明节点不存在
// 创建的这个节点是一个“永久状态”的节点
if (pathStat == null) {
ThriftClinet.LOGGER.info("客户端创立Service");
this.zk.create("/Service", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
return;
}
} catch (Exception e) {
ThriftClinet.LOGGER.error(e);
System.exit(-1);
}
// 获取服务列表
List<String> serviceList = this.zk.getChildren("/Service", false);
if (serviceList == null || serviceList.isEmpty()) {
ThriftClinet.LOGGER.info("未发现相关服务,客户端退出");
return;
}
// 查找所需的服务是否存在
boolean isFound = false;
byte[] data;// 获取节点数据
for (String name : serviceList) {
if (StringUtils.equals(name, this.servername)) {
isFound = true;
break;// 找到一个就退出
}
}
// 获得数据
if (isFound) {
data = this.zk.getData("/Service/" + this.servername, false, null);
} else {
ThriftClinet.LOGGER.info("未发现相关服务,客户端退出");
return;
}
if (data == null || data.length == 0) {
ThriftClinet.LOGGER.info("没有发现有效数据,客户端退出");
return;
}
JSONObject fromObject = JSONObject.fromObject(new String(data));
this.serverIp = fromObject.getString("ip");
this.serverPort = fromObject.getString("port");
}

@Override
public void process(WatchedEvent event) {
//建立连接用
if(event.getState()==KeeperState.SyncConnected){
connectedSignal.countDown();
return;
}
//如果发生 Service下的节点变换,就更新ip和端口
if (event.getType() == EventType.NodeChildrenChanged
&& "/Service".equals(event.getPath())) {
try {
updateServer();
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
ThriftClinet studentClinet=new ThriftClinet();
ThriftClinet peopleClinet=new ThriftClinet();
/**
* studnetService 测试
*/
try {
studentClinet.init(ServiceSatrt.serviceNames[0]);
if(studentClinet.serverIp==null||studentClinet.serverPort==null){
ThriftClinet.LOGGER.info("没有发现有效数据,客户端退出");
}
//如果是非阻塞型  需要使用
TTransport tSocket = new TFramedTransport(new TSocket(studentClinet.serverIp,
Integer.parseInt(studentClinet.serverPort),  30000));
//设置封装协议
TBinaryProtocol protocol = new TBinaryProtocol(tSocket);
//建立调用client
StudentService.Client client=new StudentService.Client(protocol);
//设置调用参数:
Request request=new Request().setUserName("peace").setPassword("123456");
//准备传输
tSocket.open();
//正式调用接口
List<Student> allStudent = client.getAllStudent(request);
//请求结束,断开连接
tSocket.close();
for(Student student:allStudent)
{
System.out.println(student.getNaem()+":"+student.getAge());
}
} catch (Exception e) {
ThriftClinet.LOGGER.info("出现异常,客户端退出");
}

/**
* PeopleService测试
*/
try {
peopleClinet.init(ServiceSatrt.serviceNames[1]);
if(peopleClinet.serverIp==null||peopleClinet.serverPort==null){
ThriftClinet.LOGGER.info("没有发现有效数据,客户端退出");
}
//如果是非阻塞型  需要使用
TTransport tSocket = new TFramedTransport(new TSocket(peopleClinet.serverIp,
Integer.parseInt(peopleClinet.serverPort),  30000));
//设置封装协议
TBinaryProtocol protocol = new TBinaryProtocol(tSocket);
//建立调用client
PeopleService.Client client=new PeopleService.Client(protocol);
//设置调用参数:
Request request=new Request().setUserName("peace").setPassword("123456");
//准备传输
tSocket.open();
//正式调用接口
List<People> allPeople = client.getAllPeople(request);
//请求结束,断开连接
tSocket.close();
for(People people:allPeople)
{
System.out.println(people.getNaem()+":"+people.getAge()+"性别"+people.getSex());
}
} catch (Exception e) {
ThriftClinet.LOGGER.info("出现异常,客户端退出");
}
}
}


所有代码下载请见github,上面的链接。

3.3测试步骤:

1.启动ServiceSatrt类

2.启动ThriftClinet类

3.测试结果:

服务端:





客户端:





本文来自伊豚(blog.wpeace.cn)
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息