Gaea源码阅读(二):客户端流程
2013-06-21 17:58
423 查看
以GaeaClientTest为入口
如注释所言,客户端通过Restfull格式url"tcp://serviceName/lookup "访问服务。
关键的一句:INewsService newsService = ProxyFactory.create(INewsService.class,url); ,这句话创建了一个实现了INewsService接口的代理,返回的这个代理对象将作为getNewsByCateID的调用句柄。
让我们看看代理的内部如何实现的,跟踪ProxyFactory.create
InvocationHandler接口实现类ProxyStandard
InvocationHandler主要由methodCaller完成功能,在invoke()中交给methodCaller
Object obj= methodCaller.doMethodCall(args, method);
MethodCaller机制 //doMethodCall
这里出现了OperationContract注解,这是在方法上面注解的。注解有一个methodName属性。
关键点是ServiceProxy的getProxy和invoke方法。
ServiceProxy.getProxy:构造ServiceProxy
首先读取配置文件中属性为//Service[@name= serviceName]的节点
创建实现负载均衡的dispatcher,在这里创建了Server
ServiceProxy.Invoke方法
Server.request()过程
Receive接收应答时,通过sessionId对应的AutoResetEvent等待
返回应答可能为Response、Reset、Exception
Response -> 返回InvokeResult结构体
Reset -> 重启则再调用invoke(returnType,typeName, methodName, paras)
Exception -> 抛出异常
frameHandler从Socket接收数据,并根据协议反解出session id,然后从waitWindows中取出对应的WindowData,然后调用对应的Event的set()通知CSocket.receive 返回
客户端做的工作主要就是这些
GaeaInit.init("conf/gaea.config"); /** * 调用URL 格式:tcp://服务名//接口实现类 * 备注: * 服务名:需要与gaea.config中的服务名一一对应 * 接口实现类:具体调用接口的接口实现类 */ final String url = "tcp://demo/NewsService"; INewsService newsService = ProxyFactory.create(INewsService.class, url); List<News> list = newsService.getNewsByCateID(); for (News news : list) { System.out.println("ID is " + news.getNewsID() + " title is " + news.getTitle()); }
如注释所言,客户端通过Restfull格式url"tcp://serviceName/lookup "访问服务。
关键的一句:INewsService newsService = ProxyFactory.create(INewsService.class,url); ,这句话创建了一个实现了INewsService接口的代理,返回的这个代理对象将作为getNewsByCateID的调用句柄。
让我们看看代理的内部如何实现的,跟踪ProxyFactory.create
InvocationHandler handler = new ProxyStandard(type,serviceName, lookup); return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), newClass[]{type}, handler); //代理的作用就是将所有调用转交到Handler
InvocationHandler接口实现类ProxyStandard
public ProxyStandard(Class<?> interfaceClass, String serviceName, String lookup){ this.interfaceClass =interfaceClass; this.methodCaller = newMethodCaller(serviceName, lookup); }
InvocationHandler主要由methodCaller完成功能,在invoke()中交给methodCaller
Object obj= methodCaller.doMethodCall(args, method);
MethodCaller机制 //doMethodCall
//准备参数 Type[]typeAry = methodInfo.getGenericParameterTypes(); Class<?>[] clsAry = methodInfo.getParameterTypes(); if (args== null) { args = newObject[0]; } Parameter[]paras = new Parameter[args.length]; List<Integer> outParas = newArrayList<Integer>(); if(typeAry != null) { for (int i = 0;i < typeAry.length; i++) { if(args[i] instanceof Out) { paras[i] = newParameter(args[i], clsAry[i], typeAry[i], ParaType.Out); outParas.add(i); } else { paras[i] = newParameter(args[i], clsAry[i], typeAry[i], ParaType.In); } } } //规范化方法名 StringmethodName = methodInfo.getName(); OperationContract ann =methodInfo.getAnnotation(OperationContract.class); if (ann !=null) { if(!ann.methodName().equals(AnnotationUtil.DEFAULT_VALUE)) { methodName = "$" + ann.methodName(); } } ParameterreturnPara = new Parameter(null,methodInfo.getReturnType(), methodInfo.getGenericReturnType()); //调用方法 ServiceProxy proxy = ServiceProxy.getProxy(serviceName); InvokeResultresult = proxy.invoke(returnPara, lookup,methodName, paras); //设置返回值 if (result!= null && result.getOutPara() != null) { for (int i = 0;i < outParas.size() && i < result.getOutPara().length; i++) { Object op = args[outParas.get(i)]; if(op instanceof Out){ ((Out)op).setOutPara(result.getOutPara()[i]); } } } returnresult.getResult();
这里出现了OperationContract注解,这是在方法上面注解的。注解有一个methodName属性。
关键点是ServiceProxy的getProxy和invoke方法。
ServiceProxy.getProxy:构造ServiceProxy
config =ServiceConfig.GetConfig(serviceName); dispatcher = newDispatcher(config); requestTime = config.getSocketPool().getReconnectTime(); intserverCount = 1; if(dispatcher.GetAllServer()!= null && dispatcher.GetAllServer().size()> 0){ serverCount = dispatcher.GetAllServer().size(); } ioreconnect =serverCount - 1; // count = max {ioreconnect,requestTime} count = requestTime; if(ioreconnect > requestTime){ count = ioreconnect; }
首先读取配置文件中属性为//Service[@name= serviceName]的节点
创建实现负载均衡的dispatcher,在这里创建了Server
for(ServerProfile ser : config.getServers()) { if(ser.getWeithtRate() > 0) { Server s = newServer(ser); if(s.getState() != ServerState.Disable) { ScoketPool sp = newScoketPool(s, config); s.setScoketpool(sp); ServerPool.add(s); } } }
ServiceProxy.Invoke方法
//构造RequestProtocol RequestProtocol requestProtocol = newRequestProtocol(typeName, methodName, listPara); ProtocolsendP = new Protocol(createSessionId(), (byte) config.getServiceid(), SDPType.Request, CompressType.UnCompress, config.getProtocol().getSerializerType(), PlatformType.Java, requestProtocol); ProtocolreceiveP = null; Serverserver = null; for(int i = 0;i <= count; i++){ server = dispatcher.GetServer(); if (server== null) { logger.error("cannotget server"); throw newException("cannot get server"); } try{ //本地存根调用 receiveP = server.request(sendP); break; } catch(IOExceptionio){ } catch(RebootExceptionrb){ this.createReboot(server); }catch(TimeoutExceptionte){ } catch(Throwable ex){ } } if(receiveP== null){ throw newException("userdatatype error!"); } if(receiveP.getSDPType() == SDPType.Response) { ResponseProtocol rp = (ResponseProtocol)receiveP.getSdpEntity(); logger.debug("invoketime:" + (System.currentTimeMillis() - watcher) + "ms"); return new InvokeResult(rp.getResult(),rp.getOutpara()); } else if(receiveP.getSDPType()== SDPType.Reset){ logger.info(server.getName()+"server is reboot,system will change normal server!"); this.createReboot(server); returninvoke(returnType, typeName, methodName, paras); }else if(receiveP.getSDPType() == SDPType.Exception) { ExceptionProtocol ep = (ExceptionProtocol)receiveP.getSdpEntity(); throwThrowErrorHelper.throwServiceError(ep.getErrorCode(), ep.getErrorMsg()); } else { throw newException("userdatatype error!"); }
Server.request()过程
//每个CSocket有个WaitWindows注册了SessionId到WindowData的表 increaseCU(); CSocketsocket = null; try { try {//发送消息 socket = this.scoketpool.getSocket(); byte[] data= p.toBytes(socket.isRights(),socket.getDESKey()); socket.registerRec(p.getSessionID()); socket.send(data); } catch(Throwable ex) { logger.error("Serverget socket Exception", ex); throw ex; }finally { if(socket!= null){ socket.dispose(); } } //接收应答 byte[]buffer = socket.receive(p.getSessionID(), currUserCount); Protocol result = Protocol.fromBytes(buffer,socket.isRights(),socket.getDESKey()); if (this.state ==ServerState.Testing) { relive(); } return result; } catch(IOException ex) { } catch(Throwable ex) { } finally { }
Receive接收应答时,通过sessionId对应的AutoResetEvent等待
//CSocket.receive AutoResetEvent event = wd.getEvent(); int timeout= getReadTimeout(socketConfig.getReceiveTimeout(), queueLen); if(!event.waitOne(timeout)) { throw newTimeoutException("Receive data timeout or error!timeout:" +timeout + "ms,queue length:" +queueLen); }
返回应答可能为Response、Reset、Exception
Response -> 返回InvokeResult结构体
Reset -> 重启则再调用invoke(returnType,typeName, methodName, paras)
Exception -> 抛出异常
frameHandler从Socket接收数据,并根据协议反解出session id,然后从waitWindows中取出对应的WindowData,然后调用对应的Event的set()通知CSocket.receive 返回
客户端做的工作主要就是这些
相关文章推荐
- Gaea源码阅读(二):客户端流程
- Hbase源码阅读(二) 客户端定位Region流程
- Gaea源码阅读(三):服务端启动流程
- Gaea源码阅读(三):服务端启动流程
- Gaea源码阅读(五):C客户端
- Gaea源码阅读(五):C客户端
- GitHub超详细图文攻略 - Git客户端下载安装 GitHub提交修改源码工作流程 Git分支 标签 过滤 Git版本工作流
- memched1.0源码阅读(4)——事件的处理流程
- Struts2源码阅读之Action和Interceptor的执行流程
- Google test源码阅读(一):基本执行流程
- GitHub超详细图文攻略 - Git客户端下载安装 GitHub提交修改源码工作流程 Git分支 标签 过滤 Git版本工作流
- Spark修炼之道(高级篇)——Spark源码阅读:第十二节 Spark SQL 处理流程分析
- FAAC源码阅读(2)——AAC的编解码流程
- (源码阅读)Resources资源加载流程
- HBase的scan源码分析客户端部分之整体流程(一)
- 安卓 View 工作流程-Measure、 Layout、Draw 源码阅读
- GitHub超详细图文攻略 - Git客户端下载安装 GitHub提交修改源码工作流程 Git分支 标签 过滤 Git版本工作流
- Struts2源码阅读(一)_Struts2框架流程概述
- RTMP学习(六)rtmpdump源码阅读(1)总体流程
- 非典型2D游戏引擎 Orx 源码阅读笔记 完结篇(7) 渲染流程