您的位置:首页 > 编程语言 > Java开发

java并发之线程池Executor知识要点 及 核心源码浅析

2016-06-16 17:05 423 查看

1.什么是线程池

定义:线程池是指管理一组同构工作线程的资源池
组成部分:

线程管理器(ThreadPool):用于创建并管理线程池。包括创建线程池,销毁线程池,添加新任务
工作线程(PoolWorker):线程池中的线程
任务接口(Task):每个任务必须实现的接口,一共工作线程调度任务的执行
任务队列:用于存放没有处理的任务,提供一种缓冲机制

2.为什么要使用线程池

通过重用现有的线程而不是创建新线程,从而减少了线程创建和销毁过程中的巨大开销
当请求到达时,工作线程已经存在,不用再等待线程的创建,从而提高了响应性
可以是处理器保持忙碌状态,提高系统的吞吐量

3.什么时候使用线程池

当一个服务器完成一项任务所需时间为:T1创建线程的时间,T2在线程中执行任务的时间,T3销毁线程时间。若T1+T3>T2时,则一般可以采用线程池。

4.如何使用线程池

线程池的类图





其中,Executor接口中只有一个execute方法
ExecutorService接口继承于Executor接口,它提供了如下方法:
shutdown();
shutdownNow();
submit();
invokeAll();
invokeAny();
[/code]

AbstractExecutorService抽象类,实现了ExecutorService中的部分方法
ThreadPoolExecutor类,实现了其上级的所有未实现的方法,还提供许多其他方法。其中,由它实现的父级方法有如下:
execute();
shutdown();
shutdownNow();
isTerminated();
[/code]

其中,需要说明的是submit与execute方法区别,submit方法是在ExecutorService中声明的方法,在AbstractExecutorService中的具体实现,而execute方法是在Executor中声明的方法,在ThreadPoolExecutor进行了具体的实现。它们都是向线程池中提交任务,但是不同的是submit方法会返回任务的执行结果,但实际上,它还是调用的execute方法,只不过它利用了Future来获取任务的执行结果。
shutdown方法与shutdownNow方法的区别在于:shutdown方法将平缓的关闭过程,它不会再接受新的任务,通知等待已提交的任务执行完成,包括那些还未完成的任务。而shutdownNow方法,将取消所有运行中的任务,返回还未开始的任务,但不不会返回正在执行的任务

创建线程池

通过调用Execotors中的静态工厂方法

newFixedThreadPool:创建一个固定长度的线程池,每当提交一个任务时就创建一个线程,知道达到线程池的最大数量
newCachedThreadPool:创建一个可缓存的线程池,线程池的规模不存在任何限制。有多少任务就会有多少线程。当超过了处理需求时,会回收空闲的线程
newSingleThreadExecutor:创建一个单线程的Executor,依照任务在队列中的顺序来串行执行,安全的封闭在线程中阿
newScheduledThreadPool:创建一个固定长度的线程池,而且一延迟或定时的方式来执行任务

通过ThreadPoolExecutor的构造方法进行创建
publicThreadPoolExecutor(intcorePoolSize,intmaximumPoolSize,longkeepAliveTime,TimeUnitunit,BlockingQueue<Runnable>workQueue,ThreadFactorythreadFactory,RejectedExecutionHandlerhandler)
[/code]

corePoolSize:线程池的大小,当创建了线程池后,线程池中没有任何线程,而是等待任务的到来才会创建线程去执行任务。当线程数达到了corePoolSize时,就会把到达的任务放在缓存队列当中
maximumPoolSize:线程池中的最大线程数
keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;
unit:参数keepAliveTime的时间单位
workQueue:用来存储等待执行的任务
无界队列:LinkedBlockingQueue-可无限制的增加,它是newFixedThreadPool和newSingleThreadExecutor默认的任务队列
有界队列:ArrayBlockingQueue、有界的LinkedBlockingQueue、PriorityBlockingQueue-当队列满后,会执行饱和策略
同步移交:在线程之间移交的一种机制,当线程池是无界的或者是可以拒绝任务的就可以使用,newCachedThreadPool就使用的是SynchronousQueue
[/code]

threadFactory:线程工厂,主要用来创建线程,可以自定义线程工厂(实现ThreadFactory接口),可以定制一些行为:如统计信息以及线程被创建或者终止时把调试消息写入日志
handler:饱和策略—当任务队列被填满后,会执行的策略。有如下:
ThredPoolExecutor类中的4个静态内部类:
AbortPolicy:默认的Handler,该策略会抛出未检查的RegectedExecutionException
CallerRunsPolicy:调用者运行,将某些任务回退到调用者(一般为主线程),由于执行任务需要一定的时间,因此主线程至少在一段时间内不能提交任何任务,从而使得工作者线程有时间来处理正在执行的任务
DiscardPolicy:抛弃新提交的任务
DiscardOldestPolicy:将抛弃下一个将要执行的任务
[/code]

关于线程池的执行过程

从execute方法为入口点
/**
1.当线程池中的数目小于corePoolSize时,直接new一个Thread
2.当线程池数大于corePoolSize时,直接放入任务队列中
3.如果队列已经满了且线程池中线程数小于maximumPoolSize,则新建一个线程
*/
publicvoidexecute(Runnablecommand){
if(command==null)
thrownewNullPointerException();
intc=ctl.get();
//如果正在运行的线程数小于线程池预设的大小,就尝试addWorker(源码在其后)。若成功,直接返回。若添加失败(可能在添加过程中已达到预设的线程池的数目),重新获取线程池正在运行的线程数
if(workerCountOf(c)<corePoolSize){
if(addWorker(command,true))
return;
c=ctl.get();
}
//若任务没有线程处理(当达到了线程池的预设大小corePoolSize),就添加到任务队列。
if(isRunning(c)&&workQueue.offer(command)){
intrecheck=ctl.get();
if(!isRunning(recheck)&&remove(command))//若线程池在workeQueue.offer前发生了shutdown,就从任务队列中移除
reject(command);
elseif(workerCountOf(recheck)==0)//如果线程池在运行,并且没有可工作的线程,就直接创建一个
addWorker(null,false);
}
elseif(!addWorker(command,false))//如果任务队列已满,尝试创建一个新的Worker,若失败,说明线程池已经关闭或者饱和了
reject(command);//饱和策略问题
}
[/code]

/**
@paramfirstTask:是新线程应该运行的第一个任务,Worker会被创建在线程池的数目比corePoolSize小的时候,或者在任务队列已经满的情况下,创建一个线程来代替已死的线程
@paramcore:当为true时,用corePoolSize作为边界,否则,用maximumPoolSize作为边界
*/
privatebooleanaddWorker(RunnablefirstTask,booleancore){
retry:
for(;;){
intc=ctl.get();
intrs=runStateOf(c);//获取线程池的状态
//如果线程池被shutdown了,一般直接返回false。但是排除任务队列不为空但Workers为空的情况,在这种情况下,会调用addWorker(null,false)来创建一个线程处理队列中的任务
if(rs>=SHUTDOWN&&
!(rs==SHUTDOWN&&
firstTask==null&&
!workQueue.isEmpty()))
returnfalse;
for(;;){//如果正在执行的线程数目大于线程池中预设的线程数,返回false
intwc=workerCountOf(c);
if(wc>=CAPACITY||wc>=(core?corePoolSize:maximumPoolSize))
returnfalse;
if(compareAndIncrementWorkerCount(c))
breakretry;
c=ctl.get();//Re-readctl
if(runStateOf(c)!=rs)
continueretry;
//elseCASfailedduetoworkerCountchange;retryinnerloop
}
}
booleanworkerStarted=false;
booleanworkerAdded=false;
Workerw=null;
try{
finalReentrantLockmainLock=this.mainLock;
w=newWorker(firstTask);//创建Worker:其中Thread通过调用ThreadFactory的newThread方法构建,所以在此处可以对创建的Thread进行额外的处理
finalThreadt=w.thread;
if(t!=null){
mainLock.lock();
try{
intc=ctl.get();
intrs=runStateOf(c);
//若线程池正在运行或者处于shutdown但是任务不为空,则把新建的worker添加在workers中
if(rs<SHUTDOWN||(rs==SHUTDOWN&&firstTask==null)){
if(t.isAlive())//precheckthattisstartable
thrownewIllegalThreadStateException();
workers.add(w);
ints=workers.size();//largePoolSize用于记录曾经出现过得最大的线程数
if(s>largestPoolSize)
largestPoolSize=s;
workerAdded=true;
}
}finally{
mainLock.unlock();
}
if(workerAdded){//启动任务的执行
t.start();
workerStarted=true;
}
}
}finally{
if(!workerStarted)
addWorkerFailed(w);//若启动失败,则从正在运行的工作集中移除
}
returnworkerStarted;
}
[/code]
/**
当线程池被stop或者shutdown或创建线程失败时,则会调用这个方法
1.从workers中移除worker
2.把workerCount-1
3.尝试终止操作:当线程池的状态为shutdown、线程池的数目和任务队列都为空,或者线程池已经stop、线程池数目为0
*/
privatevoidaddWorkerFailed(Workerw){
finalReentrantLockmainLock=this.mainLock;
mainLock.lock();
try{
if(w!=null)
workers.remove(w);
decrementWorkerCount();
tryTerminate();
}finally{
mainLock.unlock();
}
}
[/code]
finalvoidtryTerminate(){
for(;;){
intc=ctl.get();
if(isRunning(c)||
runStateAtLeast(c,TIDYING)||
(runStateOf(c)==SHUTDOWN&&!workQueue.isEmpty()))
return;
if(workerCountOf(c)!=0){//Eligibletoterminate
interruptIdleWorkers(ONLY_ONE);
return;}
finalReentrantLockmainLock=this.mainLock;
mainLock.lock();
try{
if(ctl.compareAndSet(c,ctlOf(TIDYING,0))){
try{
terminated();
}finally{
ctl.set(ctlOf(TERMINATED,0));
termination.signalAll();
}
return;
}
}finally{
mainLock.unlock();
}
//elseretryonfailedCAS
}
}
[/code]
个人觉得还比较重要的一个内部类是Worker,继承了AbstractQueuedSynchronizer类,其中最主要的run方法
privatefinalclassWorker
extendsAbstractQueuedSynchronizer
implementsRunnable
{
Worker(RunnablefirstTask){
setState(-1);//inhibitinterruptsuntilrunWorker
this.firstTask=firstTask;
this.thread=getThreadFactory().newThread(this);
}
publicvoidrun(){
runWorker(this);
}
protectedbooleanisHeldExclusively(){
returngetState()!=0;
}
protectedbooleantryAcquire(intunused){
if(compareAndSetState(0,1)){
setExclusiveOwnerThread(Thread.currentThread());
returntrue;}
returnfalse;
}
protectedbooleantryRelease(intunused){
setExclusiveOwnerThread(null);
setState(0);
returntrue;}
//参数1就是把锁设置为了独占锁,在获取到一个任务后,准备执行前首先要获取这个锁。
publicvoidlock(){acquire(1);}
publicbooleantryLock(){returntryAcquire(1);}
publicvoidunlock(){release(1);}
publicbooleanisLocked(){returnisHeldExclusively();}
voidinterruptIfStarted(){
Threadt;
if(getState()>=0&&(t=thread)!=null&&!t.isInterrupted()){
try{
t.interrupt();
}catch(SecurityExceptionignore){
}
}
}
}
[/code]
finalvoidrunWorker(Workerw){
Threadwt=Thread.currentThread();
//不从任务队列中获取第一个任务,而是执行刚提交的任务
Runnabletask=w.firstTask;
w.firstTask=null;
w.unlock();//allowinterrupts
booleancompletedAbruptly=true;
try{
while(task!=null||(task=getTask())!=null){
w.lock();
//如果线程池是STOP状态,则需要保证线程时被中断了
//ifnot,ensurethreadisnotinterrupted.This//requiresarecheckinsecondcasetodealwith//shutdownNowracewhileclearinginterrupt
if((runStateAtLeast(ctl.get(),STOP)||
(Thread.interrupted()&&
runStateAtLeast(ctl.get(),STOP)))&&
!wt.isInterrupted())
wt.interrupt();
try{
beforeExecute(wt,task);//执行前的勾子函数,可以通过重写的方式对这个函数进行扩展功能。当发生了异常时,是不会执行任务的,afterExecute也不会执行
Throwablethrown=null;
try{
task.run();//执行任务
}catch(RuntimeExceptionx){//当发生运行时异常或ERROR时,会原样抛出
thrown=x;throwx;
}catch(Errorx){
thrown=x;throwx;
}catch(Throwablex){//如果是一个Throwable,则会包装成一个Error抛出
thrown=x;thrownewError(x);
}finally{
afterExecute(task,thrown);//执行后的勾子函数,同样可以进行扩展
}
}finally{
task=null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly=false;
}finally{
processWorkerExit(w,completedAbruptly);
}
}
~
[/code]
/**
从任务队列中获取一个任务,在以下情况会退出:
1.线程数大于maximumPoolSize,我也不知道为什么会大于??创建的时候就会判断啊??
2.线程池已经停止
3.线程池shutdown并且队列也为空
4.获取一个task,超时
@return:返回一个task,或者worker退出,workerCount-1
*/
privateRunnablegetTask(){
booleantimedOut=false;//Didthelastpoll()timeout?
retry:
for(;;){
intc=ctl.get();
intrs=runStateOf(c);
//如果当前线程池已经shutdown或者stop&&任务队列为空,则workerCount-1
if(rs>=SHUTDOWN&&(rs>=STOP||workQueue.isEmpty())){
decrementWorkerCount();
returnnull;}
booleantimed;//Areworkerssubjecttoculling?
for(;;){
intwc=workerCountOf(c);
timed=allowCoreThreadTimeOut||wc>corePoolSize;
if(wc<=maximumPoolSize&&!(timedOut&&timed))//若没有超时
break;
if(compareAndDecrementWorkerCount(c))
returnnull;
c=ctl.get();//Re-readctl
if(runStateOf(c)!=rs)
continueretry;
//elseCASfailedduetoworkerCountchange;retryinnerloop
}
try{
Runnabler=timed?
workQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS):
workQueue.take();
if(r!=null)
returnr;
timedOut=true;//若没有获取到task,超时
}catch(InterruptedExceptionretry){//若被中断了,不能算作超时
timedOut=false;
}
}
}
[/code]
参考资料:
http://ifeve.com/java-threadpool/
 http://blog.163.com/among_1985/blog/static/275005232012618849266/http://developer.51cto.com/art/201203/321885.htmhttp://www.51itong.net/java-1-7-threadpoolexecutor-19428.htmlhttp://blog.csdn.net/java2000_wl/article/details/22097059http://blog.csdn.net/xieyuooo/article/details/8718741
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息