您的位置:首页 > 编程语言 > Java开发

Java中的线程池(1)----线程池基础知识和CachedThreadPool

2017-12-06 23:32 495 查看
本文探讨一下java中的线程池

首先,什么是线程池?

线程池通过多个任务重用线程,线程创建的开销就被分摊到了多个任务上,而且请求到达时线程已经存在,消除了等待线程创建带来的延迟,使得程序响应更快。

线程池应该至少包括

1、线程池管理器:创建、销毁管理线程池,将工作线程放入线程池中。

2、工作线程:循环执行任务的线程,在没有任务时进行等待。

3、任务队列:缓冲机制,将没有处理的任务放入任务队列中。

4、任务接口:规定任务的入口、任务执行完的收尾工作、任务执行状态等,任务调度算法应该写在这里。

线程池的优点是什么?

线程池主要用来解决线程生命周期开销问题和资源不足问题。设想每当一个请求到达就创建一个新线程,开销是挺大的,甚至在创建和销毁线程上花的时间和消耗的资源要大于处理用户请求的时间和资源。另外如果创建线程太多,可能导致系统由于过度消耗内存和切换过度导致系统资源不足,因而可以通过线程池尽可能减少创建和销毁线程的次数,利用已有的对象进行服务。

线程池存在的问题是什么?

线程池的使用也是存在风险的,比如一样存在和其他多线程程序存在的并发风险,如同步错误、死锁,还有线程池特有的风险,如资源不足、线程泄露。

线程泄露

其他的问题我的其他文章都说过了,就只说一下线程泄露。

当线程池中除去一个线程去执行一项任务的时候,任务完成之后线程却没有返回线程池,就会发生线程泄露。比如在执行任务时抛出RuntimeException或者一个Error,如果没有catch到,线程池线程数量永久减1。当这样的情况发生次数足够多时,线程池就没有线程来处理任务了。

Java中ThreadPoolExecutor类

JDK中提供了java.util.concurrent.ThreadPoolExecutor类,以此来实现线程池:

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue);

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
这个类提供了4个构造方法,实际上前3个构造方法都是调用第4个构造方法进行初始化。

介绍一下构造方法中的每个参数:

1、corePoolSize:核心池的大小。默认情况下创建线程池之后线程池没有线程,而是等待任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,进行线程预创
e734
建,创建corePoolSize个线程。

2、maximumPoolSize:线程池最大线程个数,表示线程池中最多能创建的线程个数。

3、keepAliveTime:线程多久没有任务执行就会终止默认情况下,当线程数目大于corePoolSize的时候,keepAliveTime才会起作用,直到线程池中线程数目不大于corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)之后,线程池中的线程个数不大于corePoolSize的时候,keepAliveTime参数也会起作用,直到线程个数为0。

4、unit:参数keepAliveTime参数的时间单位,7种静态属性取值:

TimeUnit.DAYS;               //天
TimeUnit.HOURS;             //小时
TimeUnit.MINUTES;           //分钟
TimeUnit.SECONDS;           //秒
TimeUnit.MILLISECONDS;      //毫秒
TimeUnit.MICROSECONDS;      //微妙
TimeUnit.NANOSECONDS;       //纳秒
5、workQueue:一个阻塞队列,用来存储等待执行的任务,一般阻塞队列有以下几种选择:

ArrayBlockingQueue

LinkedBlockingQueue

SynchronousQueue

线程池的排队策略和阻塞队列的有关。

6、threadFactory:线程工厂,用来创建线程的工厂类

7、handler:表示拒绝处理任务时的策略,一般有四种取值:

ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务


ThreadPoolExecutor类中非常重要的方法:

1、execute():向线程池提交任务,由线程池去执行

2、submit():也是向线程池提交任务,但是能返回任务执行结果,实际上还是调用execute()方法,利用了Future Pattern。

3、shutdown():将线程池状态置为SHUTDOWN状态,之后不能往线程池添加任何任务,否则都会抛出RejectedExecutionException异常。但是线程池不会立刻退出,而是直到线程池中所有任务都处理完,才退出。

4、shutdownNow():将线程池状态立刻变成STOP状态,试图停止所有正在执行的线程,不在处理还在队列中等待的任务,然后返回未执行的任务。

对于ThreadPoolExecutor类先了解到这,这个类的第四个构造函数的构造参数多达7个,但是幸运的是jdk提供了4个构造不同种类线程池的静态方法供我们选择。

1、CachedThreadPool

首先看一下这个线程池对应的构造方法:

public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
也就是当我们执行newCachedThreadPool()静态方法的时候实际上是执行了ThreadPoolExecutor()构造函数,而参数含义代表:

核心池大小为0

线程池最大线程数目为最大整型

当线程池中的线程60s没有执行任务就终止

阻塞队列为SynchronousQueue,SynchronousQueue是个有什么特点的阻塞队列?

a、每个put操作必须等待一个take操作,反之亦然(相当于管道)

b、不允许null元素

c、是线程安全的,阻塞的

d、iterator()永远为空,peek()永远返回null,isEmpty()永远是true,remove()/removeAll()永远是false。

e、内部队列没有任何内部容量

SynchronousQueue

我们对SynchronousQueue做一个小测试:

SynchronousQueue<Integer> sq = new SynchronousQueue<Integer>();
sq.put(1);
sq.take();
这种情况下程序会一直阻塞下去,因为SynchronousQueue的put操作必须等待其take操作。

我们需要以两个不同线程的形式来通过SynchronousQueue传输数据:

建立一个TakeThread类:

package newCachedThreadPool;

import java.util.concurrent.SynchronousQueue;

public class TakeThread extends Thread {
private SynchronousQueue<Integer> sq;
@SuppressWarnings("unused")
private String name;
public TakeThread(SynchronousQueue<Integer> sq,String name){
super(name);
this.sq = sq;
}
@Override
public void run(){
while(true){
try {
Integer i = sq.take();
System.out.println(Thread.currentThread().getName()+"取出"+i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
一个PutThread类:

package newCachedThreadPool;

import java.util.concurrent.SynchronousQueue;

public class PutThread extends Thread {
private SynchronousQueue<Integer> sq;
@SuppressWarnings("unused")
private String name;
public PutThread(SynchronousQueue<Integer> sq,String name){
super(name);
this.sq = sq;
}
@Override
public void run(){
Integer i = 0;
while(true){
try {
sq.put(i);
System.out.println(Thread.currentThread().getName()+"放入"+i++);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
最后在测试类中进行测试:

package newCachedThreadPool;

public class Test{
public static void main(String[] args){
SynchronousQueue<Integer> sq = new SynchronousQueue<Integer>();
new PutThread(sq,"puter").start();
new TakeThread(sq,"taker").start();
}
}
运行结果如下:



所以说SynchronousQueue的take操作需要put操作等待,put操作需要take操作等待,否则会阻塞,线程进入wait set。

这就意味着:

1、这是一个可以无限扩大的线程池;

2、适合处理执行时间比较小的任务;

3、线程空闲时间超过60s就会被杀死,所以长时间处于空闲状态的时候,这种线程池几乎不占用资源;

4、阻塞队列没有存储空间,只要请求到来,就必须找到一条空闲线程去处理这个请求,找不到则在线程池新开辟一条线程。

下面测试一下这种线程池:

public class Test {
public static void main(String[] args) {
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
final int index = i;// java中可笑的闭包 得传final类型
cachedThreadPool.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(100);//让当前线程休息一会儿
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"执行"+index);//打印当前线程
}
});
}
cachedThreadPool.shutdown();
}
}


在这个测试当中,让CachedThreadPool中的线程执行完就休息100ms,这样使得另一个请求来的时候便创建一个新的线程,于是执行结果是这样的:



每当一个新的请求来的时候,由于之前的线程不是空闲状态,而且这种线程池的阻塞队列不能存储,所以需要开辟新的线程来处理这个请求,因此就出现了这样的情况,对于10个请求线程池开辟了10个线程来进行处理。

如果我们将代码改一改:

public class Test {
public static void main(String[] args) {
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
final int index = i;// java中可笑的闭包 得传final类型
try {
Thread.sleep(100);//在请求处理前给之前处理请求的线程足够的释放时间 使得之前线程能够继续处理接下来的请求
} catch (InterruptedException e) {
e.printStackTrace();
}
cachedThreadPool.execute(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+"执行"+index);
}
});
}
}
}
这种情况运行结果如下:



由于每次新的请求加入时,线程池中1号线程都是空闲状态,所以线程池就不需要开辟其他的线程来处理新的请求,所以就一直是1号线程在处理请求。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐