您的位置:首页 > 其它

Gaea源码阅读(四):服务端通讯

2018-03-05 16:00 495 查看
转载地址:http://blog.csdn.net/m_vptr/article/details/9163913

在(三)中加载server时,将根据配置建立tcp/http/telnet服务。如demo中这三个服务中enable了tcp和telnet。

 

以tcp服务为例,实现类在gaea.server.tcp.implement中配置

[java] view
plain copy

<!-- socket server implement class -->  

<property>  

<name>gaea.server.tcp.implement</name>  

<value>com.bj58.spat.gaea.server.core.communication.tcp.SocketServer</value>  

</property>  

//实现类SocketServer

[java] view
plain copy

/** 

 * start netty server 

 */  

@Override  

public void start() throws Exception {  

    logger.info("loading invoker...");  

    String invoker = Global.getSingleton().getServiceConfig().getString("gaea.proxy.invoker.implement");  

    invokerHandle = (IInvokerHandle) Class.forName(invoker).newInstance();  

    logger.info("initing server...");  

    initSocketServer();  

}  

InvokerHandle提供了同步和异步两种选择,这里配置使用的是异步Handler

[html] view
plain copy

<!-- proxy invoker-->  

<property>  

<name>gaea.proxy.invoker.implement</name>  

<value>com.bj58.spat.gaea.server.core.proxy.AsyncInvokerHandle</value>  

</property>  

initSocketServer建立SocketChannel,Handler使用SocketHandler

[java] view
plain copy

bootstrap.setFactory(new NioServerSocketChannelFactory(  

                    Executors.newCachedThreadPool(),  

                    Executors.newCachedThreadPool(),  

                    Global.getSingleton().getServiceConfig().getInt("gaea.server.tcp.workerCount")  

                   )  

                    );  

//使用SocketHandler  

SocketHandler handler = new SocketHandler();  

bootstrap.setPipelineFactory(new SocketPipelineFactory(handler, Global.getSingleton().getServiceConfig().getInt("gaea.server.tcp.frameMaxLength")));  

  

try {  

    InetSocketAddress socketAddress = null;  

    socketAddress = new InetSocketAddress(Global.getSingleton().getServiceConfig().getString("gaea.server.tcp.listenIP"),  

            Global.getSingleton().getServiceConfig().getInt("gaea.server.tcp.listenPort"));  

    Channel channel = bootstrap.bind(socketAddress);  

    allChannels.add(channel);  

h (Exception e) {}  

//SocketHandler处理请求

[java] view
plain copy

public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {  

        try {  

            logger.debug("message receive");  

            ByteBuffer buffer = ((ChannelBuffer)e.getMessage()).toByteBuffer();  

            byte[] reciveByte = buffer.array();  

            logger.debug("reciveByte.length:" + reciveByte.length);  

              

            byte[] headDelimiter = new byte[0];  

            System.arraycopy(reciveByte, 0, headDelimiter, 0, 0);  

              

            byte[] requestBuffer = new byte[reciveByte.length];  

            System.arraycopy(reciveByte, 0, requestBuffer, 0, (reciveByte.length));  

              

            GaeaContext gaeaContext = new GaeaContext(requestBuffer,  

                    new GaeaChannel(e.getChannel()),   

                    ServerType.TCP,  

                    this);  

              

            SocketServer.invokerHandle.invoke(gaeaContext);  

        } catch(Throwable ex) {  

            byte[] response = ExceptionHelper.createErrorProtocol();  

            e.getChannel().write(response);  

            logger.error("SocketHandler invoke error", ex);  

        }  

    }  

服务器端收到消息后,调用该messageReceived(Netty机制)。在该方法中将转交给invokerHandle.invoke

[java] view
plain copy

//AsyncInvokerHandler  

    public void invoke(final GaeaContext context) throws Exception {  

        logger.debug("-------------------begin async invoke-------------------");  

        asyncInvoker.run(taskTimeOut, new IAsyncHandler(){  

            @Override  

            public Object run() throws Throwable {  

                // request filter  

                for(IFilter f : Global.getSingleton().getGlobalRequestFilterList()) {  

                    if(context.getExecFilter() == ExecFilterType.All || context.getExecFilter() == ExecFilterType.RequestOnly) {  

                        f.filter(context);  

                    }  

                }  

                  

                //调用  

                if(context.isDoInvoke()) {  

                    if(context.getServerType() == ServerType.HTTP){  

                        httpThreadLocal.set(context.getHttpContext());  

                    }  

                    doInvoke(context);  

                }  

                  

                // response filter  

                for(IFilter f : Global.getSingleton().getGlobalResponseFilterList()) {  

                    if(context.getExecFilter() == ExecFilterType.All || context.getExecFilter() == ExecFilterType.ResponseOnly) {  

                        f.filter(context);  

                    }  

                }  

                return context;  

            }  

fliter在这里被使用了

doInvoke将使用到(三)中的Proxy工厂类、ProxyStub

[java] view
plain copy

//根据lookup找到ProxyStub  

IProxyStub localProxy = Global.getSingleton().getProxyFactory().getProxy(request.getLookup());  

  

//invoker real service  

GaeaResponse gaeaResponse = localProxy.invoke(context);  

  

                     //返回应答  

response = createResponse(gaeaResponse);  

至此,Gaea基本流程基本走了一遍。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: