您的位置:首页 > 其它

微服务 Dubbo + Zookeeper 原理解析

2017-09-25 17:59 441 查看
说明 :内容为小编个人见解,同时做备忘用

基础准备 : java Socket , serverSocket , RPC 协议。

(1) 网络通信数据传输靠的就是 IO 流(byte[] 字节) 。

(2) RPC 协议是指 : 利用tcp 通信,对byte[] 加上自己的规则, 服务端和客户端可以通过此规则进行数据的封装和解析。

dubbo + zookeeper 代码演练

服务提供者 :

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion>
<groupId>tony.test</groupId>
<artifactId>dubbo-provider</artifactId>
<version>0.0.1-SNAPSHOT</version>

<dependencies>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>dubbo</artifactId>
<version>2.5.3</version>
</dependency>
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.8</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.7</version>
</dependency>
</dependencies>
</project>


provider.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
xsi:schemaLocation="http://www.springframework.org/schema/beans        http://www.springframework.org/schema/beans/spring-beans.xsd        http://code.alibabatech.com/schema/dubbo        http://code.alibabatech.com/schema/dubbo/dubbo.xsd"> 
<!-- 提供方应用信息,用于计算依赖关系 -->
<dubbo:application name="hello-world-app"  />

<!-- 使用zookeeper注册中心暴露服务地址 -->
<dubbo:registry address="zookeeper://localhost:2181" />

<!-- 用dubbo协议在20880端口暴露服务 -->
<!-- serialization 协议序列化方式,当协议支持多种序列化方式时使用,,默认hessian2
比如:dubbo协议的dubbo,hessian2,java,compactedjava,以及http协议的json等
-->
<dubbo:protocol name="dubbo" port="20880" serialization="fastjson"/>

<!-- 声明需要暴露的服务接口 -->
<dubbo:service interface="com.tony.test.dubbo.DemoService" ref="demoService" />

<!-- 和本地bean一样实现服务 -->
<bean id="demoService" class="com.tony.test.dubbo.provider.DemoServiceImpl" />

</beans>


服务接口 :

package com.tony.test.dubbo;

public interface DemoService {

String sayHello(User user, String mark);
}


子类 :

package com.tony.test.dubbo.provider;

import com.tony.test.dubbo.DemoService;
import com.tony.test.dubbo.User;

public class DemoServiceImpl implements DemoService {

public String sayHello(User user, String mark) {

System.out.println("进来了----");

return "agui>" + mark + " name>" + user.getName();
}
}


main 入口 :

或者直接启动tomcat

package com.tony.test.dubbo.provider;

import org.springframework.context.support.ClassPathXmlApplicationContext;

public class ProviderMain {
// 服务提供者
public static void main(String[] args) throws Exception {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
new String[] { "provider.xml" });
context.start();

System.in.read();
}
}


服务消费者

pom.xml 同上 , 接口(interface)同上 。

consumer.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
xsi:schemaLocation="http://www.springframework.org/schema/beans        http://www.springframework.org/schema/beans/spring-beans.xsd        http://code.alibabatech.com/schema/dubbo        http://code.alibabatech.com/schema/dubbo/dubbo.xsd"> 
<!-- 消费方应用名,用于计算依赖关系,不是匹配条件,不要与提供方一样 -->
<dubbo:application name="consumer-of-helloworld-app"  />

<!-- 使用multicast广播注册中心暴露发现服务地址 -->
<dubbo:registry address="zookeeper://localhost:2181" />

<!-- 生成远程服务代理,可以和本地bean一样使用demoService -->
<dubbo:reference id="demoService" interface="com.tony.test.dubbo.DemoService" timeout="30000"/>

</beans>


main 入口

或者启动tomcat

package com.tony.test.dubbo.consumer;

import org.springframework.context.support.ClassPathXmlApplicationContext;

import com.tony.test.dubbo.DemoService;
import com.tony.test.dubbo.User;

public class ConsumerMain {

// 服务消费者
public static void main(String[] args) throws Exception {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[] { "consumer.xml" });
context.start();
DemoService demoService = (DemoService) context.getBean("demoService");
User user = new User();
user.setName("agui");
String hello = demoService.sayHello(user, "2");

System.out.println("RPC调用结果:" + hello);
}

}


以上就是 dubbo + zookeeper 简单demo 实现。

接下来讲解一下实现原理

之前提到过:带有规则的byte[] 字节 ,那么阿里的dubbo 对bute[] 规则 如图 :





基本结构分为:header , body 两部分,详细内容参考上图。

整体流程 :

1) 启动配置文件,spring bean 容器加载我们的接口实现类,服务提供者封装serverSocket , 并通过 zookeeper客户端在 zookeeper中创建节点, 节点中记录服务端的ip, 端口,暴露的接口等信息。

2) 客户端通过zookeeper客户端拿到服务端ip , 端口等信息。

3)构建header 和 body 的字节数组(byte) , 通过socket 拿到输出流,write() 方法输出字节数组。

4)服务端拿到输入流之后,会得到接口信息,调用的方法名称等,可以通过spring的bean 工厂类,拿到 注入到bean 中的 该接口子类对象,然后去掉用具体的方法,拿到返回值后可以进行序列化,在用 socket 和 serverSocket 建立的管道 输出流 输出 子类方法返回的值,此时 http 请求,响应完毕。

说明:http 通信字节流,转为字符流,反序列化可以用阿里的

<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.7</version>
</dependency>


模拟消费端源码 :

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion>
<groupId>tony.test</groupId>
<artifactId>tony-dubbo-client</artifactId>
<version>0.0.1-SNAPSHOT</version>

<dependencies>
<dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.7</version> </dependency>
</dependencies>
</project>


byte[]原数据封装类 :

package com.agui.test.consumer;

import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.ByteBuffer;

import com.alibaba.fastjson.JSON;

/**
* 发起dubbo调用请求
*/
public class AguiDubboRpcRequest {

String host; // 服务提供者IP
int port; // 服务提供者端口

// 协议中header段
ByteBuffer header = ByteBuffer.allocate(16);

// 协议中body段
StringBuffer rpcBody = new StringBuffer();

public TonyDubboRpcRequest(String host, int port) {
this.host = host;
this.port = port;

// 魔数 da bb
header.put((byte) 0xda);
header.put((byte) 0xbb);

// 标识 固定为 C6
header.put((byte) 0xC6);

// 响应状态 ,这里没有
header.put((byte) 0x00);

// 消息ID 编号传进来

// 数据长度 这个要在最后计算

}

/** 需要是完整的类名 */
public TonyDubboRpcRequest dubboVersion(String dubboVersion) {
rpcBody.append(JSON.toJSONString(dubboVersion));
rpcBody.append("\r\n");
return this;
}

/** 需要是完整的类名 */
public TonyDubboRpcRequest path(String path) {
rpcBody.append(JSON.toJSONString(path));
rpcBody.append("\r\n");
return this;
}

/** 服务版本号 */
public TonyDubboRpcRequest serviceVersion(String serviceVersion) {
rpcBody.append(JSON.toJSONString(serviceVersion));
rpcBody.append("\r\n");
return this;
}

/** 方法名 */
public TonyDubboRpcRequest method(String method) {
rpcBody.append(JSON.toJSONString(method));
rpcBody.append("\r\n");
return this;
}

/** 参数类型 */
public TonyDubboRpcRequest desc(String desc) {
rpcBody.append(JSON.toJSONString(desc));
rpcBody.append("\r\n");
return this;
}

/** 参数值 */
public TonyDubboRpcRequest values(Object... values) {
for (Object value : values) {
rpcBody.append(JSON.toJSONString(value));
rpcBody.append("\r\n");
}
return this;
}

/** 隐式传参 */
public TonyDubboRpcRequest attachments(Object attachment) {

rpcBody.append(JSON.toJSONString(attachment));
rpcBody.append("\r\n");
return this;
}

/** 补齐header */
public TonyDubboRpcRequest build(long msgId) {
// 消息ID
byte[] msgIdBytes = new byte[8];
long2bytes(msgId, msgIdBytes, 0);
this.header.put(msgIdBytes);

// 计算body长度
int length = this.rpcBody.toString().getBytes().length;
byte[] lenBytes = new byte[4];
int2bytes(length, lenBytes, 0);
this.header.put(lenBytes);

return this;
}

/** int转4字节数组 */
private void int2bytes(int v, byte[] b, int off) {
b[off + 3] = (byte) v;
b[off + 2] = (byte) (v >>> 8);
b[off + 1] = (byte) (v >>> 16);
b[off + 0] = (byte) (v >>> 24);
}

/** long转8字节数组 */
private void long2bytes(long v, byte[] b, int off) {
b[off + 7] = (byte) v;
b[off + 6] = (byte) (v >>> 8);
b[off + 5] = (byte) (v >>> 16);
b[off + 4] = (byte) (v >>> 24);
b[off + 3] = (byte) (v >>> 32);
b[off + 2] = (byte) (v >>> 40);
b[off + 1] = (byte) (v >>> 48);
b[off + 0] = (byte) (v >>> 56);
}

public String run() throws Exception {
System.out.println("###########输出rcpBody内容:##########");
System.out.println(this.rpcBody.toString());
System.out.println("###########结束输出###########");

Socket socket = new Socket(this.host, this.port);
try {
OutputStream rpcOPS = socket.getOutputStream();
// 发起调用请求
rpcOPS.write(header.array());
rpcOPS.write(this.rpcBody.toString().getBytes());

// 输出RPC调用结果
InputStream rpcRsp = socket.getInputStream();
byte[] header = new byte[16];
rpcRsp.read(header);

byte[] resp = new byte[1024];
rpcRsp.read(resp);

// 忽略header,取body
return new String(resp);

} catch (Exception e) {
e.printStackTrace();
} finally {
socket.close();
}
return null;
}
}


程序入口 :

package com.agui.test.consumer;

import com.alibaba.fastjson.JSONObject;

public class AguiConsumerMain {

// 手写dubbo客户端
public static void main(String[] args) throws Exception {
// TODO 配置注册中心
// TODO 获取服务信息
// 本实例重点在于分析dubbo rpc协议内容,故此消费者假定已经知晓服务端的信息。

JSONObject user = JSONObject.parseObject("{\"name\":\"tony\"}");

TonyDubboRpcRequest tonyDubboRpcRequest = new TonyDubboRpcRequest("127.0.0.1", 20880);
tonyDubboRpcRequest
.dubboVersion("2.5.3")
.path("com.tony.test.dubbo.DemoService")
.serviceVersion("0.0.0.0")
.method("sayHello")
.desc("Lcom/tony/test/dubbo/User;Ljava/lang/String;")
.values(user, "测试tony_1")
.attachments(JSONObject.parseObject("{\"path\":\"com.tony.test.dubbo.DemoService\",\"interface\":\"com.tony.test.dubbo.DemoService\",\"version\":\"0.0.0\"}"))
.build(3);

String result = tonyDubboRpcRequest.run();

System.out.println("RPC调用结果:");
System.out.println(result);
}

}


运行程序的时候需要启动上面的:服务提供者,zookeeper 。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: