您的位置:首页 > 编程语言 > Java开发

Core Java Tutorial -- Thread Pool

2018-04-01 07:42 316 查看
Java 线程池管理工作线程池,它包含一个让任务等待执行的队列。我们可以使用 ThreadPoolExecutor 在 Java 中创建线程池。

Java 线程池管理 Runnable 线程集合,并且工作相称从队列中执行 Runnable。
java.util.concurrent.Executors
提供
java.util.concurrent.Executor
接口的实现来在 Java 中创建线程池。

首先我们需要一个 Runnable 类,名为
WorkerThread.java


package Thread;

public class WorkerThread implements Runnable {
private String command;

public WorkerThread(String s) {
this.command = s;
}

@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " Start. Command = " + command);
processCommand();
System.out.println(Thread.currentThread().getName() + " End.");
}

private void processCommand() {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

@Override
public String toString() {
return this.command;
}
}


ExecutorService Example

这里是测试程序类 SimpleThreadPool.java,我们从Executors 框架创建固定线程池。

package Thread;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SimpleThreadPool {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(5);
for (int i = 0; i < 10; i++) {
Runnable worker = new WorkerThread("" + i);
executorService.execute(worker);
}
executorService.shutdown();
while (!executorService.isTerminated()) {

}
System.out.println("Finish all threads");
}
}


上述程序中,我们创建了 5 个工作线程的固定大小的线程池。然后我们向这个池提交 10 个 worker,因为池大小为 5,它将开始工作 5 个工作,其他工作将处于等待状态,只要一个工作完成,等待队列中的另一个工作将会被工作者线程拾起并执行。

pool-1-thread-2 Start. Command = 1
pool-1-thread-1 Start. Command = 0
pool-1-thread-4 Start. Command = 3
pool-1-thread-3 Start. Command = 2
pool-1-thread-5 Start. Command = 4
pool-1-thread-5 End.
pool-1-thread-5 Start. Command = 5
pool-1-thread-2 End.
pool-1-thread-2 Start. Command = 6
pool-1-thread-3 End.
pool-1-thread-3 Start. Command = 7
pool-1-thread-4 End.
pool-1-thread-4 Start. Command = 8
pool-1-thread-1 End.
pool-1-thread-1 Start. Command = 9
pool-1-thread-2 End.
pool-1-thread-5 End.
pool-1-thread-3 End.
pool-1-thread-1 End.
pool-1-thread-4 End.
Finish all threads


输出确认从“pool-1-thread-1”到“pool-1-thread-5”的池中有五个线程,它们负责将提交的任务执行到池中。

ThreadPoolExecutor Example

Executors 类使用 ThreadPoolExecutor 提供简单的 ExecutorService 实现,但是 ThreadPoolExecutor 提供了笔者更多的功能。我们可以指定创建 ThreadPoolExecutor 实例时将存活的线程数量,并且可以限制线程池的大小并创建自己的 RejectedExecutionHandler 实现来处理不适合工作队列的作业。

这是我们的 RejectedExecutionHandler 接口的自定义实现。

package Thread;

import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

public class RejectedExecutionHandlerImpl implements RejectedExecutionHandler {

@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println(r.toString() + " is rejected");
}

}


ThreadPoolExecutor 提供了几种方法,同故宫这些方法我们可以找出执行程序的当前状态,池大小,活动线程数和任务计数。所以我有一个监视器线程,它会在特定的时间间隔打印执行程序信息。

package Thread;

import
4000
java.util.concurrent.ThreadPoolExecutor;

public class MyMonitorThread implements Runnable {
private ThreadPoolExecutor executor;
private int seconds;
private boolean run = true;

public MyMonitorThread(ThreadPoolExecutor executor, int delay) {
this.executor = executor;
this.seconds = delay;
}

public void shutdown() {
this.run = false;
}

@Override
public void run() {
while (run) {
System.out.println(
String.format("[monitor] [%d/%d] Active: %d, Completed: %d, Task: %d, isShutdown: %s, " +
"isTerminated: %s",
this.executor.getPoolSize(),
this.executor.getCorePoolSize(),
this.executor.getActiveCount(),
this.executor.getCompletedTaskCount(),
this.executor.getTaskCount(),
this.executor.isShutdown(),
this.executor.isTerminated()));
try {
Thread.sleep(seconds * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

}
}


使用 ThreadPoolExecutor 的线程池实现

package Thread;

import java.util.concurrent.*;

public class WorkerPool {
public static void main(String args[]) throws InterruptedException {
//RejectedExecutionHandler implementation
RejectedExecutionHandlerImpl rejectionHandler = new RejectedExecutionHandlerImpl();
//Get the ThreadFactory implementation to use
ThreadFactory threadFactory = Executors.defaultThreadFactory();
//creating the ThreadPoolExecutor
ThreadPoolExecutor executorPool = new ThreadPoolExecutor(2, 4, 10, TimeUnit.SECONDS, new
ArrayBlockingQueue<Runnable>(2), threadFactory, rejectionHandler);
//start the monitoring thread
MyMonitorThread monitor = new MyMonitorThread(executorPool, 3);
Thread monitorThread = new Thread(monitor);
monitorThread.start();
//submit work to the thread pool
for (int i = 0; i < 10; i++) {
executorPool.execute(new WorkerThread("cmd" + i));
}

Thread.sleep(30000);
//shut down the pool
executorPool.shutdown();
//shut down the monitor thread
Thread.sleep(5000);
monitor.shutdown();

}
}


请注意,在初始化ThreadPoolExecutor时,我们将初始池大小保持为2,最大池大小为4,工作队列大小为2。因此,如果有4个正在运行的任务并提交了更多任务,则工作队列将只保留其中的2个,其余的部分将由RejectedExecutionHandlerImpl 处理。

输出:

[monitor] [0/2] Active: 0, Completed: 0, Task: 0, isShutdown: false, isTerminated: false
cmd6 is rejected
cmd7 is rejected
cmd8 is rejected
cmd9 is rejected
pool-1-thread-2 Start. Command = cmd1
pool-1-thread-1 Start. Command = cmd0
pool-1-thread-3 Start. Command = cmd4
pool-1-thread-4 Start. Command = cmd5
[monitor] [4/2] Active: 4, Completed: 0, Task: 6, isShutdown: false, isTerminated: false
pool-1-thread-2 End.
pool-1-thread-2 Start. Command = cmd2
pool-1-thread-1 End.
pool-1-thread-3 End.
pool-1-thread-1 Start. Command = cmd3
pool-1-thread-4 End.
[monitor] [4/2] Active: 2, Completed: 4, Task: 6, isShutdown: false, isTerminated: false
[monitor] [4/2] Active: 2, Completed: 4, Task: 6, isShutdown: false, isTerminated: false
pool-1-thread-2 End.
pool-1-thread-1 End.
[monitor] [4/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false
[monitor] [4/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false
[monitor] [2/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false
[monitor] [2/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false
[monitor] [2/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false
[monitor] [2/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false
[monitor] [2/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false
[monitor] [0/2] Active: 0, Completed: 6, Task: 6, isShutdown: true, isTerminated: true


注意执行程序的活动,已完成和总完成任务计数的变化。 我们可以调用 shutdown() 方法来完成所有提交的任务的执行并终止线程池。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: