您的位置:首页 > 其它

Gaea源码阅读(二):客户端流程

2018-03-05 15:57 781 查看
转载地址:http://blog.csdn.net/m_vptr/article/details/9147279

以GaeaClientTest为入口

[java] view
plain copy

GaeaInit.init("conf/gaea.config");  

/** 

 * 调用URL 格式:tcp://服务名//接口实现类  

 * 备注:  

 * 服务名:需要与gaea.config中的服务名一一对应 

 * 接口实现类:具体调用接口的接口实现类 

 */  

  

final String url = "tcp://demo/NewsService";  

INewsService newsService = ProxyFactory.create(INewsService.class, url);  

List<News> list = newsService.getNewsByCateID();  

for (News news : list) {  

    System.out.println("ID is " + news.getNewsID() + " title is "  

            + news.getTitle());  

}  

如注释所言,客户端通过Restfull格式url"tcp://serviceName/lookup "访问服务。

 
关键的一句:INewsService newsService = ProxyFactory.create(INewsService.class,url);
,这句话创建了一个实现了INewsService接口的代理,返回的这个代理对象将作为getNewsByCateID的调用句柄。

让我们看看代理的内部如何实现的,跟踪ProxyFactory.create

[java] view
plain copy

InvocationHandler handler = new ProxyStandard(type,serviceName, lookup);  

 return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),  

         newClass[]{type},  

        handler);  

  

//代理的作用就是将所有调用转交到Handler  



InvocationHandler接口实现类ProxyStandard

[java] view
plain copy

public ProxyStandard(Class<?> interfaceClass, String serviceName, String lookup){  

    this.interfaceClass =interfaceClass;  

    this.methodCaller = newMethodCaller(serviceName, lookup);  

InvocationHandler主要由methodCaller完成功能,在invoke()中交给methodCaller

        Object obj= methodCaller.doMethodCall(args, method);

MethodCaller机制 //doMethodCall

[java] view
plain copy

 //准备参数  

Type[]typeAry = methodInfo.getGenericParameterTypes();  

Class<?>[] clsAry = methodInfo.getParameterTypes();  

if (args== null) {  

    args = newObject[0];  

}  

  

Parameter[]paras = new Parameter[args.length];  

List<Integer> outParas = newArrayList<Integer>();  

  

if(typeAry != null) {  

    for (int i = 0;i < typeAry.length; i++) {  

        if(args[i] instanceof Out) {  

           paras[i] = newParameter(args[i], clsAry[i], typeAry[i], ParaType.Out);  

            outParas.add(i);  

        } else {  

           paras[i] = newParameter(args[i], clsAry[i], typeAry[i], ParaType.In);  

        }  

    }  

}  

  

//规范化方法名  

StringmethodName = methodInfo.getName();  

OperationContract ann =methodInfo.getAnnotation(OperationContract.class);  

if (ann !=null) {  

    if(!ann.methodName().equals(AnnotationUtil.DEFAULT_VALUE)) {  

       methodName = "$" + ann.methodName();  

    }  

}  

  

ParameterreturnPara = new Parameter(null,methodInfo.getReturnType(), methodInfo.getGenericReturnType());  

  

//调用方法  

ServiceProxy proxy = ServiceProxy.getProxy(serviceName);  

  

InvokeResultresult = proxy.invoke(returnPara, lookup,methodName, paras);  

  

//设置返回值  

if (result!= null && result.getOutPara() != null) {  

    for (int i = 0;i < outParas.size() && i < result.getOutPara().length; i++) {  

       Object op = args[outParas.get(i)];  

        if(op instanceof Out){  

           ((Out)op).setOutPara(result.getOutPara()[i]);  

        }  

    }  

}  

returnresult.getResult();  

这里出现了OperationContract注解,这是在方法上面注解的。注解有一个methodName属性。

 

关键点是ServiceProxy的getProxy和invoke方法。

ServiceProxy.getProxy:构造ServiceProxy

[java] view
plain copy

config =ServiceConfig.GetConfig(serviceName);  

dispatcher = newDispatcher(config);  

  

requestTime = config.getSocketPool().getReconnectTime();  

     intserverCount = 1;  

     if(dispatcher.GetAllServer()!= null && dispatcher.GetAllServer().size()> 0){  

               serverCount = dispatcher.GetAllServer().size();  

     }  

      

     ioreconnect =serverCount - 1;  

  

 //     count = max {ioreconnect,requestTime}  

     count = requestTime;  

      

     if(ioreconnect > requestTime){  

               count = ioreconnect;  

     }  

                   首先读取配置文件中属性为//Service[@name= serviceName]的节点

 

                   创建实现负载均衡的dispatcher,在这里创建了Server

[java] view
plain copy

for(ServerProfile ser : config.getServers()) {  

    if(ser.getWeithtRate() > 0) {  

       Server s = newServer(ser);  

        if(s.getState() != ServerState.Disable) {  

           ScoketPool sp = newScoketPool(s, config);  

           s.setScoketpool(sp);  

           ServerPool.add(s);  

        }  

    }  

}  

ServiceProxy.Invoke方法

[java] view
plain copy

//构造RequestProtocol  

RequestProtocol requestProtocol = newRequestProtocol(typeName, methodName, listPara);  

ProtocolsendP = new Protocol(createSessionId(),  

                   (byte) config.getServiceid(),  

                   SDPType.Request,  

                   CompressType.UnCompress,  

                   config.getProtocol().getSerializerType(),  

                   PlatformType.Java,  

                   requestProtocol);  

  

ProtocolreceiveP = null;  

Serverserver = null;  

  

for(int i = 0;i <= count; i++){  

         server = dispatcher.GetServer();  

    if (server== null) {  

        logger.error("cannotget server");  

        throw newException("cannot get server");  

    }  

    try{  

             //本地存根调用  

             receiveP = server.request(sendP);  

             break;  

    } catch(IOExceptionio){  

              

    } catch(RebootExceptionrb){  

             this.createReboot(server);  

    }catch(TimeoutExceptionte){  

              

    } catch(Throwable ex){  

              

    }  

}  

  

if(receiveP== null){  

         throw newException("userdatatype error!");  

}  

  

if(receiveP.getSDPType() == SDPType.Response) {  

    ResponseProtocol rp = (ResponseProtocol)receiveP.getSdpEntity();  

    logger.debug("invoketime:" + (System.currentTimeMillis() - watcher) + "ms");  

    return new InvokeResult(rp.getResult(),rp.getOutpara());  

} else if(receiveP.getSDPType()== SDPType.Reset){  

    logger.info(server.getName()+"server is reboot,system will change normal server!");  

    this.createReboot(server);  

    returninvoke(returnType, typeName, methodName, paras);  

}else if(receiveP.getSDPType() == SDPType.Exception) {  

    ExceptionProtocol ep = (ExceptionProtocol)receiveP.getSdpEntity();  

    throwThrowErrorHelper.throwServiceError(ep.getErrorCode(), ep.getErrorMsg());  

} else {  

    throw newException("userdatatype error!");  

}  

Server.request()过程

[java] view
plain copy

//每个CSocket有个WaitWindows注册了SessionId到WindowData的表  

  

increaseCU();  

 CSocketsocket = null;  

 try {  

     try {//发送消息  

        socket = this.scoketpool.getSocket();  

        byte[] data= p.toBytes(socket.isRights(),socket.getDESKey());  

        socket.registerRec(p.getSessionID());  

        socket.send(data);   

     } catch(Throwable ex) {  

         logger.error("Serverget socket Exception", ex);  

         throw ex;  

     }finally {  

              if(socket!= null){  

                        socket.dispose();  

              }  

     }  

  

     //接收应答  

     byte[]buffer = socket.receive(p.getSessionID(), currUserCount);  

     Protocol result = Protocol.fromBytes(buffer,socket.isRights(),socket.getDESKey());  

     if (this.state ==ServerState.Testing) {  

        relive();  

     }  

     return result;  

 } catch(IOException ex) {  

      

 } catch(Throwable ex) {  

  

 } finally {  

     

 }  

 

Receive接收应答时,通过sessionId对应的AutoResetEvent等待

[java] view
plain copy

//CSocket.receive  

AutoResetEvent event = wd.getEvent();  

int timeout= getReadTimeout(socketConfig.getReceiveTimeout(), queueLen);  

if(!event.waitOne(timeout)) {  

    throw newTimeoutException("Receive data timeout or error!timeout:" +timeout + "ms,queue length:" +queueLen);  

}  

返回应答可能为Response、Reset、Exception

Response -> 返回InvokeResult结构体

Reset -> 重启则再调用invoke(returnType,typeName, methodName, paras)

Exception -> 抛出异常

frameHandler从Socket接收数据,并根据协议反解出session id,然后从waitWindows中取出对应的WindowData,然后调用对应的Event的set()通知CSocket.receive 返回

客户端做的工作主要就是这些
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: