Gaea源码阅读(四):服务端通讯
2018-03-05 16:00
495 查看
转载地址:http://blog.csdn.net/m_vptr/article/details/9163913
在(三)中加载server时,将根据配置建立tcp/http/telnet服务。如demo中这三个服务中enable了tcp和telnet。
以tcp服务为例,实现类在gaea.server.tcp.implement中配置
[java] view
plain copy
<!-- socket server implement class -->
<property>
<name>gaea.server.tcp.implement</name>
<value>com.bj58.spat.gaea.server.core.communication.tcp.SocketServer</value>
</property>
//实现类SocketServer
[java] view
plain copy
/**
* start netty server
*/
@Override
public void start() throws Exception {
logger.info("loading invoker...");
String invoker = Global.getSingleton().getServiceConfig().getString("gaea.proxy.invoker.implement");
invokerHandle = (IInvokerHandle) Class.forName(invoker).newInstance();
logger.info("initing server...");
initSocketServer();
}
InvokerHandle提供了同步和异步两种选择,这里配置使用的是异步Handler
[html] view
plain copy
<!-- proxy invoker-->
<property>
<name>gaea.proxy.invoker.implement</name>
<value>com.bj58.spat.gaea.server.core.proxy.AsyncInvokerHandle</value>
</property>
initSocketServer建立SocketChannel,Handler使用SocketHandler
[java] view
plain copy
bootstrap.setFactory(new NioServerSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool(),
Global.getSingleton().getServiceConfig().getInt("gaea.server.tcp.workerCount")
)
);
//使用SocketHandler
SocketHandler handler = new SocketHandler();
bootstrap.setPipelineFactory(new SocketPipelineFactory(handler, Global.getSingleton().getServiceConfig().getInt("gaea.server.tcp.frameMaxLength")));
try {
InetSocketAddress socketAddress = null;
socketAddress = new InetSocketAddress(Global.getSingleton().getServiceConfig().getString("gaea.server.tcp.listenIP"),
Global.getSingleton().getServiceConfig().getInt("gaea.server.tcp.listenPort"));
Channel channel = bootstrap.bind(socketAddress);
allChannels.add(channel);
h (Exception e) {}
//SocketHandler处理请求
[java] view
plain copy
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
try {
logger.debug("message receive");
ByteBuffer buffer = ((ChannelBuffer)e.getMessage()).toByteBuffer();
byte[] reciveByte = buffer.array();
logger.debug("reciveByte.length:" + reciveByte.length);
byte[] headDelimiter = new byte[0];
System.arraycopy(reciveByte, 0, headDelimiter, 0, 0);
byte[] requestBuffer = new byte[reciveByte.length];
System.arraycopy(reciveByte, 0, requestBuffer, 0, (reciveByte.length));
GaeaContext gaeaContext = new GaeaContext(requestBuffer,
new GaeaChannel(e.getChannel()),
ServerType.TCP,
this);
SocketServer.invokerHandle.invoke(gaeaContext);
} catch(Throwable ex) {
byte[] response = ExceptionHelper.createErrorProtocol();
e.getChannel().write(response);
logger.error("SocketHandler invoke error", ex);
}
}
服务器端收到消息后,调用该messageReceived(Netty机制)。在该方法中将转交给invokerHandle.invoke
[java] view
plain copy
//AsyncInvokerHandler
public void invoke(final GaeaContext context) throws Exception {
logger.debug("-------------------begin async invoke-------------------");
asyncInvoker.run(taskTimeOut, new IAsyncHandler(){
@Override
public Object run() throws Throwable {
// request filter
for(IFilter f : Global.getSingleton().getGlobalRequestFilterList()) {
if(context.getExecFilter() == ExecFilterType.All || context.getExecFilter() == ExecFilterType.RequestOnly) {
f.filter(context);
}
}
//调用
if(context.isDoInvoke()) {
if(context.getServerType() == ServerType.HTTP){
httpThreadLocal.set(context.getHttpContext());
}
doInvoke(context);
}
// response filter
for(IFilter f : Global.getSingleton().getGlobalResponseFilterList()) {
if(context.getExecFilter() == ExecFilterType.All || context.getExecFilter() == ExecFilterType.ResponseOnly) {
f.filter(context);
}
}
return context;
}
fliter在这里被使用了
doInvoke将使用到(三)中的Proxy工厂类、ProxyStub
[java] view
plain copy
//根据lookup找到ProxyStub
IProxyStub localProxy = Global.getSingleton().getProxyFactory().getProxy(request.getLookup());
//invoker real service
GaeaResponse gaeaResponse = localProxy.invoke(context);
//返回应答
response = createResponse(gaeaResponse);
至此,Gaea基本流程基本走了一遍。
在(三)中加载server时,将根据配置建立tcp/http/telnet服务。如demo中这三个服务中enable了tcp和telnet。
以tcp服务为例,实现类在gaea.server.tcp.implement中配置
[java] view
plain copy
<!-- socket server implement class -->
<property>
<name>gaea.server.tcp.implement</name>
<value>com.bj58.spat.gaea.server.core.communication.tcp.SocketServer</value>
</property>
//实现类SocketServer
[java] view
plain copy
/**
* start netty server
*/
@Override
public void start() throws Exception {
logger.info("loading invoker...");
String invoker = Global.getSingleton().getServiceConfig().getString("gaea.proxy.invoker.implement");
invokerHandle = (IInvokerHandle) Class.forName(invoker).newInstance();
logger.info("initing server...");
initSocketServer();
}
InvokerHandle提供了同步和异步两种选择,这里配置使用的是异步Handler
[html] view
plain copy
<!-- proxy invoker-->
<property>
<name>gaea.proxy.invoker.implement</name>
<value>com.bj58.spat.gaea.server.core.proxy.AsyncInvokerHandle</value>
</property>
initSocketServer建立SocketChannel,Handler使用SocketHandler
[java] view
plain copy
bootstrap.setFactory(new NioServerSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool(),
Global.getSingleton().getServiceConfig().getInt("gaea.server.tcp.workerCount")
)
);
//使用SocketHandler
SocketHandler handler = new SocketHandler();
bootstrap.setPipelineFactory(new SocketPipelineFactory(handler, Global.getSingleton().getServiceConfig().getInt("gaea.server.tcp.frameMaxLength")));
try {
InetSocketAddress socketAddress = null;
socketAddress = new InetSocketAddress(Global.getSingleton().getServiceConfig().getString("gaea.server.tcp.listenIP"),
Global.getSingleton().getServiceConfig().getInt("gaea.server.tcp.listenPort"));
Channel channel = bootstrap.bind(socketAddress);
allChannels.add(channel);
h (Exception e) {}
//SocketHandler处理请求
[java] view
plain copy
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
try {
logger.debug("message receive");
ByteBuffer buffer = ((ChannelBuffer)e.getMessage()).toByteBuffer();
byte[] reciveByte = buffer.array();
logger.debug("reciveByte.length:" + reciveByte.length);
byte[] headDelimiter = new byte[0];
System.arraycopy(reciveByte, 0, headDelimiter, 0, 0);
byte[] requestBuffer = new byte[reciveByte.length];
System.arraycopy(reciveByte, 0, requestBuffer, 0, (reciveByte.length));
GaeaContext gaeaContext = new GaeaContext(requestBuffer,
new GaeaChannel(e.getChannel()),
ServerType.TCP,
this);
SocketServer.invokerHandle.invoke(gaeaContext);
} catch(Throwable ex) {
byte[] response = ExceptionHelper.createErrorProtocol();
e.getChannel().write(response);
logger.error("SocketHandler invoke error", ex);
}
}
服务器端收到消息后,调用该messageReceived(Netty机制)。在该方法中将转交给invokerHandle.invoke
[java] view
plain copy
//AsyncInvokerHandler
public void invoke(final GaeaContext context) throws Exception {
logger.debug("-------------------begin async invoke-------------------");
asyncInvoker.run(taskTimeOut, new IAsyncHandler(){
@Override
public Object run() throws Throwable {
// request filter
for(IFilter f : Global.getSingleton().getGlobalRequestFilterList()) {
if(context.getExecFilter() == ExecFilterType.All || context.getExecFilter() == ExecFilterType.RequestOnly) {
f.filter(context);
}
}
//调用
if(context.isDoInvoke()) {
if(context.getServerType() == ServerType.HTTP){
httpThreadLocal.set(context.getHttpContext());
}
doInvoke(context);
}
// response filter
for(IFilter f : Global.getSingleton().getGlobalResponseFilterList()) {
if(context.getExecFilter() == ExecFilterType.All || context.getExecFilter() == ExecFilterType.ResponseOnly) {
f.filter(context);
}
}
return context;
}
fliter在这里被使用了
doInvoke将使用到(三)中的Proxy工厂类、ProxyStub
[java] view
plain copy
//根据lookup找到ProxyStub
IProxyStub localProxy = Global.getSingleton().getProxyFactory().getProxy(request.getLookup());
//invoker real service
GaeaResponse gaeaResponse = localProxy.invoke(context);
//返回应答
response = createResponse(gaeaResponse);
至此,Gaea基本流程基本走了一遍。
相关文章推荐
- Gaea源码阅读(三):服务端启动流程
- Gaea源码阅读(三):服务端启动流程
- Dubbo源码阅读之 服务端和客户端处理链
- Gaea源码阅读(二):客户端流程
- Gaea源码阅读(五):C客户端
- Gaea源码阅读(二):客户端流程
- zookeeper源码阅读分析笔记--客户端服务端通信机制以及session超时、过期处理
- Gaea源码阅读(五):C客户端
- node.js版简单客户端和服务端通讯源码
- Hadoop RPC源码阅读-服务端Server
- 即时通讯_Python服务端_CPP客户端_源码
- solr源码导入eclipse 分类: H4_SOLR/LUCENCE 2014-07-14 14:11 550人阅读 评论(1) 收藏
- [PHP源码阅读]array_pop和array_shift函数
- 关于阅读MFC源码的MFC.bsc文件
- 分布式文件系统KFS源码阅读与分析(三):RPC实现机制(MetaServer端)
- Tensorflow object detection API 源码阅读笔记:RFCN
- Laravel 5.1 源码阅读笔记
- LUA 源码阅读笔记(一)
- Spring源码阅读1---导入eclipse
- HashMap源码阅读(2)-put操作