微服务 Dubbo + Zookeeper 原理解析
2017-09-25 17:59
441 查看
说明 :内容为小编个人见解,同时做备忘用
基础准备 : java Socket , serverSocket , RPC 协议。
(1) 网络通信数据传输靠的就是 IO 流(byte[] 字节) 。
(2) RPC 协议是指 : 利用tcp 通信,对byte[] 加上自己的规则, 服务端和客户端可以通过此规则进行数据的封装和解析。
dubbo + zookeeper 代码演练
服务提供者 :
pom.xml
provider.xml
服务接口 :
子类 :
main 入口 :
或者直接启动tomcat
服务消费者
pom.xml 同上 , 接口(interface)同上 。
consumer.xml
main 入口
或者启动tomcat
以上就是 dubbo + zookeeper 简单demo 实现。
接下来讲解一下实现原理
之前提到过:带有规则的byte[] 字节 ,那么阿里的dubbo 对bute[] 规则 如图 :
基本结构分为:header , body 两部分,详细内容参考上图。
整体流程 :
1) 启动配置文件,spring bean 容器加载我们的接口实现类,服务提供者封装serverSocket , 并通过 zookeeper客户端在 zookeeper中创建节点, 节点中记录服务端的ip, 端口,暴露的接口等信息。
2) 客户端通过zookeeper客户端拿到服务端ip , 端口等信息。
3)构建header 和 body 的字节数组(byte) , 通过socket 拿到输出流,write() 方法输出字节数组。
4)服务端拿到输入流之后,会得到接口信息,调用的方法名称等,可以通过spring的bean 工厂类,拿到 注入到bean 中的 该接口子类对象,然后去掉用具体的方法,拿到返回值后可以进行序列化,在用 socket 和 serverSocket 建立的管道 输出流 输出 子类方法返回的值,此时 http 请求,响应完毕。
说明:http 通信字节流,转为字符流,反序列化可以用阿里的
模拟消费端源码 :
pom.xml
byte[]原数据封装类 :
程序入口 :
运行程序的时候需要启动上面的:服务提供者,zookeeper 。
基础准备 : java Socket , serverSocket , RPC 协议。
(1) 网络通信数据传输靠的就是 IO 流(byte[] 字节) 。
(2) RPC 协议是指 : 利用tcp 通信,对byte[] 加上自己的规则, 服务端和客户端可以通过此规则进行数据的封装和解析。
dubbo + zookeeper 代码演练
服务提供者 :
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>tony.test</groupId> <artifactId>dubbo-provider</artifactId> <version>0.0.1-SNAPSHOT</version> <dependencies> <dependency> <groupId>com.alibaba</groupId> <artifactId>dubbo</artifactId> <version>2.5.3</version> </dependency> <dependency> <groupId>com.101tec</groupId> <artifactId>zkclient</artifactId> <version>0.8</version> </dependency> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.17</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.7</version> </dependency> </dependencies> </project>
provider.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:dubbo="http://code.alibabatech.com/schema/dubbo" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://code.alibabatech.com/schema/dubbo http://code.alibabatech.com/schema/dubbo/dubbo.xsd"> <!-- 提供方应用信息,用于计算依赖关系 --> <dubbo:application name="hello-world-app" /> <!-- 使用zookeeper注册中心暴露服务地址 --> <dubbo:registry address="zookeeper://localhost:2181" /> <!-- 用dubbo协议在20880端口暴露服务 --> <!-- serialization 协议序列化方式,当协议支持多种序列化方式时使用,,默认hessian2 比如:dubbo协议的dubbo,hessian2,java,compactedjava,以及http协议的json等 --> <dubbo:protocol name="dubbo" port="20880" serialization="fastjson"/> <!-- 声明需要暴露的服务接口 --> <dubbo:service interface="com.tony.test.dubbo.DemoService" ref="demoService" /> <!-- 和本地bean一样实现服务 --> <bean id="demoService" class="com.tony.test.dubbo.provider.DemoServiceImpl" /> </beans>
服务接口 :
package com.tony.test.dubbo; public interface DemoService { String sayHello(User user, String mark); }
子类 :
package com.tony.test.dubbo.provider; import com.tony.test.dubbo.DemoService; import com.tony.test.dubbo.User; public class DemoServiceImpl implements DemoService { public String sayHello(User user, String mark) { System.out.println("进来了----"); return "agui>" + mark + " name>" + user.getName(); } }
main 入口 :
或者直接启动tomcat
package com.tony.test.dubbo.provider; import org.springframework.context.support.ClassPathXmlApplicationContext; public class ProviderMain { // 服务提供者 public static void main(String[] args) throws Exception { ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext( new String[] { "provider.xml" }); context.start(); System.in.read(); } }
服务消费者
pom.xml 同上 , 接口(interface)同上 。
consumer.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:dubbo="http://code.alibabatech.com/schema/dubbo" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://code.alibabatech.com/schema/dubbo http://code.alibabatech.com/schema/dubbo/dubbo.xsd"> <!-- 消费方应用名,用于计算依赖关系,不是匹配条件,不要与提供方一样 --> <dubbo:application name="consumer-of-helloworld-app" /> <!-- 使用multicast广播注册中心暴露发现服务地址 --> <dubbo:registry address="zookeeper://localhost:2181" /> <!-- 生成远程服务代理,可以和本地bean一样使用demoService --> <dubbo:reference id="demoService" interface="com.tony.test.dubbo.DemoService" timeout="30000"/> </beans>
main 入口
或者启动tomcat
package com.tony.test.dubbo.consumer; import org.springframework.context.support.ClassPathXmlApplicationContext; import com.tony.test.dubbo.DemoService; import com.tony.test.dubbo.User; public class ConsumerMain { // 服务消费者 public static void main(String[] args) throws Exception { ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[] { "consumer.xml" }); context.start(); DemoService demoService = (DemoService) context.getBean("demoService"); User user = new User(); user.setName("agui"); String hello = demoService.sayHello(user, "2"); System.out.println("RPC调用结果:" + hello); } }
以上就是 dubbo + zookeeper 简单demo 实现。
接下来讲解一下实现原理
之前提到过:带有规则的byte[] 字节 ,那么阿里的dubbo 对bute[] 规则 如图 :
基本结构分为:header , body 两部分,详细内容参考上图。
整体流程 :
1) 启动配置文件,spring bean 容器加载我们的接口实现类,服务提供者封装serverSocket , 并通过 zookeeper客户端在 zookeeper中创建节点, 节点中记录服务端的ip, 端口,暴露的接口等信息。
2) 客户端通过zookeeper客户端拿到服务端ip , 端口等信息。
3)构建header 和 body 的字节数组(byte) , 通过socket 拿到输出流,write() 方法输出字节数组。
4)服务端拿到输入流之后,会得到接口信息,调用的方法名称等,可以通过spring的bean 工厂类,拿到 注入到bean 中的 该接口子类对象,然后去掉用具体的方法,拿到返回值后可以进行序列化,在用 socket 和 serverSocket 建立的管道 输出流 输出 子类方法返回的值,此时 http 请求,响应完毕。
说明:http 通信字节流,转为字符流,反序列化可以用阿里的
<dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.7</version> </dependency>
模拟消费端源码 :
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion>
<groupId>tony.test</groupId>
<artifactId>tony-dubbo-client</artifactId>
<version>0.0.1-SNAPSHOT</version>
<dependencies>
<dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.7</version> </dependency>
</dependencies>
</project>
byte[]原数据封装类 :
package com.agui.test.consumer; import java.io.InputStream; import java.io.OutputStream; import java.net.Socket; import java.nio.ByteBuffer; import com.alibaba.fastjson.JSON; /** * 发起dubbo调用请求 */ public class AguiDubboRpcRequest { String host; // 服务提供者IP int port; // 服务提供者端口 // 协议中header段 ByteBuffer header = ByteBuffer.allocate(16); // 协议中body段 StringBuffer rpcBody = new StringBuffer(); public TonyDubboRpcRequest(String host, int port) { this.host = host; this.port = port; // 魔数 da bb header.put((byte) 0xda); header.put((byte) 0xbb); // 标识 固定为 C6 header.put((byte) 0xC6); // 响应状态 ,这里没有 header.put((byte) 0x00); // 消息ID 编号传进来 // 数据长度 这个要在最后计算 } /** 需要是完整的类名 */ public TonyDubboRpcRequest dubboVersion(String dubboVersion) { rpcBody.append(JSON.toJSONString(dubboVersion)); rpcBody.append("\r\n"); return this; } /** 需要是完整的类名 */ public TonyDubboRpcRequest path(String path) { rpcBody.append(JSON.toJSONString(path)); rpcBody.append("\r\n"); return this; } /** 服务版本号 */ public TonyDubboRpcRequest serviceVersion(String serviceVersion) { rpcBody.append(JSON.toJSONString(serviceVersion)); rpcBody.append("\r\n"); return this; } /** 方法名 */ public TonyDubboRpcRequest method(String method) { rpcBody.append(JSON.toJSONString(method)); rpcBody.append("\r\n"); return this; } /** 参数类型 */ public TonyDubboRpcRequest desc(String desc) { rpcBody.append(JSON.toJSONString(desc)); rpcBody.append("\r\n"); return this; } /** 参数值 */ public TonyDubboRpcRequest values(Object... values) { for (Object value : values) { rpcBody.append(JSON.toJSONString(value)); rpcBody.append("\r\n"); } return this; } /** 隐式传参 */ public TonyDubboRpcRequest attachments(Object attachment) { rpcBody.append(JSON.toJSONString(attachment)); rpcBody.append("\r\n"); return this; } /** 补齐header */ public TonyDubboRpcRequest build(long msgId) { // 消息ID byte[] msgIdBytes = new byte[8]; long2bytes(msgId, msgIdBytes, 0); this.header.put(msgIdBytes); // 计算body长度 int length = this.rpcBody.toString().getBytes().length; byte[] lenBytes = new byte[4]; int2bytes(length, lenBytes, 0); this.header.put(lenBytes); return this; } /** int转4字节数组 */ private void int2bytes(int v, byte[] b, int off) { b[off + 3] = (byte) v; b[off + 2] = (byte) (v >>> 8); b[off + 1] = (byte) (v >>> 16); b[off + 0] = (byte) (v >>> 24); } /** long转8字节数组 */ private void long2bytes(long v, byte[] b, int off) { b[off + 7] = (byte) v; b[off + 6] = (byte) (v >>> 8); b[off + 5] = (byte) (v >>> 16); b[off + 4] = (byte) (v >>> 24); b[off + 3] = (byte) (v >>> 32); b[off + 2] = (byte) (v >>> 40); b[off + 1] = (byte) (v >>> 48); b[off + 0] = (byte) (v >>> 56); } public String run() throws Exception { System.out.println("###########输出rcpBody内容:##########"); System.out.println(this.rpcBody.toString()); System.out.println("###########结束输出###########"); Socket socket = new Socket(this.host, this.port); try { OutputStream rpcOPS = socket.getOutputStream(); // 发起调用请求 rpcOPS.write(header.array()); rpcOPS.write(this.rpcBody.toString().getBytes()); // 输出RPC调用结果 InputStream rpcRsp = socket.getInputStream(); byte[] header = new byte[16]; rpcRsp.read(header); byte[] resp = new byte[1024]; rpcRsp.read(resp); // 忽略header,取body return new String(resp); } catch (Exception e) { e.printStackTrace(); } finally { socket.close(); } return null; } }
程序入口 :
package com.agui.test.consumer; import com.alibaba.fastjson.JSONObject; public class AguiConsumerMain { // 手写dubbo客户端 public static void main(String[] args) throws Exception { // TODO 配置注册中心 // TODO 获取服务信息 // 本实例重点在于分析dubbo rpc协议内容,故此消费者假定已经知晓服务端的信息。 JSONObject user = JSONObject.parseObject("{\"name\":\"tony\"}"); TonyDubboRpcRequest tonyDubboRpcRequest = new TonyDubboRpcRequest("127.0.0.1", 20880); tonyDubboRpcRequest .dubboVersion("2.5.3") .path("com.tony.test.dubbo.DemoService") .serviceVersion("0.0.0.0") .method("sayHello") .desc("Lcom/tony/test/dubbo/User;Ljava/lang/String;") .values(user, "测试tony_1") .attachments(JSONObject.parseObject("{\"path\":\"com.tony.test.dubbo.DemoService\",\"interface\":\"com.tony.test.dubbo.DemoService\",\"version\":\"0.0.0\"}")) .build(3); String result = tonyDubboRpcRequest.run(); System.out.println("RPC调用结果:"); System.out.println(result); } }
运行程序的时候需要启动上面的:服务提供者,zookeeper 。
相关文章推荐
- 分布式服务框架dubbo原理解析(转)
- 19. Dubbo原理解析-通信层之暴露服务
- 20. Dubbo原理解析-通信层之引用服务
- 分布式服务框架dubbo原理解析(顶)
- 分布式服务框架dubbo原理解析 转
- 【DUBBO】Dubbo原理解析-服务发布
- Zookeeper的服务发现与注册原理解析
- 分布式服务框架dubbo原理解析
- 8. Dubbo原理解析-服务发布
- 分布式服务框架dubbo原理解析 转
- 15. Dubbo原理解析-集群&容错之目录服务Directory
- 分布式服务框架dubbo原理解析 转
- 9. Dubbo原理解析-服务引用
- 分布式服务框架dubbo原理解析
- 16. Dubbo原理解析-集群&容错之router路由服务
- 分布式服务框架dubbo原理解析
- 分布式服务框架dubbo原理解析
- dubbo源码解析(十) dubbo服务引用原理
- 13. Dubbo原理解析-注册中心之Zookeeper协议注册中心