您的位置:首页 > 其它

hystrix线程池隔离源码初识

2018-03-06 14:54 323 查看

hystrix源码初识之线程池隔离

本篇博客只讲述如下内容:

hystrixCommand的初始化过程

hystrix的线程池部分的初始化

一个DubboHystrixCommand构造过程如下

public DubboHystrixCommand(Invoker invoker, Invocation invocation) {

super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(invoker.getInterface().getName()))
.andCommandKey(HystrixCommandKey.Factory.asKey(String.format("%s_%d", invocation.getMethodName(),
invocation.getArguments() == null ? 0 : invocation.getArguments().length)))
.andCommandPropertiesDefaults(
HystrixCommandProperties.Setter().withCircuitBreakerRequestVolumeThreshold(300)// 10秒钟内至少19此请求失败,熔断器才发挥起作用
.withCircuitBreakerSleepWindowInMilliseconds(1000)// 熔断器中断请求1秒后会进入半打开状态,放部分流量过去重试
.withCircuitBreakerErrorThresholdPercentage(50)// 错误率达到50开启熔断保护
.withExecutionTimeoutEnabled(false))// 使用dubbo的超时,禁用这里的超时
.andThreadPoolPropertiesDefaults(
HystrixThreadPoolProperties.Setter().withCoreSize(getThreadPoolCoreSize(invoker.getUrl()))
.withMaxQueueSize(1000).withQueueSizeRejectionThreshold(700)));// 线程池为30
this.invoker = invoker;
this.invocation = invocation;
}


setter为HystrixCommand的一个静态内部类:


实际调用的构造方法为:

/**
* Construct a {@link HystrixCommand} with defined {@link Setter} that allows injecting property and strategy overrides and other optional arguments.
* <p>
* NOTE: The {@link HystrixCommandKey} is used to associate a {@link HystrixCommand} with {@link HystrixCircuitBreaker}, {@link HystrixCommandMetrics} and other objects.
* <p>
* Do not create multiple {@link HystrixCommand} implementations with the same {@link HystrixCommandKey} but different injected default properties as the first instantiated will win.
* <p>
* Properties passed in via {@link Setter#andCommandPropertiesDefaults} or {@link Setter#andThreadPoolPropertiesDefaults} are cached for the given {@link HystrixCommandKey} for the life of the JVM
* or until {@link Hystrix#reset()} is called. Dynamic properties allow runtime changes. Read more on the <a href="https://github.com/Netflix/Hystrix/wiki/Configuration">Hystrix Wiki</a>.
*
* @param setter
*            Fluent interface for constructor arguments
*/
protected HystrixCommand(Setter setter) {
// use 'null' to specify use the default
this(setter.groupKey, setter.commandKey, setter.threadPoolKey, null, null, setter.commandPropertiesDefaults, setter.threadPoolPropertiesDefaults, null, null, null, null, null);
}


最终调用的是下面这个重载的构造方法:

protected AbstractCommand(HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool,
HystrixCommandProperties.Setter commandPropertiesDefaults, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults,
HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore,
HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook) {

this.commandGroup = initGroupKey(group);
//初始化commandKey
this.commandKey = initCommandKey(key, getClass());
//初始化command时的一些配置
this.properties = initCommandProperties(this.commandKey, propertiesStrategy, commandPropertiesDefaults);
//线程池的key
this.threadPoolKey = initThreadPoolKey(threadPoolKey, this.commandGroup, this.properties.executionIsolationThreadPoolKeyOverride().get());
//度量,里面包含着进行隔离的策略和算法
this.metrics = initMetrics(metrics, this.commandGroup, this.threadPoolKey, this.commandKey, this.properties);
//熔断器初始化
this.circuitBreaker = initCircuitBreaker(this.properties.circuitBreakerEnabled().get(), circuitBreaker, this.commandGroup, this.commandKey, this.properties, this.metrics);
//线程池
this.threadPool = initThreadPool(threadPool, this.threadPoolKey, threadPoolPropertiesDefaults);
//Strategies from plugins(一些拓展的插件策略)
this.eventNotifier = HystrixPlugins.getInstance().getEventNotifier();
//并发的策略
this.concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
HystrixMetricsPublisherFactory.createOrRetrievePublisherForCommand(this.commandKey, this.commandGroup, this.metrics, this.circuitBreaker, this.properties);
//用于拓展用的hook,是一个回调的钩子
this.executionHook = in itExecutionHook(executionHook);
this.requestCache = HystrixRequestCache.getInstance(this.commandKey, this.concurrencyStrategy);
this.currentRequestLog = initRequestLog(this.properties.requestLogEnabled().get(), this.concurrencyStrategy);

/* fallback semaphore override if applicable */
this.fallbackSemaphoreOverride = fallbackSemaphore;

/* execution semaphore override if applicable */
//信号量隔离时的配置
时间(默认100s.executionSemaphoreQueueO}


超时时间(默认1000ms,单位:ms)

hystrix.command.default.execution.isolation.thread.timeoutInMilliseconds

在调用方配置,被该调用方的所有方法的超时时间都是该值,优先级低于下边的指定配置

hystrix.command.HystrixCommandKey.execution.isolation.thread.timeoutInMilliseconds

在调用方配置,被该调用方的指定方法(HystrixCommandKey方法名)的超时时间是该值

线程池核心线程数

hystrix.threadpool.default.coreSize(默认为10)

Queue

hystrix.threadpool.default.maxQueueSize(最大排队长度。默认-1,使用SynchronousQueue。其他值则使用 LinkedBlockingQueue。如果要从-1换成其他值则需重启,即该值不能动态调整,若要动态调整,需要使用到下边这个配置)

hystrix.threadpool.default.queueSizeRejectionThreshold(排队线程数量阈值,默认为5,达到时拒绝,如果配置了该选项,队列的大小是该队列)

注意:如果maxQueueSize=-1的话,则该选项不起作用

断路器

hystrix.command.default.circuitBreaker.requestVolumeThreshold(当在配置时间窗口内达到此数量的失败后,进行短路。默认20个)

For example, if the value is 20, then if only 19 requests are received in the rolling window (say a window of 10 seconds) the circuit will not trip open even if all 19 failed.

hystrix.command.default.circuitBreaker.sleepWindowInMilliseconds(短路多久以后开始尝试是否恢复,默认5s)

hystrix.command.default.circuitBreaker.errorThresholdPercentage(出错百分比阈值,当达到此阈值后,开始短路。默认50%)

fallback

hystrix.command.default.fallback.isolation.semaphore.maxConcurrentRequests(调用线程允许请求HystrixCommand.GetFallback()的最大数量,默认10。超出时将-THREAD隔离模式也起作用)

进入initThreadPool方法:

private static HystrixThreadPool initThreadPool(HystrixThreadPool fromConstructor, HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults) {
if (fromConstructor == null) {
// get the default implementation of HystrixThreadPool
return HystrixThreadPool.Factory.getInstance(threadPoolKey, threadPoolPropertiesDefaults);
} else {
return fromConstructor;
}
}


然后进入HystrixThreadPool的getInstance方法:

/**
* Get the {@link HystrixThreadPool} instance for a given {@link HystrixThreadPoolKey}.
* <p>
* This is thread-safe and ensures only 1 {@link HystrixThreadPool} per {@link HystrixThreadPoolKey}.
*
* @return {@link HystrixThreadPool} instance
*/
/* package */static HystrixThreadPool getInstance(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesBuilder) {
// get the key to use instead of using the object itself so that if people forget to implement equals/hashcode things will still work
String key = threadPoolKey.name();

// this should find it for all but the first time
//会先从map中去取,相当于这里做了个缓存
HystrixThreadPool previouslyCached = threadPools.get(key);
if (previouslyCached != null) {
return previouslyCached;
}

// if we get here this is the first time so we need to initialize
synchronized (HystrixThreadPool.class) {
if (!threadPools.containsKey(key)) {
//如果该threadPoolKey对应的hystrixThreadPool没有初始化过,则初始化
threadPools.put(key, new HystrixThreadPoolDefault(threadPoolKey, propertiesBuilder));
}
}
return threadPools.get(key);
}


下面再看一看HystrixThreadPoolDefault的构造方法:

public HystrixThreadPoolDefault(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesDefaults) {
this.properties = HystrixPropertiesFactory.getThreadPoolProperties(threadPoolKey, propertiesDefaults);
HystrixConcurrencyStrategy concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
this.queueSize = properties.maxQueueSize().get();
//会根据传入的queueSize获取到对应的queue
this.queue = concurrencyStrategy.getBlockingQueue(queueSize);

if (properties.getAllowMaximumSizeToDivergeFromCoreSize()) {
this.metrics = HystrixThreadPoolMetrics.getInstance(threadPoolKey,
concurrencyStrategy.getThreadPool(threadPoolKey, properties.coreSize(), properties.maximumSize(), properties.keepAliveTimeMinutes(), TimeUnit.MINUTES, queue),
properties);
this.threadPool = this.metrics.getThreadPool();
} else {
this.metrics = HystrixThreadPoolMetrics.getInstance(threadPoolKey,
//会根据传入的coreSize和其他属性初始化ThreadPoolExecutor
concurrencyStrategy.getThreadPool(threadPoolKey, properties.coreSize(), properties.coreSize(), properties.keepAliveTimeMinutes(), TimeUnit.MINUTES, queue),
properties);
this.threadPool = this.metrics.getThreadPool();
}

/* strategy: HystrixMetricsPublisherThreadPool */
HystrixMetricsPublisherFactory.createOrRetrievePublisherForThreadPool(threadPoolKey, this.metrics, this.properties);
}


this.queue = concurrencyStrategy.getBlockingQueue(queueSize);

//对应的方法为:
public BlockingQueue<Runnable> getBlockingQueue(int maxQueueSize) {
/*
* We are using SynchronousQueue if maxQueueSize <= 0 (meaning a queue is not wanted).
* <p>
* SynchronousQueue will do a handoff from calling thread to worker thread and not allow queuing which is what we want.
* <p>
* Queuing results in added latency and would only occur when the thread-pool is full at which point there are latency issues
* and rejecting is the preferred solution.
*/
if (maxQueueSize <= 0) {
//同步队列
return new SynchronousQueue<Runnable>();
} else {
//根据maxQueueSize得到的一个LinkedBlockingQueue
return new LinkedBlockingQueue<Runnable>(maxQueueSize);
}
}


concurrencyStrategy.getThreadPool(threadPoolKey, properties.coreSize(), properties.coreSize(), properties.keepAliveTimeMinutes(), TimeUnit.MINUTES, queue),
properties);

//上面的方法主要是用于初始化线程池的
/**
* Factory method to provide {@link ThreadPoolExecutor} instances as desired.
* <p>
* Note that the corePoolSize, maximumPoolSize and keepAliveTime values will be dynamically set during runtime if their values change using the {@link ThreadPoolExecutor#setCorePoolSize},
* {@link ThreadPoolExecutor#setMaximumPoolSize} and {@link ThreadPoolExecutor#setKeepAliveTime} methods.
* <p>
* <b>Default Implementation</b>
* <p>
* Implementation using standard java.util.concurrent.ThreadPoolExecutor
*
* @param threadPoolKey
*            {@link HystrixThreadPoolKey} representing the {@link HystrixThreadPool} that this {@link ThreadPoolExecutor} will be used for.
* @param corePoolSize
*            Core number of threads requested via properties (or system default if no properties set).
* @param maximumPoolSize
*            Max number of threads requested via properties (or system default if no properties set).
* @param keepAliveTime
*            Keep-alive time for threads requested via properties (or system default if no properties set).
* @param unit
*            {@link TimeUnit} corresponding with keepAliveTime
* @param workQueue
*            {@code BlockingQueue<Runnable>} as provided by {@link #getBlockingQueue(int)}
* @return instance of {@link ThreadPoolExecutor}
*/
public ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixProperty<Integer> corePoolSize, HystrixProperty<Integer> maximumPoolSize, HystrixProperty<Integer> keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
ThreadFactory threadFactory = null;
if (!PlatformSpecific.isAppEngineStandardEnvironment()) {
//构建threadFactory
threadFactory = new ThreadFactory() {
protected final AtomicInteger threadNumber = new AtomicInteger(0);

@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "hystrix-" + threadPoolKey.name() + "-" + threadNumber.incrementAndGet());
thread.setDaemon(true);
return thread;
}

};
} else {
threadFactory = PlatformSpecific.getAppEngineThreadFactory();
}

final int dynamicCoreSize = corePoolSize.get();
final int dynamicMaximumSize = maximumPoolSize.get();
//比较下传入的线程core数量的动态值与最大值
if (dynamicCoreSize > dynamicMaximumSize) {
logger.error("Hystrix ThreadPool configuration at startup for : " + threadPoolKey.name() + " is trying to set coreSize = " +
dynamicCoreSize + " and maximumSize = " + dynamicMaximumSize + ".  Maximum size will be set to " +
dynamicCoreSize + ", the coreSize value, since it must be equal to or greater than the coreSize value");
//生成线程池
return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime.get(), unit, workQueue, threadFactory);
} else {
return new ThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime.get(), unit, workQueue, threadFactory);
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: