您的位置:首页 > 其它

dubbo分线程池处理同一服务请求随记

2016-09-02 15:07 507 查看
前言 最近在看release it,第二章中的案例提到底层服务被数据库阻塞后把服务线程池全部占满并导致上层应用一直阻塞,结合自己部门的线上服务考虑,比如一个服务下会http请求外部应用,根据不同参数会处理时间长短会不同,这样考虑从dubbo中间层做一个保护,比如把处理时间会很长的这种参数的请求归类到一个线程池下,其他的归类到另外一个线程池下,这样即使处理时间长的请求一直阻塞起码不影响到其他的功能。 当然还有其他解决办法:增加线程池大小或者加机器,但是无法预知什么时间点会增加到什么量,即使增加线程池大小也无法根治根据处理时长分服务处理,目前dubbo对于同一应用服务都是使用一个线程池处理接收请求,分两个应用代价太大减小超时时长,一定程度上可以解决,很有可能大部分正常请求也无法正常响应,比如99%执行时长在7s而设置超时时长为2s显然不太合适变同步为异步,这当然是最能根治的办法,但是从用户交互及后台开发流程都得改变,代价略大dubbo网络层结构dubbo线程模型 dubbo接收请求基本流程 netty处理接收请求->NioServerSocketPipelineSink$Boss->注册NioWorker线程用于处理长连接和IO操作->DefaultChannelPipeline(处理请求的反序列化和分发handler,初始化过程见NettyServer,会往pipeline加入decoder、encoder和NettyHandler)->decoder做反序列化->NettyHandler处理请求->NettyServer处理请求(NettyServer初始化封装了MultiMessageHandler、HeartbeatHandler以及根据dispatcher得到的channelHandler,默认是AllDispatcher)->MultiMessageHandlerfor each处理批量请求->HeartbeatHandler处理心跳请求->AllDispatcher得到的AllChannelHandler处理将通道所有状态变更用新线程处理(包括建立连接、断开连接、接收请求、异常)->从指定的线程池拿到线程执行rpc请求->HeaderExchangeHandler判断双向和单向处理请求,双向则将结果使用当前channel回写->到DubboProtocol$ExchangeHandlerAdapter处理请求调用->到此服务端网络传输层结束如何自定义一个Dispatcher 参见dubbo源码包/META-INF/dubbo/internal/com.alibaba.dubbo.remoting.Dispather,并在protocol配置时指定为自定义的dispatcher。 顺带一提,现已有的几个Dispatcher所实现的功能:AllDispather(注:Dispather不是我打错了。。类名就是这个) 转发给AllChannelHandler,默认200大小的fixed线程池,可通过threadpool配置为其他类型线程池,由IOThread转交,处理建立连接、关闭连接、处理接收请求、处理异常四种情况都会使用该线程池中的线程。ConnectionOrderedDispather 转发给ConnectionOrderedChannelHandler,建立连接和关闭连接动作采用同一个单线程的线程池处理,在线程池队列超过一定数量提供warn日志,接收请求和异常处理使用配置的线程池。DirectDispather 直接使用IOThread处理请求。ExecutionDispather 功能与AllDispatcher类似,更像是AllDispatcher的一个老版本?少了异常处理和获取executor不存在的处理。MessageOnlyDispather 只有接收请求才使用配置的线程池处理,其他使用IOThread处理。还没理顺的一个点是caught、send、receive、connection、disconnection分别是什么点进入Dispatcher。如何通过自定义Dispatcher完成分线程池处理同一服务新建自定义Dispatcher和对应的handler,可以根据需要继承AllDispatcher等,重写handleReceive,如有必要可再自定义ThreadPool。 先贴一段Dispatcher和Handler可实现方案,逻辑优化可以再考虑,比如框架不应该使用业务DTO这种,自己的线程池(保证全局唯一,这里TimeDecideChannelHandler是初始化构造的,所以成员属性也是全局唯一的)处理方案等。 增加配置文件: timeDecide=com.web.foo.dispatcher.TimeDecideDispatcher 配置dispather:
<dubbo:protocol name="dubbo" port="-1" accesslog="d:/access.log" dispather="timeDecide"></dubbo:protocol>
package com.web.foo.dispatcher;import com.alibaba.dubbo.common.URL;import com.alibaba.dubbo.remoting.ChannelHandler;import com.alibaba.dubbo.remoting.Dispather;import com.web.foo.handler.TimeDecideChannelHandler;public class TimeDecideDispatcher implements Dispather{public static final String NAME = "timeDecide";@Overridepublic ChannelHandler dispath(ChannelHandler handler, URL url){return new TimeDecideChannelHandler(handler, url);}}package com.web.foo.handler;import java.util.concurrent.ExecutorService;import com.alibaba.dubbo.common.Constants;import com.alibaba.dubbo.common.URL;import com.alibaba.dubbo.common.threadpool.support.fixed.FixedThreadPool;import com.alibaba.dubbo.remoting.Channel;import com.alibaba.dubbo.remoting.ChannelHandler;import com.alibaba.dubbo.remoting.ExecutionException;import com.alibaba.dubbo.remoting.RemotingException;import com.alibaba.dubbo.remoting.exchange.Request;import com.alibaba.dubbo.remoting.transport.dispather.ChannelEventRunnable;import com.alibaba.dubbo.remoting.transport.dispather.ChannelEventRunnable.ChannelState;import com.alibaba.dubbo.remoting.transport.dispather.WrappedChannelHandler;import com.alibaba.dubbo.rpc.Invocation;import com.web.foo.dto.TestDto;/*** @Project* @Description:* @Create 2015年7月19日下午6:25:17* @author zhoushaoyu**/public class TimeDecideChannelHandler extends WrappedChannelHandler{FixedThreadPool tp = new FixedThreadPool();ExecutorService exe;/*** @param handler* @param url*/public TimeDecideChannelHandler(ChannelHandler handler, URL url){super(handler, url);url = url.addParameter(Constants.THREAD_NAME_KEY, "my-fixed-thread-pool");exe = (ExecutorService) tp.getExecutor(url);}public void connected(Channel channel) throws RemotingException{ExecutorService cexecutor = getExecutorService();try{cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));} catch (Throwable t){throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);}}public void disconnected(Channel channel) throws RemotingException{ExecutorService cexecutor = getExecutorService();try{cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));} catch (Throwable t){throw new ExecutionException("disconnect event", channel, getClass() + " error when process disconnected event .",t);}}public void received(Channel channel, Object message) throws RemotingException{ExecutorService cexecutor = getExecutorService(message);try{cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));} catch (Throwable t){throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);}}public void caught(Channel channel, Throwable exception) throws RemotingException{ExecutorService cexecutor = getExecutorService();try{cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));} catch (Throwable t){throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t);}}private ExecutorService getExecutorService(Object message){Object inv = message;if (message instanceof Request){inv = ((Request) message).getData();}if (!(inv instanceof Invocation)){return getExecutorService();}for (Object p : ((Invocation) inv).getArguments()){if (p instanceof String && "longCall".equalsIgnoreCase((String) p)){return (ExecutorService) exe;}if (p instanceof TestDto && "longCall".equalsIgnoreCase(((TestDto) p).getTest())){return (ExecutorService) exe;}}return getExecutorService();}private ExecutorService getExecutorService(){ExecutorService cexecutor = executor;if (cexecutor == null || cexecutor.isShutdown()){cexecutor = SHARED_EXECUTOR;}return cexecutor;}}可以看到最终的效果,如果是longCall进来就是在my-fixed-thread-pool,如果是其他的就在DubboServerHandler 
                                            
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐