A First Look at the Hystrix Thread Pool Isolation Source Code
2018-03-06 14:54
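A first look at the Hystrix source: thread pool isolation
This post covers only the following: the initialization of a HystrixCommand,
and the initialization of the Hystrix thread pool.
A DubboHystrixCommand is constructed as follows: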
public DubboHystrixCommand(Invoker invoker, Invocation invocation) {
    super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(invoker.getInterface().getName()))
            .andCommandKey(HystrixCommandKey.Factory.asKey(String.format("%s_%d", invocation.getMethodName(),
                    invocation.getArguments() == null ? 0 : invocation.getArguments().length)))
            .andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
                    // the breaker only evaluates the error rate once at least 300 requests
                    // have arrived in the rolling window (10 seconds by default)
                    .withCircuitBreakerRequestVolumeThreshold(300)
                    // 1 second after opening, the breaker goes half-open and lets trial traffic through
                    .withCircuitBreakerSleepWindowInMilliseconds(1000)
                    // open the breaker once the error rate reaches 50%
                    .withCircuitBreakerErrorThresholdPercentage(50)
                    // rely on Dubbo's own timeout and disable the Hystrix one
                    .withExecutionTimeoutEnabled(false))
            .andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter()
                    // core size is read from the Dubbo URL (the original note gives 30)
                    .withCoreSize(getThreadPoolCoreSize(invoker.getUrl()))
                    .withMaxQueueSize(1000)
                    .withQueueSizeRejectionThreshold(700)));
    this.invoker = invoker;
    this.invocation = invocation;
}
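To make the command key naming concrete, here is a minimal sketch that isolates the String.format call from the constructor above. The class name CommandKeyNaming and the sample method names are made up for illustration; only the format string comes from the original code.

```java
class CommandKeyNaming {
    // Same expression as in the DubboHystrixCommand constructor:
    // method name, an underscore, then the number of arguments.
    static String commandKey(String methodName, Object[] arguments) {
        return String.format("%s_%d", methodName, arguments == null ? 0 : arguments.length);
    }

    public static void main(String[] args) {
        // hypothetical method names, used only to show the resulting keys
        System.out.println(commandKey("sayHello", new Object[] {"world"}));
        System.out.println(commandKey("ping", null));
    }
}
```

One consequence of this scheme is that two overloads with the same name and the same number of arguments end up with the same command key, and therefore share one circuit breaker and one set of metrics.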
Setter is a static nested class of HystrixCommand.
The constructor that actually gets called is:
/**
 * Construct a {@link HystrixCommand} with defined {@link Setter} that allows injecting property and strategy overrides and other optional arguments.
 * <p>
 * NOTE: The {@link HystrixCommandKey} is used to associate a {@link HystrixCommand} with {@link HystrixCircuitBreaker}, {@link HystrixCommandMetrics} and other objects.
 * <p>
 * Do not create multiple {@link HystrixCommand} implementations with the same {@link HystrixCommandKey} but different injected default properties as the first instantiated will win.
 * <p>
 * Properties passed in via {@link Setter#andCommandPropertiesDefaults} or {@link Setter#andThreadPoolPropertiesDefaults} are cached for the given {@link HystrixCommandKey} for the life of the JVM
 * or until {@link Hystrix#reset()} is called. Dynamic properties allow runtime changes. Read more on the <a href="https://github.com/Netflix/Hystrix/wiki/Configuration">Hystrix Wiki</a>.
 *
 * @param setter
 *            Fluent interface for constructor arguments
 */
protected HystrixCommand(Setter setter) {
    // use 'null' to specify use the default
    this(setter.groupKey, setter.commandKey, setter.threadPoolKey, null, null,
            setter.commandPropertiesDefaults, setter.threadPoolPropertiesDefaults,
            null, null, null, null, null);
}
This in turn ends up in the following constructor of AbstractCommand:
protected AbstractCommand(HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey,
        HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool,
        HystrixCommandProperties.Setter commandPropertiesDefaults,
        HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults,
        HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore,
        HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook) {

    this.commandGroup = initGroupKey(group);
    // initialize the commandKey
    this.commandKey = initCommandKey(key, getClass());
    // initialize the command's configuration
    this.properties = initCommandProperties(this.commandKey, propertiesStrategy, commandPropertiesDefaults);
    // the thread pool key
    this.threadPoolKey = initThreadPoolKey(threadPoolKey, this.commandGroup,
            this.properties.executionIsolationThreadPoolKeyOverride().get());
    // metrics, which hold the strategies and algorithms that isolation relies on
    this.metrics = initMetrics(metrics, this.commandGroup, this.threadPoolKey, this.commandKey, this.properties);
    // initialize the circuit breaker
    this.circuitBreaker = initCircuitBreaker(this.properties.circuitBreakerEnabled().get(), circuitBreaker,
            this.commandGroup, this.commandKey, this.properties, this.metrics);
    // the thread pool
    this.threadPool = initThreadPool(threadPool, this.threadPoolKey, threadPoolPropertiesDefaults);

    // Strategies from plugins (extension points)
    this.eventNotifier = HystrixPlugins.getInstance().getEventNotifier();
    // the concurrency strategy
    this.concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
    HystrixMetricsPublisherFactory.createOrRetrievePublisherForCommand(this.commandKey, this.commandGroup,
            this.metrics, this.circuitBreaker, this.properties);
    // an extension hook, i.e. a callback
    this.executionHook = initExecutionHook(executionHook);

    this.requestCache = HystrixRequestCache.getInstance(this.commandKey, this.concurrencyStrategy);
    this.currentRequestLog = initRequestLog(this.properties.requestLogEnabled().get(), this.concurrencyStrategy);

    /* fallback semaphore override if applicable */
    this.fallbackSemaphoreOverride = fallbackSemaphore;

    /* execution semaphore override if applicable */
    // configuration used under semaphore isolation
    this.executionSemaphoreOverride = executionSemaphore;
}
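Timeout (default 1000 ms; unit: ms)
hystrix.command.default.execution.isolation.thread.timeoutInMilliseconds
Set on the caller side. Every method invoked by this caller uses this timeout. It has lower priority than the command-specific setting below.
hystrix.command.HystrixCommandKey.execution.isolation.thread.timeoutInMilliseconds
Set on the caller side. Only the specified command uses this timeout (replace HystrixCommandKey with the command key, which here is derived from the method name).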
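The precedence between the two keys can be modelled with a few lines of plain Java. This is only a sketch of the lookup order described above, not how Hystrix resolves properties internally, and it leaves out defaults set in code through a Setter. The class TimeoutLookup and the map-based configuration are assumptions made for the example.

```java
import java.util.HashMap;
import java.util.Map;

class TimeoutLookup {
    static final String DEFAULT_KEY =
            "hystrix.command.default.execution.isolation.thread.timeoutInMilliseconds";
    static final int BUILT_IN_DEFAULT_MS = 1000;

    // Command-specific key first, then the shared default key, then the built-in 1000 ms.
    static int timeoutFor(Map<String, Integer> config, String commandKey) {
        Integer specific = config.get(
                "hystrix.command." + commandKey + ".execution.isolation.thread.timeoutInMilliseconds");
        if (specific != null) {
            return specific;
        }
        Integer shared = config.get(DEFAULT_KEY);
        return shared != null ? shared : BUILT_IN_DEFAULT_MS;
    }

    public static void main(String[] args) {
        Map<String, Integer> config = new HashMap<>();
        config.put(DEFAULT_KEY, 3000);
        config.put("hystrix.command.sayHello_1.execution.isolation.thread.timeoutInMilliseconds", 500);
        System.out.println(timeoutFor(config, "sayHello_1")); // command-specific value
        System.out.println(timeoutFor(config, "ping_0"));     // falls back to the shared default
    }
}
```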
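Thread pool core size
hystrix.threadpool.default.coreSize (default 10)
Queue
hystrix.threadpool.default.maxQueueSize (maximum queue length. The default is -1, which uses a SynchronousQueue; any other value uses a LinkedBlockingQueue. Switching from -1 to another value requires a restart, so this value cannot be adjusted dynamically. To adjust the limit at runtime, use the setting below.)
hystrix.threadpool.default.queueSizeRejectionThreshold (rejection threshold on the number of queued tasks, default 5. Once the queue reaches this size, new tasks are rejected, so when this option is set it acts as the effective queue size.)
Note: if maxQueueSize is -1, this option has no effect.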
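The interaction between the two queue settings can be sketched as a single check. This is a simplified model of the behaviour described above rather than the library's own code; the class QueueRejection and the method hasQueueSpace are names invented for this example.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class QueueRejection {
    // Returns true when a new task may still be queued.
    static boolean hasQueueSpace(int maxQueueSize, int rejectionThreshold, BlockingQueue<Runnable> queue) {
        if (maxQueueSize <= 0) {
            // no real queue is in use, so the threshold is ignored
            return true;
        }
        // the threshold, not the queue capacity, decides when to reject
        return queue.size() < rejectionThreshold;
    }

    public static void main(String[] args) {
        // values taken from the DubboHystrixCommand constructor: capacity 1000, threshold 700
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(1000);
        for (int i = 0; i < 700; i++) {
            queue.offer(() -> {});
        }
        // 700 tasks are queued: the capacity is not exhausted, but the threshold is reached
        System.out.println(hasQueueSpace(1000, 700, queue));
    }
}
```

With the values from the constructor at the top of this post, the queue can physically hold 1000 tasks, but submissions start being rejected at 700. The gap leaves room to raise the threshold at runtime without a restart.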
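Circuit breaker
hystrix.command.default.circuitBreaker.requestVolumeThreshold (the minimum number of requests in the rolling window before the circuit is allowed to trip at all; default 20)
For example, if the value is 20, then if only 19 requests are received in the rolling window (say a window of 10 seconds) the circuit will not trip open even if all 19 failed.
hystrix.command.default.circuitBreaker.sleepWindowInMilliseconds (how long after tripping before the breaker starts testing whether the dependency has recovered; default 5 s)
hystrix.command.default.circuitBreaker.errorThresholdPercentage (error percentage threshold; once it is reached the circuit trips. Default 50%)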
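How the request volume threshold and the error percentage combine can be written down as a small decision function. This is a sketch of the rule as described in the settings above, with made-up names (TripDecision, shouldTrip); the real breaker works on rolling-window health counts, which are passed in here as plain numbers.

```java
class TripDecision {
    static boolean shouldTrip(long totalRequests, long failedRequests,
                              int requestVolumeThreshold, int errorThresholdPercentage) {
        if (totalRequests == 0 || totalRequests < requestVolumeThreshold) {
            // too little traffic in the window to judge, regardless of the error rate
            return false;
        }
        long errorPercentage = failedRequests * 100 / totalRequests;
        return errorPercentage >= errorThresholdPercentage;
    }

    public static void main(String[] args) {
        // 19 requests, all failed, threshold 20: the circuit stays closed
        System.out.println(shouldTrip(19, 19, 20, 50));
        // 20 requests, 10 failed (50%): the circuit trips
        System.out.println(shouldTrip(20, 10, 20, 50));
    }
}
```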
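Fallback
hystrix.command.default.fallback.isolation.semaphore.maxConcurrentRequests (the maximum number of concurrent calls to HystrixCommand.getFallback() allowed from the calling thread; default 10. Calls beyond the limit are rejected. This setting also applies in THREAD isolation mode.)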
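The idea of capping concurrent fallback calls can be illustrated with java.util.concurrent.Semaphore. Hystrix uses its own TryableSemaphore type for this, so the sketch below is an analogy under that assumption, not the library's implementation; FallbackLimiter and callFallback are hypothetical names.

```java
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

class FallbackLimiter {
    private final Semaphore permits;

    FallbackLimiter(int maxConcurrentRequests) {
        this.permits = new Semaphore(maxConcurrentRequests);
    }

    // Runs the fallback only if a permit is free; never blocks waiting for one.
    <T> T callFallback(Supplier<T> fallback) {
        if (!permits.tryAcquire()) {
            throw new IllegalStateException("fallback rejected: concurrency limit reached");
        }
        try {
            return fallback.get();
        } finally {
            permits.release();
        }
    }

    public static void main(String[] args) {
        FallbackLimiter limiter = new FallbackLimiter(10);
        System.out.println(limiter.callFallback(() -> "cached value"));
    }
}
```

The important property is that the check does not wait: when the limit is reached the caller fails fast instead of queuing up behind slow fallbacks.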
Now step into the initThreadPool method:
private static HystrixThreadPool initThreadPool(HystrixThreadPool fromConstructor, HystrixThreadPoolKey threadPoolKey,
        HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults) {
    if (fromConstructor == null) {
        // get the default implementation of HystrixThreadPool
        return HystrixThreadPool.Factory.getInstance(threadPoolKey, threadPoolPropertiesDefaults);
    } else {
        return fromConstructor;
    }
}
Next, step into the getInstance method of HystrixThreadPool.Factory:
/**
 * Get the {@link HystrixThreadPool} instance for a given {@link HystrixThreadPoolKey}.
 * <p>
 * This is thread-safe and ensures only 1 {@link HystrixThreadPool} per {@link HystrixThreadPoolKey}.
 *
 * @return {@link HystrixThreadPool} instance
 */
/* package */static HystrixThreadPool getInstance(HystrixThreadPoolKey threadPoolKey,
        HystrixThreadPoolProperties.Setter propertiesBuilder) {
    // get the key to use instead of using the object itself so that if people forget to implement equals/hashcode things will still work
    String key = threadPoolKey.name();

    // this should find it for all but the first time
    // look in the map first, so the map acts as a cache
    HystrixThreadPool previouslyCached = threadPools.get(key);
    if (previouslyCached != null) {
        return previouslyCached;
    }

    // if we get here this is the first time so we need to initialize
    synchronized (HystrixThreadPool.class) {
        if (!threadPools.containsKey(key)) {
            // no HystrixThreadPool has been created for this threadPoolKey yet, so create one
            threadPools.put(key, new HystrixThreadPoolDefault(threadPoolKey, propertiesBuilder));
        }
    }
    return threadPools.get(key);
}
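The check, lock, check again pattern in getInstance can be tried out in isolation. The sketch below reproduces it with a generic cache; PerKeyCache and its factory function are assumptions for the example and stand in for the threadPools map and the HystrixThreadPoolDefault constructor.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

class PerKeyCache<V> {
    private final ConcurrentHashMap<String, V> instances = new ConcurrentHashMap<>();
    private final Function<String, V> factory;

    PerKeyCache(Function<String, V> factory) {
        this.factory = factory;
    }

    V getInstance(String key) {
        // fast path: every call except the first one for a key returns here without locking
        V cached = instances.get(key);
        if (cached != null) {
            return cached;
        }
        // slow path: only one thread at a time may create an instance
        synchronized (this) {
            if (!instances.containsKey(key)) {
                instances.put(key, factory.apply(key));
            }
        }
        return instances.get(key);
    }

    public static void main(String[] args) {
        PerKeyCache<Object> cache = new PerKeyCache<>(key -> new Object());
        // the same key always yields the same instance
        System.out.println(cache.getInstance("orders") == cache.getInstance("orders"));
    }
}
```

Because the instance created first is the one that stays in the map, arguments passed on later calls for the same key are never used. That matches the warning in the HystrixCommand Javadoc that the first instantiated configuration wins.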
Now look at the constructor of HystrixThreadPoolDefault:
public HystrixThreadPoolDefault(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesDefaults) {
    this.properties = HystrixPropertiesFactory.getThreadPoolProperties(threadPoolKey, propertiesDefaults);
    HystrixConcurrencyStrategy concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
    this.queueSize = properties.maxQueueSize().get();
    // pick the queue implementation that matches the configured queueSize
    this.queue = concurrencyStrategy.getBlockingQueue(queueSize);

    if (properties.getAllowMaximumSizeToDivergeFromCoreSize()) {
        this.metrics = HystrixThreadPoolMetrics.getInstance(threadPoolKey,
                concurrencyStrategy.getThreadPool(threadPoolKey, properties.coreSize(), properties.maximumSize(),
                        properties.keepAliveTimeMinutes(), TimeUnit.MINUTES, queue),
                properties);
        this.threadPool = this.metrics.getThreadPool();
    } else {
        this.metrics = HystrixThreadPoolMetrics.getInstance(threadPoolKey,
                // build the ThreadPoolExecutor from coreSize and the other properties
                concurrencyStrategy.getThreadPool(threadPoolKey, properties.coreSize(), properties.coreSize(),
                        properties.keepAliveTimeMinutes(), TimeUnit.MINUTES, queue),
                properties);
        this.threadPool = this.metrics.getThreadPool();
    }

    /* strategy: HystrixMetricsPublisherThreadPool */
    HystrixMetricsPublisherFactory.createOrRetrievePublisherForThreadPool(threadPoolKey, this.metrics, this.properties);
}
this.queue = concurrencyStrategy.getBlockingQueue(queueSize);

// the method being called is:
public BlockingQueue<Runnable> getBlockingQueue(int maxQueueSize) {
    /*
     * We are using SynchronousQueue if maxQueueSize <= 0 (meaning a queue is not wanted).
     * <p>
     * SynchronousQueue will do a handoff from calling thread to worker thread and not allow queuing which is what we want.
     * <p>
     * Queuing results in added latency and would only occur when the thread-pool is full at which point there are latency issues
     * and rejecting is the preferred solution.
     */
    if (maxQueueSize <= 0) {
        // synchronous hand-off queue
        return new SynchronousQueue<Runnable>();
    } else {
        // a LinkedBlockingQueue bounded by maxQueueSize
        return new LinkedBlockingQueue<Runnable>(maxQueueSize);
    }
}
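The practical difference between the two queue types is easy to observe with the JDK classes alone. The sketch below copies the selection rule into a hypothetical helper, QueueChoice.queueFor, and then offers tasks to each queue without any worker thread attached.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;

class QueueChoice {
    // Same rule as getBlockingQueue: no queue wanted means a hand-off queue.
    static BlockingQueue<Runnable> queueFor(int maxQueueSize) {
        if (maxQueueSize <= 0) {
            return new SynchronousQueue<Runnable>();
        }
        return new LinkedBlockingQueue<Runnable>(maxQueueSize);
    }

    public static void main(String[] args) {
        Runnable task = () -> {};

        // A SynchronousQueue holds nothing: with no thread waiting to take the task, offer fails at once.
        BlockingQueue<Runnable> handOff = queueFor(-1);
        System.out.println("hand-off accepted: " + handOff.offer(task));

        // A bounded LinkedBlockingQueue buffers tasks until its capacity is used up.
        BlockingQueue<Runnable> bounded = queueFor(2);
        System.out.println("first accepted: " + bounded.offer(task));
        System.out.println("second accepted: " + bounded.offer(task));
        System.out.println("third accepted: " + bounded.offer(task));
    }
}
```

Inside a ThreadPoolExecutor this means that with the default maxQueueSize of -1 a task is either handed straight to a thread or rejected; nothing ever waits in a queue.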
// this call in the constructor above is what creates the thread pool
concurrencyStrategy.getThreadPool(threadPoolKey, properties.coreSize(), properties.coreSize(),
        properties.keepAliveTimeMinutes(), TimeUnit.MINUTES, queue)

/**
 * Factory method to provide {@link ThreadPoolExecutor} instances as desired.
 * <p>
 * Note that the corePoolSize, maximumPoolSize and keepAliveTime values will be dynamically set during runtime if their values change using the {@link ThreadPoolExecutor#setCorePoolSize},
 * {@link ThreadPoolExecutor#setMaximumPoolSize} and {@link ThreadPoolExecutor#setKeepAliveTime} methods.
 * <p>
 * <b>Default Implementation</b>
 * <p>
 * Implementation using standard java.util.concurrent.ThreadPoolExecutor
 *
 * @param threadPoolKey
 *            {@link HystrixThreadPoolKey} representing the {@link HystrixThreadPool} that this {@link ThreadPoolExecutor} will be used for.
 * @param corePoolSize
 *            Core number of threads requested via properties (or system default if no properties set).
 * @param maximumPoolSize
 *            Max number of threads requested via properties (or system default if no properties set).
 * @param keepAliveTime
 *            Keep-alive time for threads requested via properties (or system default if no properties set).
 * @param unit
 *            {@link TimeUnit} corresponding with keepAliveTime
 * @param workQueue
 *            {@code BlockingQueue<Runnable>} as provided by {@link #getBlockingQueue(int)}
 * @return instance of {@link ThreadPoolExecutor}
 */
public ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixProperty<Integer> corePoolSize,
        HystrixProperty<Integer> maximumPoolSize, HystrixProperty<Integer> keepAliveTime, TimeUnit unit,
        BlockingQueue<Runnable> workQueue) {
    ThreadFactory threadFactory = null;
    if (!PlatformSpecific.isAppEngineStandardEnvironment()) {
        // build the threadFactory
        threadFactory = new ThreadFactory() {
            protected final AtomicInteger threadNumber = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r, "hystrix-" + threadPoolKey.name() + "-" + threadNumber.incrementAndGet());
                thread.setDaemon(true);
                return thread;
            }
        };
    } else {
        threadFactory = PlatformSpecific.getAppEngineThreadFactory();
    }

    final int dynamicCoreSize = corePoolSize.get();
    final int dynamicMaximumSize = maximumPoolSize.get();

    // compare the current core size with the current maximum size
    if (dynamicCoreSize > dynamicMaximumSize) {
        logger.error("Hystrix ThreadPool configuration at startup for : " + threadPoolKey.name() + " is trying to set coreSize = " +
                dynamicCoreSize + " and maximumSize = " + dynamicMaximumSize + ". Maximum size will be set to " +
                dynamicCoreSize + ", the coreSize value, since it must be equal to or greater than the coreSize value");
        // create the thread pool, using coreSize for both bounds
        return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime.get(), unit, workQueue, threadFactory);
    } else {
        return new ThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime.get(), unit, workQueue, threadFactory);
    }
}
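The two things getThreadPool adds on top of a plain ThreadPoolExecutor are the named daemon threads and the guard against a core size larger than the maximum. The sketch below reproduces both with JDK classes only. NamedDaemonPool, its create method and the one minute keep-alive are assumptions for the example; the guard matters because the ThreadPoolExecutor constructor throws IllegalArgumentException when the maximum is smaller than the core size.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class NamedDaemonPool {
    static ThreadPoolExecutor create(String poolName, int coreSize, int maximumSize, BlockingQueue<Runnable> queue) {
        final AtomicInteger threadNumber = new AtomicInteger(0);
        // same naming scheme as above: hystrix-<pool key>-<sequence number>
        ThreadFactory factory = runnable -> {
            Thread thread = new Thread(runnable, "hystrix-" + poolName + "-" + threadNumber.incrementAndGet());
            thread.setDaemon(true); // daemon threads do not keep the JVM alive
            return thread;
        };
        // if core > max, fall back to core for both, as the logged message above describes
        int effectiveMaximum = Math.max(coreSize, maximumSize);
        return new ThreadPoolExecutor(coreSize, effectiveMaximum, 1, TimeUnit.MINUTES, queue, factory);
    }

    public static void main(String[] args) throws InterruptedException {
        // core size 2 with maximum 1 is an invalid combination that the guard corrects
        ThreadPoolExecutor pool = create("demo", 2, 1, new SynchronousQueue<Runnable>());
        pool.execute(() -> System.out.println(Thread.currentThread().getName()));
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

The thread names are what show up in thread dumps, which makes it straightforward to see which dependency a blocked Hystrix thread belongs to.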