学习笔记---DUBBO请求处理线程模型以及源码分析
2020-03-23 09:39
218 查看
继续以netty为底层通讯框架来分析,个人认为dubbo的线程模型设计思路是居于netty的线程模型延伸开来的的;
在netty中,处理开发者的业务逻辑如果过于复杂耗时的话,开发者利用线程池来另外创建线程来处理业务逻辑,这样不会占着IO线程的资源;反之如果业务逻辑简单计算较快的话,则使用IO线程来处理,则减少线程的损耗;
dubbo就是在这基础上提炼,通过官网我们了解到:
dubbo提供了5个线程模型派发策略:
1.all: 连接,心跳,断开,读写由全部由业务线程池来处理
2.direct: 连接,心跳,断开,读写由全部由worker线程池来处理
3.messge: 读写消息由业务线程池来处理,其余的由worker线程池来处理
4.exection: 读消息由业务线程池来处理,其余的由worker线程池来处理
5.connetion: 连接断开由worker线程池来处理,其余的由业务来处理
那么dubbo在何处进行通过spi来确定线程派发模型的呢?
既然是处理io事件的类,应该在建立服务端NettyServer对象,初始化ChannelPipeline时创建,
就在这里利用了spi的自适应机制获取代理类,
class com.alibaba.dubbo.remoting.Dispatcher$Adaptive,
断点:
可以得到手动编译得到的对象代码:
package com.alibaba.dubbo.remoting; import com.alibaba.dubbo.common.extension.ExtensionLoader; public class Dispatcher$Adaptive implements com.alibaba.dubbo.remoting.Dispatcher { public com.alibaba.dubbo.remoting.ChannelHandler dispatch(com.alibaba.dubbo.remoting.ChannelHandler arg0, com.alibaba.dubbo.common.URL arg1) { if (arg1 == null) throw new IllegalArgumentException("url == null"); com.alibaba.dubbo.common.URL url = arg1; String extName = url.getParameter("dispatcher", url.getParameter("dispather", url.getParameter("channel.handler", "message"))); if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.remoting.Dispatcher) name from url(" + url.toString() + ") use keys([dispatcher, dispather, channel.handler])"); com.alibaba.dubbo.remoting.Dispatcher extension = (com.alibaba.dubbo.remoting.Dispatcher)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.remoting.Dispatcher.class).getExtension(extName); return extension.dispatch(arg0, arg1); } }
这里配置:
现在来学习下这几个ChannelHandler类:
- AllChannelHandler :
public class AllChannelHandler extends WrappedChannelHandler { public AllChannelHandler(ChannelHandler handler, URL url) { super(handler, url); } @Override public void connected(Channel channel) throws RemotingException { ExecutorService cexecutor = getExecutorService(); try { cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED)); } catch (Throwable t) { throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t); } } @Override public void disconnected(Channel channel) throws RemotingException { ExecutorService cexecutor = getExecutorService(); try { cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED)); } catch (Throwable t) { throw new ExecutionException("disconnect event", channel, getClass() + " error when process disconnected event .", t); } } @Override public void received(Channel channel, Object message) throws RemotingException { System.out.println(" AllChannelHandler.received(channel, message);"+handler.getClass().getName()+" "+message); ExecutorService cexecutor = getExecutorService(); try { cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message)); } catch (Throwable t) { //TODO A temporary solution to the problem that the exception information can not be sent to the opposite end after the thread pool is full. Need a refactoring //fix The thread pool is full, refuses to call, does not return, and causes the consumer to wait for time out if(message instanceof Request && t instanceof RejectedExecutionException){ Request request = (Request)message; if(request.isTwoWay()){ String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage(); Response response = new Response(request.getId(), request.getVersion()); response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR); response.setErrorMessage(msg); channel.send(response); return; } } throw new ExecutionException(message, channel, getClass() + " error when process received event .", t); } } @Override public void caught(Channel channel, Throwable exception) throws RemotingException { ExecutorService cexecutor = getExecutorService(); try { cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception)); } catch (Throwable t) { throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t); } } }
可以看出这个:AllChannelHandler 在所有的IO事件处理都交给业务线程池来处理;
以此类推其他的派生类;
在解码之后就对调用这个对应的:ChannelHandler 来处理;
- 点赞
- 收藏
- 分享
- 文章举报
相关文章推荐
- dubbo请求处理线程模型实现分析
- Dubbo -- 系统学习 笔记 -- 示例 -- 线程模型
- Lighttpd1.4.20源码分析 笔记 状态机之请求处理
- (三)Netty源码学习笔记之boss线程处理流程
- 基于Netty3的RPC架构笔记3之线程模型源码分析
- (三)Netty源码学习笔记之boss线程处理流程
- Netty学习之旅------再谈线程模型之源码分析NioEventLoopGroup、SingleThreadEventExecutor
- zookeeper源码阅读分析笔记--客户端服务端通信机制以及session超时、过期处理
- Dubbo学习笔记10:Dubbo服务消费方启动流程源码分析
- dubbo服务端处理请求源码分析
- Struts2请求处理流程及源码分析
- 第二人生的源码分析(四十四)虚拟文件系统的请求处理
- 概念模型——分析模式学习笔记
- Combres库 学习小结以及部分源码分析
- eCos学习笔记之中断处理代码分析
- delphi.net开发学习笔记(2),Application、Session、ViewState、Cookies状态以及变量处理
- WPF笔记12: 线程处理模型
- Ogre源码分析与学习笔记-0
- Silverlight学习笔记(四)-----【转】Silverlight 线程处理概念和调度程序
- Struts2请求处理流程及源码分析