您的位置:首页 > 其它

学习笔记---DUBBO请求处理线程模型以及源码分析

2020-03-23 09:39 218 查看

继续以netty为底层通讯框架来分析,个人认为dubbo的线程模型设计思路是居于netty的线程模型延伸开来的的;
在netty中,处理开发者的业务逻辑如果过于复杂耗时的话,开发者利用线程池来另外创建线程来处理业务逻辑,这样不会占着IO线程的资源;反之如果业务逻辑简单计算较快的话,则使用IO线程来处理,则减少线程的损耗;
dubbo就是在这基础上提炼,通过官网我们了解到:

dubbo提供了5个线程模型派发策略:
1.all: 连接,心跳,断开,读写由全部由业务线程池来处理
2.direct: 连接,心跳,断开,读写由全部由worker线程池来处理
3.messge: 读写消息由业务线程池来处理,其余的由worker线程池来处理
4.exection: 读消息由业务线程池来处理,其余的由worker线程池来处理
5.connetion: 连接断开由worker线程池来处理,其余的由业务来处理

那么dubbo在何处进行通过spi来确定线程派发模型的呢?
既然是处理io事件的类,应该在建立服务端NettyServer对象,初始化ChannelPipeline时创建,

就在这里利用了spi的自适应机制获取代理类,
class com.alibaba.dubbo.remoting.Dispatcher$Adaptive,
断点:

可以得到手动编译得到的对象代码:

package com.alibaba.dubbo.remoting;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
public class Dispatcher$Adaptive implements com.alibaba.dubbo.remoting.Dispatcher {
public com.alibaba.dubbo.remoting.ChannelHandler dispatch(com.alibaba.dubbo.remoting.ChannelHandler arg0, com.alibaba.dubbo.common.URL arg1) {
if (arg1 == null) throw new IllegalArgumentException("url == null");
com.alibaba.dubbo.common.URL url = arg1;
String extName = url.getParameter("dispatcher", url.getParameter("dispather", url.getParameter("channel.handler", "message")));
if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.remoting.Dispatcher) name from url(" + url.toString() + ") use keys([dispatcher, dispather, channel.handler])");
com.alibaba.dubbo.remoting.Dispatcher extension = (com.alibaba.dubbo.remoting.Dispatcher)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.remoting.Dispatcher.class).getExtension(extName);
return extension.dispatch(arg0, arg1);
}
}

这里配置:


现在来学习下这几个ChannelHandler类:

  1. AllChannelHandler :
public class AllChannelHandler extends WrappedChannelHandler {

public AllChannelHandler(ChannelHandler handler, URL url) {
super(handler, url);
}

@Override
public void connected(Channel channel) throws RemotingException {
ExecutorService cexecutor = getExecutorService();
try {
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
} catch (Throwable t) {
throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);
}
}

@Override
public void disconnected(Channel channel) throws RemotingException {
ExecutorService cexecutor = getExecutorService();
try {
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
} catch (Throwable t) {
throw new ExecutionException("disconnect event", channel, getClass() + " error when process disconnected event .", t);
}
}

@Override
public void received(Channel channel, Object message) throws RemotingException {
System.out.println(" AllChannelHandler.received(channel, message);"+handler.getClass().getName()+" "+message);
ExecutorService cexecutor = getExecutorService();
try {
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
//TODO A temporary solution to the problem that the exception information can not be sent to the opposite end after the thread pool is full. Need a refactoring
//fix The thread pool is full, refuses to call, does not return, and causes the consumer to wait for time out
if(message instanceof Request && t instanceof RejectedExecutionException){
Request request = (Request)message;
if(request.isTwoWay()){
String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();
Response response = new Response(request.getId(), request.getVersion());
response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
response.setErrorMessage(msg);
channel.send(response);
return;
}
}
throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
}
}

@Override
public void caught(Channel channel, Throwable exception) throws RemotingException {
ExecutorService cexecutor = getExecutorService();
try {
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
} catch (Throwable t) {
throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t);
}
}
}

可以看出这个:AllChannelHandler 在所有的IO事件处理都交给业务线程池来处理;

以此类推其他的派生类;

在解码之后就对调用这个对应的:ChannelHandler 来处理;

  • 点赞
  • 收藏
  • 分享
  • 文章举报
lbingk 发布了11 篇原创文章 · 获赞 0 · 访问量 227 私信 关注
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: