Gaea Source Code Reading (2): The Client-Side Flow
2018-03-05 15:57
Reposted from: http://blog.csdn.net/m_vptr/article/details/9147279
We take GaeaClientTest as the entry point:
    GaeaInit.init("conf/gaea.config");
    /**
     * Call URL format: tcp://<service name>/<interface implementation class>
     * Notes:
     * Service name: must match a service name defined in gaea.config
     * Interface implementation class: the class that implements the interface being called
     */
    final String url = "tcp://demo/NewsService";
    INewsService newsService = ProxyFactory.create(INewsService.class, url);
    List<News> list = newsService.getNewsByCateID();
    for (News news : list) {
        System.out.println("ID is " + news.getNewsID() + " title is "
                + news.getTitle());
    }
As the comment says, the client reaches a service through a REST-style URL of the form "tcp://serviceName/lookup".
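To make the URL convention concrete, here is a minimal sketch of how such a URL could be split into its two parts. The GaeaUrl class is hypothetical and is not taken from Gaea; the post does not show how ProxyFactory actually parses the string, so this only illustrates the format using the standard java.net.URI class.

```java
import java.net.URI;

// Hypothetical helper (not part of Gaea): splits "tcp://serviceName/lookup".
final class GaeaUrl {
    final String serviceName;
    final String lookup;

    private GaeaUrl(String serviceName, String lookup) {
        this.serviceName = serviceName;
        this.lookup = lookup;
    }

    static GaeaUrl parse(String url) {
        URI uri = URI.create(url.trim());
        if (!"tcp".equalsIgnoreCase(uri.getScheme())) {
            throw new IllegalArgumentException("expected a tcp:// URL: " + url);
        }
        String host = uri.getHost();   // the service name, e.g. "demo"
        String path = uri.getPath();   // "/" + lookup, e.g. "/NewsService"
        if (host == null || path == null || path.length() < 2) {
            throw new IllegalArgumentException("expected tcp://serviceName/lookup: " + url);
        }
        return new GaeaUrl(host, path.substring(1));
    }

    public static void main(String[] args) {
        GaeaUrl u = GaeaUrl.parse("tcp://demo/NewsService");
        System.out.println(u.serviceName + " -> " + u.lookup);
    }
}
```

With the URL from the test, serviceName is "demo" and lookup is "NewsService", which are the two values later passed to ProxyStandard.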
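The key line is INewsService newsService = ProxyFactory.create(INewsService.class, url); it creates a proxy that implements the INewsService interface, and the returned proxy object is the handle through which getNewsByCateID is called.
Let's see how the proxy works internally by tracing ProxyFactory.create: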
    InvocationHandler handler = new ProxyStandard(type, serviceName, lookup);
    return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
            new Class[]{type},
            handler);
    // The proxy's only job is to forward every call to the handler.
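The forwarding behaviour can be tried out in isolation with the JDK's dynamic proxy support. The sketch below is an illustration only: Greeter and RecordingHandler are made-up names, and the handler returns a canned string where Gaea's ProxyStandard would perform a remote call.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

final class ProxyDemo {
    // Hypothetical service interface used only for this illustration.
    interface Greeter {
        String greet(String name);
    }

    // Records each call and returns a canned answer instead of doing a remote call.
    static final class RecordingHandler implements InvocationHandler {
        final List<String> calls = new ArrayList<>();

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) {
            calls.add(method.getName());
            return "hello " + args[0];
        }
    }

    // Same shape as ProxyFactory.create: build a proxy for one interface.
    @SuppressWarnings("unchecked")
    static <T> T create(Class<T> type, InvocationHandler handler) {
        return (T) Proxy.newProxyInstance(type.getClassLoader(), new Class<?>[]{type}, handler);
    }

    public static void main(String[] args) {
        RecordingHandler handler = new RecordingHandler();
        Greeter greeter = create(Greeter.class, handler);
        System.out.println(greeter.greet("gaea"));
        System.out.println(handler.calls);
    }
}
```

The caller only ever sees the Greeter interface; every method call on it ends up in RecordingHandler.invoke, which is the hook Gaea uses to turn a local call into a request.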
ProxyStandard, the class that implements InvocationHandler:
    public ProxyStandard(Class<?> interfaceClass, String serviceName, String lookup) {
        this.interfaceClass = interfaceClass;
        this.methodCaller = new MethodCaller(serviceName, lookup);
    }
The InvocationHandler relies on methodCaller for most of its work; invoke() simply hands the call over to it:
    Object obj = methodCaller.doMethodCall(args, method);
How MethodCaller works (doMethodCall):
    // Prepare the parameters
    Type[] typeAry = methodInfo.getGenericParameterTypes();
    Class<?>[] clsAry = methodInfo.getParameterTypes();
    if (args == null) {
        args = new Object[0];
    }
    Parameter[] paras = new Parameter[args.length];
    List<Integer> outParas = new ArrayList<Integer>();
    if (typeAry != null) {
        for (int i = 0; i < typeAry.length; i++) {
            if (args[i] instanceof Out) {
                paras[i] = new Parameter(args[i], clsAry[i], typeAry[i], ParaType.Out);
                outParas.add(i);
            } else {
                paras[i] = new Parameter(args[i], clsAry[i], typeAry[i], ParaType.In);
            }
        }
    }
    // Normalize the method name
    String methodName = methodInfo.getName();
    OperationContract ann = methodInfo.getAnnotation(OperationContract.class);
    if (ann != null) {
        if (!ann.methodName().equals(AnnotationUtil.DEFAULT_VALUE)) {
            methodName = "$" + ann.methodName();
        }
    }
    Parameter returnPara = new Parameter(null, methodInfo.getReturnType(), methodInfo.getGenericReturnType());
    // Invoke the method
    ServiceProxy proxy = ServiceProxy.getProxy(serviceName);
    InvokeResult result = proxy.invoke(returnPara, lookup, methodName, paras);
    // Set the return values: copy out-parameters back into the caller's Out holders
    if (result != null && result.getOutPara() != null) {
        for (int i = 0; i < outParas.size() && i < result.getOutPara().length; i++) {
            Object op = args[outParas.get(i)];
            if (op instanceof Out) {
                ((Out) op).setOutPara(result.getOutPara()[i]);
            }
        }
    }
    return result.getResult();
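The OperationContract annotation shows up here. It is placed on methods and has a methodName attribute; when that attribute is set to a non-default value, the call is sent under the name "$" + methodName instead of the Java method name.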
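The name-normalization step can be reproduced with plain reflection. In the sketch below, the Contract annotation, the empty-string DEFAULT_VALUE sentinel, and the NewsApi interface are all stand-ins invented for the example; Gaea's real OperationContract and AnnotationUtil.DEFAULT_VALUE may be defined differently.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

final class MethodNameDemo {
    // Assumed sentinel meaning "attribute not set"; the real value in Gaea is not shown in the post.
    static final String DEFAULT_VALUE = "";

    // Stand-in for OperationContract; must be retained at runtime to be visible via reflection.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Contract {
        String methodName() default DEFAULT_VALUE;
    }

    // Hypothetical service interface.
    interface NewsApi {
        void plain();

        @Contract(methodName = "loadNews")
        void renamed();

        @Contract
        void annotatedWithDefault();
    }

    // Same rule as in doMethodCall: use "$" + methodName only when it was set explicitly.
    static String wireName(Method method) {
        String name = method.getName();
        Contract ann = method.getAnnotation(Contract.class);
        if (ann != null && !ann.methodName().equals(DEFAULT_VALUE)) {
            name = "$" + ann.methodName();
        }
        return name;
    }

    static String wireName(Class<?> type, String javaName) {
        try {
            return wireName(type.getMethod(javaName));
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("no such method: " + javaName, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(wireName(NewsApi.class, "plain"));
        System.out.println(wireName(NewsApi.class, "renamed"));
        System.out.println(wireName(NewsApi.class, "annotatedWithDefault"));
    }
}
```

Only the method annotated with an explicit name is renamed (to "$loadNews"); the other two keep their Java names.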
The key pieces are ServiceProxy's getProxy and invoke methods.
ServiceProxy.getProxy: constructing the ServiceProxy
    config = ServiceConfig.GetConfig(serviceName);
    dispatcher = new Dispatcher(config);
    requestTime = config.getSocketPool().getReconnectTime();
    int serverCount = 1;
    if (dispatcher.GetAllServer() != null && dispatcher.GetAllServer().size() > 0) {
        serverCount = dispatcher.GetAllServer().size();
    }
    ioreconnect = serverCount - 1;
    // count = max {ioreconnect, requestTime}
    count = requestTime;
    if (ioreconnect > requestTime) {
        count = ioreconnect;
    }
It first reads the node matching //Service[@name=serviceName] from the configuration file, then creates the dispatcher that implements load balancing. The Server objects are created at this point:
    for (ServerProfile ser : config.getServers()) {
        if (ser.getWeithtRate() > 0) {
            Server s = new Server(ser);
            if (s.getState() != ServerState.Disable) {
                ScoketPool sp = new ScoketPool(s, config);
                s.setScoketpool(sp);
                ServerPool.add(s);
            }
        }
    }
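The post does not show how Dispatcher.GetServer() chooses among the servers it has collected. Purely as an assumption, the sketch below uses a simple weighted round-robin to show how a weight of zero keeps a server out of rotation, mirroring the weight check above. SimpleDispatcher and Node are hypothetical names, not Gaea classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical weighted round-robin dispatcher; Gaea's real selection policy may differ.
final class SimpleDispatcher {
    static final class Node {
        final String name;
        final int weight;

        Node(String name, int weight) {
            this.name = name;
            this.weight = weight;
        }
    }

    private final List<Node> slots = new ArrayList<>();
    private final AtomicInteger next = new AtomicInteger();

    SimpleDispatcher(List<Node> nodes) {
        for (Node n : nodes) {
            // One slot per unit of weight; a weight of 0 or less adds no slot at all.
            for (int i = 0; i < n.weight; i++) {
                slots.add(n);
            }
        }
    }

    // Returns null when no server is usable, like the null check in the invoke loop.
    Node getServer() {
        if (slots.isEmpty()) {
            return null;
        }
        int index = Math.floorMod(next.getAndIncrement(), slots.size());
        return slots.get(index);
    }

    public static void main(String[] args) {
        SimpleDispatcher d = new SimpleDispatcher(Arrays.asList(
                new Node("a", 2), new Node("b", 0), new Node("c", 1)));
        for (int i = 0; i < 4; i++) {
            System.out.println(d.getServer().name);
        }
    }
}
```

With weights 2, 0 and 1, the rotation is a, a, c, a, and server b is never handed out.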
The ServiceProxy.invoke method
    // Build the RequestProtocol
    RequestProtocol requestProtocol = new RequestProtocol(typeName, methodName, listPara);
    Protocol sendP = new Protocol(createSessionId(),
            (byte) config.getServiceid(),
            SDPType.Request,
            CompressType.UnCompress,
            config.getProtocol().getSerializerType(),
            PlatformType.Java,
            requestProtocol);
    Protocol receiveP = null;
    Server server = null;
    for (int i = 0; i <= count; i++) {
        server = dispatcher.GetServer();
        if (server == null) {
            logger.error("cannot get server");
            throw new Exception("cannot get server");
        }
        try {
            // Call through the local stub
            receiveP = server.request(sendP);
            break;
        } catch (IOException io) {
        } catch (RebootException rb) {
            this.createReboot(server);
        } catch (TimeoutException te) {
        } catch (Throwable ex) {
        }
    }
    if (receiveP == null) {
        throw new Exception("userdatatype error!");
    }
    if (receiveP.getSDPType() == SDPType.Response) {
        ResponseProtocol rp = (ResponseProtocol) receiveP.getSdpEntity();
        logger.debug("invoke time:" + (System.currentTimeMillis() - watcher) + "ms");
        return new InvokeResult(rp.getResult(), rp.getOutpara());
    } else if (receiveP.getSDPType() == SDPType.Reset) {
        logger.info(server.getName() + " server is reboot,system will change normal server!");
        this.createReboot(server);
        return invoke(returnType, typeName, methodName, paras);
    } else if (receiveP.getSDPType() == SDPType.Exception) {
        ExceptionProtocol ep = (ExceptionProtocol) receiveP.getSdpEntity();
        throw ThrowErrorHelper.throwServiceError(ep.getErrorCode(), ep.getErrorMsg());
    } else {
        throw new Exception("userdatatype error!");
    }
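The retry loop and the count computed in getProxy fit together as follows. This is a simplified sketch under stated assumptions: Endpoint is a hypothetical stand-in for Server, servers are tried in plain rotation rather than through the Dispatcher, and only IOException is retried.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;

final class RetryDemo {
    // Hypothetical stand-in for Server.request().
    interface Endpoint {
        String request(String payload) throws IOException;
    }

    // count = max(serverCount - 1, requestTime), as in ServiceProxy.getProxy.
    static int retryCount(int serverCount, int requestTime) {
        int ioReconnect = Math.max(serverCount, 1) - 1;
        return Math.max(ioReconnect, requestTime);
    }

    static String invoke(List<Endpoint> servers, int requestTime, String payload) {
        if (servers.isEmpty()) {
            throw new IllegalStateException("cannot get server");
        }
        int count = retryCount(servers.size(), requestTime);
        IOException last = null;
        // i runs from 0 to count inclusive, so there are count + 1 attempts in total.
        for (int i = 0; i <= count; i++) {
            Endpoint server = servers.get(i % servers.size());
            try {
                return server.request(payload);
            } catch (IOException e) {
                last = e; // remember the failure and move on to the next attempt
            }
        }
        throw new UncheckedIOException("all attempts failed", last);
    }

    public static void main(String[] args) {
        Endpoint down = p -> { throw new IOException("connection refused"); };
        Endpoint up = p -> "reply to " + p;
        System.out.println(invoke(Arrays.asList(down, up), 0, "ping"));
    }
}
```

Because count is at least serverCount - 1, every configured server gets at least one chance before the call gives up, even when the configured reconnect count is zero.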
The Server.request() flow
    // Each CSocket has a WaitWindows table that maps a SessionId to its WindowData
    increaseCU();
    CSocket socket = null;
    try {
        try { // Send the message
            socket = this.scoketpool.getSocket();
            byte[] data = p.toBytes(socket.isRights(), socket.getDESKey());
            socket.registerRec(p.getSessionID());
            socket.send(data);
        } catch (Throwable ex) {
            logger.error("Server get socket Exception", ex);
            throw ex;
        } finally {
            if (socket != null) {
                socket.dispose();
            }
        }
        // Receive the reply
        byte[] buffer = socket.receive(p.getSessionID(), currUserCount);
        Protocol result = Protocol.fromBytes(buffer, socket.isRights(), socket.getDESKey());
        if (this.state == ServerState.Testing) {
            relive();
        }
        return result;
    } catch (IOException ex) {
    } catch (Throwable ex) {
    } finally {
    }
When receive waits for the reply, it blocks on the AutoResetEvent associated with the sessionId:
    // CSocket.receive
    AutoResetEvent event = wd.getEvent();
    int timeout = getReadTimeout(socketConfig.getReceiveTimeout(), queueLen);
    if (!event.waitOne(timeout)) {
        throw new TimeoutException("Receive data timeout or error!timeout:" + timeout + "ms,queue length:" + queueLen);
    }
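Gaea's AutoResetEvent class is not shown in the post. A minimal sketch of the same idea, assuming the usual auto-reset semantics (one set() releases one waiter, after which the event closes again), could look like this. SimpleAutoResetEvent is a hypothetical name built on java.util.concurrent locks.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical auto-reset event; Gaea's own AutoResetEvent may be implemented differently.
final class SimpleAutoResetEvent {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition signalled = lock.newCondition();
    private boolean open;

    // Opens the event and wakes one waiting thread.
    void set() {
        lock.lock();
        try {
            open = true;
            signalled.signal();
        } finally {
            lock.unlock();
        }
    }

    // Returns true if the event was set within the timeout, false on timeout or interrupt.
    boolean waitOne(long timeoutMillis) {
        long nanos = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        lock.lock();
        try {
            while (!open) {
                if (nanos <= 0L) {
                    return false;
                }
                nanos = signalled.awaitNanos(nanos);
            }
            open = false; // auto-reset: the next waiter needs a new set()
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        SimpleAutoResetEvent event = new SimpleAutoResetEvent();
        new Thread(event::set).start();
        System.out.println("signalled: " + event.waitOne(1000));
    }
}
```

The boolean flag makes the event safe against a set() that arrives before the waiter starts waiting, which matters here because the reply can come back before receive is called.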
The reply can be a Response, a Reset, or an Exception:
Response -> return an InvokeResult
Reset -> the server is rebooting, so call invoke(returnType, typeName, methodName, paras) again
Exception -> throw an exception
frameHandler receives data from the socket, decodes the session id according to the protocol, looks up the matching WindowData in waitWindows, and calls set() on its event to let CSocket.receive return.
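The session-id correlation described here can be sketched as a map of pending requests. Note the substitution: instead of WindowData plus an AutoResetEvent, this illustration uses a CompletableFuture per session id, which carries both the signal and the payload. WaitWindowsDemo and its method names are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical model of the waitWindows table: session id -> pending reply.
final class WaitWindowsDemo {
    private final Map<Integer, CompletableFuture<byte[]>> windows = new ConcurrentHashMap<>();

    // Called by the sender before the request goes out (compare socket.registerRec).
    void register(int sessionId) {
        windows.put(sessionId, new CompletableFuture<>());
    }

    // Called by the reader thread once a frame has been decoded (the frameHandler role).
    boolean onFrame(int sessionId, byte[] payload) {
        CompletableFuture<byte[]> window = windows.get(sessionId);
        return window != null && window.complete(payload);
    }

    // Called by the requesting thread; blocks until the reply for this session arrives.
    byte[] receive(int sessionId, long timeoutMillis) {
        CompletableFuture<byte[]> window = windows.get(sessionId);
        if (window == null) {
            throw new IllegalStateException("session not registered: " + sessionId);
        }
        try {
            return window.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            throw new IllegalStateException("receive timed out after " + timeoutMillis + "ms", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            windows.remove(sessionId); // the window is single-use
        }
    }

    public static void main(String[] args) {
        WaitWindowsDemo demo = new WaitWindowsDemo();
        demo.register(7);
        new Thread(() -> demo.onFrame(7, new byte[]{1, 2, 3})).start();
        System.out.println("reply length: " + demo.receive(7, 1000).length);
    }
}
```

Registering before sending is what allows many requests to share one socket: the reader thread never needs to know who is waiting, only which session id a frame belongs to.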
That is essentially everything the client does.