您的位置:首页 > 编程语言 > Java开发

阻塞式ThreadPoolExecutor

2015-04-22 00:00 274 查看
摘要: 最近在做一个大文件批量数据导入数据库的时候遇到个问题,在使用ThreadPoolExecutor提交任务的时候,发现在线程池满的时候,不能达到阻塞线程的作用。导致的后果就是文件被不断读取到内存,然后丢给ThreadPoolExecutor执行,由于消费速度跟不上生产速度,导致内存不断增长,最后OOM。

最近在做一个大文件批量数据导入数据库的时候遇到个问题,在使用ThreadPoolExecutor提交任务的时候,发现在线程池满的时候,不能达到阻塞线程的作用。导致的后果就是文件被不断读取到内存,然后丢给ThreadPoolExecutor执行,由于消费速度跟不上生产速度,导致内存不断增长,最后OOM。

于是开始研究ThreadPoolExecutor如何实现在任务满的情况下阻塞线程。

ThreadPoolExecutor类提供了多个参数用于定制化自己的线程池,常用的有corePoolSize,maximumPoolSize,workQueue等几个,如下面构造函数:

/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
*        if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
*        pool
* @param keepAliveTime when the number of threads is greater than
*        the core, this is the maximum time that excess idle threads
*        will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
*        executed.  This queue will hold only the {@code Runnable}
*        tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
*        creates a new thread
* @param handler the handler to use when execution is blocked
*        
3ff0
;because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
*         {@code corePoolSize < 0}<br>
*         {@code keepAliveTime < 0}<br>
*         {@code maximumPoolSize <= 0}<br>
*         {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
*         or {@code threadFactory} or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)


为了实现阻塞式线程池,workQueue参数需要传一个有界的BlockQueue,默认Executors.newFixedThreadPool()传入的无参数LinkedBlockingQueue边界为Integer.MAX_VALUE,不能起到Block的效果。

传了有界BlockQueue之后,ThreadPoolExecutor在线程队列Blcok的时候不会阻塞线程提交,而是调用RejectedExecutionHandler,抛出RejectedExecutionException异常。

JDK默认提供了4种失败策略:
AbortPolicy(中止)、CallersRunPolicy(调用者运行)、DiscardPolicy(丢弃)、DiscardOldestPolicy(丢弃最旧的)

JDK默认使用了AbortPolicy(中止)策略,这个可以通过handler参数来设置。

这里收集了几种阻塞线程池提交的方法:

一、通过CallersRunPolicy调用策略实现

其中CallersRunPolicy(调用者运行)方法,在线程池队列满了后会调用主线程来执行任务,同样可以达到阻塞线程提交的目的。这样做有两个缺点:

1、执行任务的线程会是size+1个(主线程),这在有些资源敏感的场景是不被允许的

2、由于主线程被用于执行任务,如果这个任务比较大,会长时间阻塞主线程的执行,导致其他线程空闲时候也不能接受新的任务,形成资源浪费

实例代码如下:

new ThreadPoolExecutor(size, size, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(),
new ThreadPoolExecutor.CallerRunsPolicy())


二、通过自定义RejectedExecutionHandler实现

通过自定义RejectedExecutionHandler,显示调用queue.put()阻塞方法来实现线程池阻塞。这种方法能够避免CallersRunPolicy方法的两个缺点。

示例代码如下:

new ThreadPoolExecutor(size, size, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(),
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
executor.getQueue().put(r);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});


三、通过其他多线程工具辅助控制

比如常用的可以通过信号量来控制,在提交任务的时候acquire,任务执行完后release。

这种方法的缺点是会侵入任务的执行过程

示例代码如下:

public static void main(String[] args) throws InterruptedException, ExecutionException {
//        ExecutorService executorService = Executors.newFixedThreadPool(5);
ExecutorService executorService = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(10));
final Semaphore semaphore = new Semaphore(5);
final AtomicInteger counter = new AtomicInteger(0);
int i = 0;
while (true) {
semaphore.acquire();
executorService.submit(new Runnable() {
@Override
public void run() {
try {
int count = counter.addAndGet(1);
System.out.println(Thread.currentThread() + "start, counter: " + count);
try {
Thread.sleep(1000 * 2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread() + "end, counter: " + count);
} finally {
semaphore.release();
}
}
});
if (++i > 20) {
break;
}
System.out.println("now it is " + i);
}

System.out.println("shotdown...");
ExecutorUtils.shutdownAndWait(executorService, executorService.toString());
System.out.println("Test ends.");
}


这里注意线程池的shutdown过程,没有使用简单的shutdown,因为这样会导致部分task没有执行完成

ExecutorUtils.shutdownAndWait方法代码如下:

public static void shutdownAndWait(ExecutorService executor, String name) {
log.info("Shutting down " + name);
executor.shutdown();
awaitTermination(executor, name);
}
private static void awaitTermination(ExecutorService executor, String name) {
try {
while (!executor.awaitTermination(EXECUTOR_TERMINATION_TIMEOUT, TimeUnit.SECONDS)) {
log.info("Waiting for all tasks complete execution in " + name);
}
log.info(name + " is shut down.");
} catch (InterruptedException e) {
log.error("Shutting down " + name + " failed.", e);
Thread.currentThread().interrupt();
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息