您的位置:首页 > 编程语言 > Java开发

Java 线程池源码

2016-05-13 11:58 423 查看

ThreadPoolExecutor参数

corePoolSize

核心线程数,核心线程会一直存活,即使没有任务需要处理。当线程数小于核心线程数时,即使现有的线程空闲,线程池也会优先创建新线程来处理任务,而不是直接交给现有的线程处理。

核心线程在allowCoreThreadTimeout被设置为true时会超时退出,默认情况下不会退出。

maxPoolSize

当线程数大于或等于核心线程,且任务队列已满时,线程池会创建新的线程,直到线程数量达到maxPoolSize。如果线程数已等于maxPoolSize,且任务队列已满,则已超出线程池的处理能力,线程池会拒绝处理任务而抛出异常。

keepAliveTime

当线程空闲时间达到keepAliveTime,该线程会退出,直到线程数量等于corePoolSize。如果allowCoreThreadTimeout设置为true,则所有线程均会退出直到线程数量为0。

allowCoreThreadTimeout

是否允许核心线程空闲退出,默认值为false。

queueCapacity

任务队列容量。从maxPoolSize的描述上可以看出,任务队列的容量会影响到线程的变化,因此任务队列的长度也需要恰当的设置。

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ExecutorDemo {
public static void main(String[] args) {
ExecutorService executors = Executors.newCachedThreadPool();
executors.execute(new Runnable() {

@Override
public void run() {
System.out.println("hello");
}

});
}
}


线程池添加线程:

当线程数小于核心线程数时,创建线程。

当线程数大于等于核心线程数,且任务队列未满时,将任务放入任务队列。

当线程数大于等于核心线程数,且任务队列已满

若线程数小于最大线程数,创建线程

若线程数等于最大线程数,抛出异常,拒绝任务

Executor

public interface Executor {
void execute(Runnable command);
}


ExecutorService

public interface ExecutorService extends Executor {

void shutdown();

List<Runnable> shutdownNow();

boolean isShutdown();

boolean isTerminated();

boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;

<T> Future<T> submit(Callable<T> task);

<T> Future<T> submit(Runnable task, T result);

Future<?> submit(Runnable task);

<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;

<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;

<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;

<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}


Executors

public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}


SynchronousQueue

/**
* Creates a <tt>SynchronousQueue</tt> with nonfair access policy.
* 不公平锁
*/
public SynchronousQueue() {
this(false);
}


ThreadPoolExecutor

ctl:是一个原子整数包装两个概念字段workerCount、指示的有效数量的线程runState,指示是否运行,关闭等。

public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}

public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//能否创建核心线程
if (workerCountOf(c) < corePoolSize) {
//创建核心线程并且开启线程任务
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
//任务队列还未满,进入任务等待队列
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//任务队列已满,创建普通用户线程
else if (!addWorker(command, false))
reject(command);
}

private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;

for (;;) {
int wc = workerCountOf(c);
//是否使用核心线程,还是普通的用户线程
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//cas失败重试:将ctl的标志重置
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get();  // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
final ReentrantLock mainLock = this.mainLock;
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
//线程池的主锁
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int c = ctl.get();
int rs = runStateOf(c);

if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//加入hashset数组中
workers.add(w);
int s = workers.size();
//重新计算大小
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
//添加失败
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}

private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
workers.remove(w);
//修改ctl
decrementWorkerCount();
tryTerminate();
} finally {
mainLock.unlock();
}
}


execute执行时:

1.调用workCountOf()方法获取当前线程池中线程数量,判断当前线程数量是否超过了核心线程数,若未超过,则直接添加一个核心线程,并用此线程完成当前提交的任务。

2.若当前线程数已经超过核心线程数且ctl状态等于RUNNING且工作成功进入工作等待队列,则我们进一步复查ctl,若ctl状态依旧为RUNNING,且调用workCountOf()方法检查发现此时的工作线程数为0时,将添加一个工作线程;若此时已经不为RUNNING,则尝试移除任务,并调用拒绝任务方法:reject(command)。(这里之所以需要复查ctl状态是由于在执行workQueue.offer(command)方法时,ctl状态随时可能由于调用shutDown方法或者shutDownNow方法而发生变化)。

3.如果上述两种情况都不吻合,即此时已经有超过核心线程数的的线程在工作,且任务队列也已堆满,则尝试增加一个工作线程(如果此时线程数达到限定最大线程数,则会失败),若失败则调用拒绝任务方法reject(command).

在execute的方法中添加工作线程的所调用的法为addWorker(Runnable runnable,Boolean core).该方法接受两个参数,runnable对象为这个新建线程的第一个工作任务,可以为空。core指代新建的任务是否是核心线程。


参考

1  当前所说的计算机 “有几个CPU”准确的说法是“CPU有几个核心”,任务管理器里面看到的是CPU的线程。即CPU物理上的个数为一个,但是有多个核心。
2   双核处理器即是基于单个半导体的一个处理器上拥有两个一样功能的处理器核心。换句话说,将两个物理处理器核心整合入一个核中。
3  核心与线程的关系:
核相当于公路  双核是2条3米的路   4核是4条3米的路
双核四线程是两条路中间又划了2根线分成4条1.5的路 当然 对于1.5米的小车来说  一次并排可以跑4辆(原来不划分3米的2条路也只能开2个小车) 但是3米的大车也只是跑2辆
但是4核就可以跑4辆大车
就是说4线程在处理小的并行数据时候多线程起作用  大的并行数据它的处理能力比不上真的4核
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: