您的位置:首页 > 其它

Dubbo线程池耗尽异常原理分析RejectedExecutionException:Thread pool is EXHAUSTED

2017-06-21 00:00 405 查看
对最近遇到的业务应用dubbo线程池爆满(异常:RejectedExecutionException:Thread pool is EXHAUSTED)问题进行了分析。

一、问题回顾:

业务应用dubbo配置如下:

<dubbo:protocol name="dubbo" port="${dubbo.port}" /}

在dubbo的spring配置中,业务应用并没有配置threadpool, threads等参数,查看dubbo.io用户文档,可知:默认配置为:

<dubbo:protocol name="dubbo" dispatcher="all" threadpool="fixed" threads="100" />

查看dubbo代码(版本为2.5.3)实现:FixedThreadPool.java

/*
* Copyright 1999-2011 Alibaba Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*      http://www.apache.org/licenses/LICENSE-2.0 *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.dubbo.common.threadpool.support.fixed;

import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.threadpool.ThreadPool;
import com.alibaba.dubbo.common.threadpool.support.AbortPolicyWithReport;
import com.alibaba.dubbo.common.utils.NamedThreadFactory;

/**
* 此线程池启动时即创建固定大小的线程数,不做任何伸缩,来源于:<code>Executors.newFixedThreadPool()</code>
*
* @see java.util.concurrent.Executors#newFixedThreadPool(int)
* @author william.liangf
*/
public class FixedThreadPool implements ThreadPool {

public Executor getExecutor(URL url) {
String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue<Runnable>() :
(queues < 0 ? new LinkedBlockingQueue<Runnable>()
: new LinkedBlockingQueue<Runnable>(queues)),
new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}

}

其中:

public class Constants {

// the other constant
// ......

public static final String  DEFAULT_THREAD_NAME = "Dubbo";
public static final int     DEFAULT_THREADS = 200;
public static final int     DEFAULT_QUEUES = 0;
}

由代码可知:dubbo的线程池采用jdk的ThreadPoolExecutor,默认threads数为200,默认队列长度为0,此时默认采用了SynchronousQueue队列,而如果用户配置的队列长度大于0时,则会采用LinkedBlockingQ
7fe8
ueue队列。【注意:由此可见,dubbo.io用户文档中,默认threads=100是错误的,实际为200】

SynchronousQueue:是一个缓存为1的阻塞队列,某次添加元素后必须等待其他线程取走后才能继续添加。

LinkedBlockingQueue:线程安全的基于链表的阻塞队列,实现了入队出队的分离(分别采用putLock和takeLock锁,因此可以同时入队和出队操作)位于java.util.concurrent包下,是一个无界队列。

ThreadPoolExecutor的完整构造方法的签名是:ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) .

corePoolSize - 池中所保存的线程数,包括空闲线程。
maximumPoolSize-池中允许的最大线程数。
keepAliveTime - 当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。
unit - keepAliveTime 参数的时间单位。
workQueue - 执行前用于保持任务的队列。此队列仅保持由 execute方法提交的 Runnable任务。
threadFactory - 执行程序创建新线程时使用的工厂。
handler - 由于超出线程范围和队列容量而使执行被阻塞时所使用的处理程序。
ThreadPoolExecutor是Executors类的底层实现。

dubbo默认创建固定大小的线程池(200), 每次提交一个任务就创建一个线程,直到线程数达到线程池大小200,线程池的大小一旦达到最大值就保持不变。如果某个线程因为执行异常而结束,那么线程池会补充一个新的线程。

由于dubbo默认采用了直接提交的SynchronousQueue工作队列,所以,所有的task会直接提交给线程池中的某一worker线程,如果没有可用线程,那么会拒绝任务的处理,而抛出异常RejectedExecutionException:Thread pool is EXHAUSTED.

/*
* Copyright 1999-2011 Alibaba Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*      http://www.apache.org/licenses/LICENSE-2.0 *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.dubbo.common.threadpool.support;

import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;

import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.logger.Logger;
import com.alibaba.dubbo.common.logger.LoggerFactory;

/**
* Abort Policy.
* Log warn info when abort.
*
* @author ding.lid
*/
public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {

protected static final Logger logger = LoggerFactory.getLogger(AbortPolicyWithReport.class);

private final String threadName;

private final URL url;

public AbortPolicyWithReport(String threadName, URL url) {
this.threadName = threadName;
this.url = url;
}

@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
String msg = String.format("Thread pool is EXHAUSTED!" +
" Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d)," +
" Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!" ,
threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(),
e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(),
url.getProtocol(), url.getIp(), url.getPort());
logger.warn(msg);
throw new RejectedExecutionException(msg);
}

}

因此,如果系统抛出异常RejectedExecutionException:Thread pool is EXHAUSTED.可以通过调大dubbo线程池解决该问题,或者降低对TPS的预期。

<dubbo:protocol name="dubbo" dispatcher="all" threadpool="fixed" threads="500" />

此处修改了dubbo线程池为500,以处理更多的任务。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐