您的位置:首页 > 编程语言 > Java开发

自定义Java线程池 ThreadPoolExecutor

2015-05-18 15:35 399 查看
在使用线程池的时候,我们可以使用JDK提供的缓存线程池,这也是经常用到的,如下:

ExecutorService executorService = Executors.newCachedThreadPool();
也可以使用固定线程池,如下:

ExecutorService executorService = Executors.newFixedThreadPool(count);

有时候,我们需要自定义适合自己需要的线程池:ThreadPoolExecutor , 代码如下:

自定义线程池

package com.test1.threads;

import java.lang.reflect.Field;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
* ThreadPoolExecutor
当一个任务通过execute(Runnable)方法欲添加到线程池时:

如果此时线程池中的数量小于corePoolSize,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。
如果此时线程池中的数量等于 corePoolSize,但是缓冲队列 workQueue未满,那么任务被放入缓冲队列。
如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量小于maximumPoolSize,建新的线程来处理被添加的任务。
如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maximumPoolSize,那么通过 handler所指定的策略来处理此任务
* */

public class MyExecutorThreadPool {

static void log(String msg) {
System.out.println(System.currentTimeMillis() + " -> " + msg);
}

static int getThreadRunState(ThreadPoolExecutor pool)throws Exception {
// Field field = ThreadPoolExecutor.class.getDeclaredField("runState");
// field.setAccessible(true);
// int s = field.getInt(pool);
return 1;
}

public static void main(String[] args) throws Exception {
//阻塞队列的固定长度为1
BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(1);

/*
DiscardPolicy 表示直接放弃,也不抛出异常。打印如下:
1431932733129 -> before sleep
1431932733129 -> run task: 0 -> pool-1-thread-1
1431932734129 -> run over: 0 -> pool-1-thread-1
1431932734129 -> run task: 1 -> pool-1-thread-1
1431932735129 -> run over: 1 -> pool-1-thread-1
1431932737129 -> before shutdown()
1431932737130 -> after shutdown(),pool.isTerminated=true
1431932737130 -> now,pool.isTerminated=true, state=1

* */
RejectedExecutionHandler handler = new ThreadPoolExecutor.DiscardPolicy();

/*
AbortPolicy 表示当有多余的任务时,直接放弃且抛出异常。打印如下:
Exception in thread "main" 1431932635036 -> run task: 0 -> pool-1-thread-1
java.util.concurrent.RejectedExecutionException: Task com.test1.threads.TaskRunnable@42ff665a rejected from java.util.concurrent.ThreadPoolExecutor@27abcd5e[Running, pool size = 1, active threads = 1, queued tasks = 1, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.reject(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.execute(Unknown Source)
at com.test1.threads.MyExecutorThreadPool.main(MyExecutorThreadPool.java:129)
1431932636036 -> run over: 0 -> pool-1-thread-1
1431932636036 -> run task: 1 -> pool-1-thread-1
1431932637036 -> run over: 1 -> pool-1-thread-1
* */
// handler = new ThreadPoolExecutor.AbortPolicy();

/*
DiscardOldestPolicy 表示抛弃最旧的任务,从task1到task8都被抛弃,最后加入的task9会被执行。打印如下:
1431931268334 -> before sleep
1431931268334 -> run task: 0 -> pool-1-thread-1
1431931269334 -> run over: 0 -> pool-1-thread-1
1431931269334 -> run task: 9 -> pool-1-thread-1
1431931270334 -> run over: 9 -> pool-1-thread-1
1431931272335 -> before shutdown()
1431931272336 -> after shutdown(),pool.isTerminated=false
1431931272336 -> now,pool.isTerminated=true, state=1

*/
// handler = new ThreadPoolExecutor.DiscardOldestPolicy();

/*
* CallerRunsPolicy 表示,剩下的任务由当前子线程的调用者线程去处理。
* 在本例中, 线程池中的线程的调用者为主线程,也就是main函数。如果另外新建一个线程,并用新建的
线程来启动线程池中的线程,则线程池中的线程的调用者为新建的那个线程。如下:
new Thread(new Runnable() {
@Overried
public void run() {
ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MINUTES, queue);
.....
for(.....) {

}
}
}).start();

// 打印如下
1431930356817 -> run task: 2 -> Thread-0
1431930356817 -> run task: 0 -> pool-1-thread-1
1431930357818 -> run over: 0 -> pool-1-thread-1
1431930357818 -> run over: 2 -> Thread-0
1431930357818 -> run task: 3 -> Thread-0
1431930357818 -> run task: 1 -> pool-1-thread-1
1431930358818 -> run over: 3 -> Thread-0
1431930358818 -> run over: 1 -> pool-1-thread-1
1431930358819 -> run task: 5 -> Thread-0
1431930358819 -> run task: 4 -> pool-1-thread-1
1431930359819 -> run over: 4 -> pool-1-thread-1
1431930359819 -> run over: 5 -> Thread-0
1431930359819 -> run task: 7 -> Thread-0
1431930359819 -> run task: 6 -> pool-1-thread-1
1431930360820 -> run over: 6 -> pool-1-thread-1
1431930360820 -> run over: 7 -> Thread-0
1431930360820 -> run task: 9 -> Thread-0
1431930360820 -> run task: 8 -> pool-1-thread-1
1431930361820 -> run over: 8 -> pool-1-thread-1
1431930361820 -> run over: 9 -> Thread-0

当调用者是main线程的时候,打印如下:
1431931124052 -> run task: 2 -> main
1431931124052 -> run task: 0 -> pool-1-thread-1
1431931125052 -> run over: 2 -> main
1431931125052 -> run task: 3 -> main
1431931125052 -> run over: 0 -> pool-1-thread-1
1431931125052 -> run task: 1 -> pool-1-thread-1
1431931126052 -> run over: 1 -> pool-1-thread-1
1431931126052 -> run over: 3 -> main
1431931126053 -> run task: 4 -> pool-1-thread-1
1431931126053 -> run task: 5 -> main
1431931127053 -> run over: 5 -> main
1431931127053 -> run over: 4 -> pool-1-thread-1
1431931127053 -> run task: 6 -> pool-1-thread-1
1431931127053 -> run task: 8 -> main
1431931128053 -> run over: 6 -> pool-1-thread-1
1431931128053 -> run task: 7 -> pool-1-thread-1
1431931128053 -> run over: 8 -> main
1431931128053 -> before sleep
1431931129053 -> run over: 7 -> pool-1-thread-1
1431931129053 -> run task: 9 -> pool-1-thread-1
*/
// handler = new ThreadPoolExecutor.CallerRunsPolicy();

/*
* //自定义的固定长度的线程池,线程池中的线程总数为1
pool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
参数解释如下:
corePoolSize: 如果池中的实际线程数小于corePoolSize,无论是否其中有空闲的线程,
都会给新的任务产生新的 线程;
如果池中的线程数>corePoolSize and <maximumPoolSize,而又有空闲线程,就给新任务使用
空闲线程,如没有空闲线程,则产生新线程。
maximumPoolSize:线程池中的最大线程数。
如果池中的线程数=maximumPoolSize,则有空闲线程使用空闲线程,否则新任务放入workQueue。
(线程的空闲只有在workQueue中不再有任务时才成立);
当线程空闲时间达到keepAliveTime,该线程会退出,直到线程数量等于corePoolSize。
如果allowCoreThreadTimeout设置为true,则所有线程均会退出直到线程数量为0。
keepAliveTime:当线程池中的线程数大于corePoolSize的时候,那么空闲线程在等待keepAliveTime后,如果还没有新任务
到来,那么多余的空闲线程将被终止,直到线程数等于corePoolSize,此时allowCoreThreadTimeout=false;
unit:时间单元,可以指定时间的单位。
workQueue:任务缓存队列。见最上面的描述。

另外:当corePoolSize = maximumPoolSize 的时候,就是个固定线程池。可以通过查看源码看看FixedPool等。

* */
ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MINUTES, queue);

pool.setRejectedExecutionHandler(handler);
for(int i=0; i < 10; i++) {
final int c = i;
pool.execute(new TaskRunnable(c));
}
log("before sleep");
Thread.sleep(4000L);
log("before shutdown()");
pool.shutdown();
log("after shutdown(),pool.isTerminated=" + pool.isTerminated());
pool.awaitTermination(1000L, TimeUnit.SECONDS);
log("now,pool.isTerminated=" + pool.isTerminated() + ", state=" + getThreadRunState(pool));
}

}


// 任务类,实现Runnable接口。
package com.test1.threads;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TaskRunnable implements Runnable {

private final int index;
protected TaskRunnable(int index) {
this.index = index;
}

static void log(String msg) {
System.out.println(System.currentTimeMillis() + " -> " + msg);
}

@Override
public void run() {
log("run task: " + index + " -> " + Thread.currentThread().getName());
try {
Thread.sleep(1000L);
} catch (Exception e) {
e.printStackTrace();
}
log("run over: " + index + " -> " + Thread.currentThread().getName());
}

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: