您的位置:首页 > 编程语言 > Java开发

Java多线程-----第一节:线程池的使用

2017-09-26 20:38 218 查看


1 引入线程池的原因

  如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间。

由于线程的生命周期中包括创建、就绪、运行、阻塞、销毁阶段,当我们待处理的任务数目较小时,我们可以自己创建几个线程来处理相应的任务,但当有大量的任务时,由于创建、销毁线程需要很大的开销,运用线程池这些问题就大大的缓解了。

假设一个服务器完成一项任务所需时间为:T1 创建线程时间,T2 在线程中执行任务的时间,T3 销毁线程时间。

    如果:T1 + T3 远大于 T2,则可以采用线程池,以提高服务器性能。

一个线程池包括以下四个基本组成部分:
                1、线程池管理器(ThreadPool):用于创建并管理线程池,包括 创建线程池,销毁线程池,添加新任务;
                2、工作线程(PoolWorker):线程池中线程,在没有任务时处于等待状态,可以循环的执行任务;
                3、任务接口(Task):每个任务必须实现的接口,以供工作线程调度任务的执行,它主要规定了任务的入口,任务执行完后的收尾工作,任务的执行状态等;
                4、任务队列(taskQueue):用于存放没有处理的任务。提供一种缓冲机制。


2 线程池的使用

在java doc中,并不提倡我们直接使用ThreadPoolExecutor,而是使用Executors类中提供的几个静态方法来创建线程池:

  我们只需要运用Executors类给我们提供的静态方法,就可以创建相应的线程池:

  public static ExecutorSevice newSingleThreadExecutor() 

//单个线程的线程池,即线程池中每次只有一个线程工作,单线程串行执行任务,这个线程处理完一个任务后接着处理下一个任务,

若该线程出现异常,将会有一个新的线程来替代。

  public static ExecutorSevice newFixedThreadPool(int n) 

//固定数量的线程池,每提交一个任务就是一个线程,直到达到线程池的最大数量,然后后面进入等待队列直到前面的任务完成才继续执行

  public static ExecutorSevice  newCachedThreadPool()(推荐使用)

//可缓存线程池,根据用户的任务数创建相应的线程来处理,当线程池大小超过了处理任务所需的线程,那么就会回收部分空闲(一般是60秒无执行)的线程,当有任务来时,

     又智能的添加新线程来执行。

 
    public static ExecutorSevice  newScheduleThreadPool()

 
   //大小无限制的线程池,支持定时和周期性的执行线程。

  我们只需要将待执行的任务放入run方法中即可,将Runnable接口的实现类交给线程池的execute方法,作为它的一个参数,如下所示:

Executor executor = Executors.newSingleThreadExecutor();
executor.execute(new Runnable(){
public void run(){
//执行的任务
}
}


  如果需要给任务传递参数,可以通过创建一个Runnable接口的实现类来完成。

从它们的具体实现来看,它们实际上也是调用了ThreadPoolExecutor,只不过参数都已配置好了。

   newFixedThreadPool创建的线程池corePoolSize和maximumPoolSize值是相等的,它使用的LinkedBlockingQueue;

   newSingleThreadExecutor将corePoolSize和maximumPoolSize都设置为1,也使用的LinkedBlockingQueue;

   newCachedThreadPool将corePoolSize设置为0,将maximumPoolSize设置为Integer.MAX_VALUE,使用的SynchronousQueue,也就是说来了任务就创建线程运行,当线程空闲超过60秒,就销毁线程。

   实际中,如果Executors提供的三个静态方法能满足要求,就尽量使用它提供的三个方法,因为自己去手动配置ThreadPoolExecutor的参数有点麻烦,要根据实际任务的类型和数量来进行配置。

   另外,如果ThreadPoolExecutor达不到要求,可以自己继承ThreadPoolExecutor类进行重写。


3 Executors提供的线程池配置方案 

1、构造一个固定线程数目的线程池,配置的corePoolSize与maximumPoolSize大小相同,同时使用了一个无界LinkedBlockingQueue存放阻塞任务,因此多余的任务将存在再阻塞队列,不会由RejectedExecutionHandler处理 

Java代码  


public static ExecutorService newFixedThreadPool(int nThreads) {  

        return new ThreadPoolExecutor(nThreads, nThreads,  

                                      0L, TimeUnit.MILLISECONDS,  

                                      new LinkedBlockingQueue<Runnable>());  

    }  

2、构造一个缓冲功能的线程池,配置corePoolSize=0,maximumPoolSize=Integer.MAX_VALUE,keepAliveTime=60s,以及一个无容量的阻塞队列 SynchronousQueue,因此任务提交之后,将会创建新的线程执行;线程空闲超过60s将会销毁 

Java代码  


public static ExecutorService newCachedThreadPool() {  

        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,  

                                      60L, TimeUnit.SECONDS,  

                                      new SynchronousQueue<Runnable>());  

    }  

3、构造一个只支持一个线程的线程池,配置corePoolSize=maximumPoolSize=1,无界阻塞队列LinkedBlockingQueue;保证任务由一个线程串行执行 

Java代码  


public static ExecutorService newSingleThreadExecutor() {  

        return new FinalizableDelegatedExecutorService  

            (new ThreadPoolExecutor(1, 1,  

                                    0L, TimeUnit.MILLISECONDS,  

                                    new LinkedBlockingQueue<Runnable>()));  

    }  

4、构造有定时功能的线程池,配置corePoolSize,无界延迟阻塞队列DelayedWorkQueue;有意思的是:maximumPoolSize=Integer.MAX_VALUE,由于DelayedWorkQueue是无界队列,所以这个值是没有意义的 

Java代码  


public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {  

        return new ScheduledThreadPoolExecutor(corePoolSize);  

    }  

  

public static ScheduledExecutorService newScheduledThreadPool(  

            int corePoolSize, ThreadFactory threadFactory) {  

        return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);  

    }  

  

public ScheduledThreadPoolExecutor(int corePoolSize,  

                             ThreadFactory threadFactory) {  

        super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,  

              new DelayedWorkQueue(), threadFactory);  

    }  


4 线程池使用的示例

  下面我们通过一个实例来说明线程池的使用方法,该实例模仿子HADOOP中作业初始化过程,也即利用线程池从队列中取出作业并对作业进行初始化,其代码如下:



package com.yueliming.ThreadPool;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FixedThreadPool {

public static List<Double> queue;
public ExecutorService threadPool;

public FixedThreadPool() {
queue = new ArrayList<Double>();
//产生一个 ExecutorService 对象,这个对象带有一个大小为 poolSize 的线程池,若任务数量大于 poolSize ,任务会被放在一个 queue 里顺序执行。
threadPool = Executors.newFixedThreadPool(5);
}

public static void main(String[] args) {
FixedThreadPool outer = new FixedThreadPool();
FixedThreadPool.Manager inner = outer.new Manager();
Thread consumer = new Thread(inner);

Thread producer = new Thread() {//用于向queue中放入数据
public void run() {
while (true) {
synchronized (queue) {
double time = 1d;
long startTime = System.currentTimeMillis();
if (System.currentTimeMillis() - startTime >= time) {
startTime = System.currentTimeMillis();
for (int i = 0; i < 10; i++) {
queue.add((Math.random() * 10000));
}
queue.notify();
}
}
}
}
};
consumer.start();//启动守护线程,采用线程池来从queue中读取数据
producer.start();
}

class Manager implements Runnable {
int num = 0;
public void run() {
while (true) {
try {
synchronized (queue) {
System.out.println("队列的长度为:" + queue.size());
while (queue.isEmpty()) {
queue.wait();
}
double result = queue.remove(0);
num++;
System.out.println("成功从队列中取到数据!" + num);
threadPool.execute(new ExecutorThread(result));
}
} catch (InterruptedException t) {
break;
}
}
threadPool.shutdown();
}
}

class ExecutorThread implements Runnable {

private double value;

public ExecutorThread(double value) {
this.value = value;
}

public void run() {
System.out.println("This is " + Thread.currentThread().getName() + " " + value);
}
}
}


  其中内部类Manager为一个线程负责从队列中获取作业,并交给线程池去处理任务,有一个线程专门将数据放入到队列中,也即每隔1ms向队列中放入10个数据。


5 自己手写一个线程池

一个线程池包括以下四个基本组成部分:
                1、线程池管理器(ThreadPool):用于创建并管理线程池,包括 创建线程池,销毁线程池,添加新任务;
                2、工作线程(PoolWorker):线程池中线程,在没有任务时处于等待状态,可以循环的执行任务;
                3、任务接口(Task):每个任务必须实现的接口,以供工作线程调度任务的执行,它主要规定了任务的入口,任务执行完后的收尾工作,任务的执行状态等;
                4、任务队列(taskQueue):用于存放没有处理的任务。提供一种缓冲机制。


简单的线程池接口:

public interface ThreadPool<Job extends Runnable>{
//执行一个任务(Job),这个Job必须实现Runnable
void execute(Job job);
//关闭线程池
void shutdown();
//增加工作者线程,即用来执行任务的线程
void addWorkers(int num);
//减少工作者线程
void removeWorker(int num);
//获取正在等待执行的任务数量
void getJobSize();
}



线程池接口的默认实现

public class DefaultThreadPool<Job extends Runnable> implements ThreadPool<Job>{

// 线程池维护工作者线程的最大数量
private static final int MAX_WORKER_NUMBERS = 10;
// 线程池维护工作者线程的默认值
private static final int DEFAULT_WORKER_NUMBERS = 5;
// 线程池维护工作者线程的最小数量
private static final int MIN_WORKER_NUMBERS = 1;
// 维护一个工作列表,里面加入客户端发起的工作
private final LinkedList<Job> jobs = new LinkedList<Job>();
// 工作者线程的列表
private final List<Worker> workers = Collections.synchronizedList(new ArrayList<Worker>());
// 工作者线程的数量
private int workerNum;
// 每个工作者线程编号生成
private AtomicLong threadNum = new AtomicLong();

//生成线程池
public DefaultThreadPool() {
this.workerNum = DEFAULT_WORKER_NUMBERS;
initializeWorkers(this.workerNum);
}

public DefaultThreadPool(int num) {
if (num > MAX_WORKER_NUMBERS) {
this.workerNum =DEFAULT_WORKER_NUMBERS;
} else {
this.workerNum = num;
}
initializeWorkers(this.workerNum);
}
//初始化每个工作者线程
private void initializeWorkers(int num) {
for (int i = 0; i < num; i++) {
Worker worker = new Worker();
//添加到工作者线程的列表
workers.add(worker);
//启动工作者线程
Thread thread = new Thread(worker);
thread.start();
}
}

public void execute(Job job) {
if (job != null) {
//根据线程的"等待/通知机制"这里必须对jobs加锁
synchronized (jobs) {
jobs.addLast(job);
jobs.notify();
}
}
}
//关闭线程池即关闭每个工作者线程
public void shutdown() {
for (Worker w : workers) {
w.shutdown();
}
}
//增加工作者线程
public void addWorkers(int num) {
//加锁,防止该线程还么增加完成而下个线程继续增加导致工作者线程超过最大值
synchronized (jobs) {
if (num + this.workerNum > MAX_WORKER_NUMBERS) {
num = MAX_WORKER_NUMBERS - this.workerNum;
}
initializeWorkers(num);
this.workerNum += num;
}
}
//减少工作者线程
public void removeWorker(int num) {
synchronized (jobs) {
if(num>=this.workerNum){
throw new IllegalArgumentException("超过了已有的线程数量");
}
for (int i = 0; i < num; i++) {
Worker worker = workers.get(i);
if (worker != null) {
//关闭该线程并从列表中移除
worker.shutdown();
workers.remove(i);
}
}
this.workerNum -= num;
}
}

public int getJobSize() {
// TODO Auto-generated method stub
return workers.size();
}
//定义工作者线程
class Worker implements Runnable {
// 表示是否运行该worker
private volatile boolean running = true;

public void run() {
while (running) {
Job job = null;
//线程的等待/通知机制
synchronized (jobs) {
if (jobs.isEmpty()) {
try {
jobs.wait();//线程等待唤醒
} catch (InterruptedException e) {
//感知到外部对该线程的中断操作,返回
Thread.currentThread().interrupt();
return;
}
}
// 取出一个job
job = jobs.removeFirst();
}
//执行job
if (job != null) {
job.run();
}
}
}

// 终止该线程
public void shutdown() {
running = false;
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126

从线程池的实现中可以看出,当客户端调用execute(Job)方法时,会不断地向任务列表jobs中添加Job,而每个工作者线程会不读的从jobs上获取Job来执行,当jobs为空时,工作者线程进入WAITING状态。

当添加一个Job后,对工作队列jobs调用其notify()方法来唤醒一个工作者线程。此处我们不调用notifyAll(),避免将等待队列中的线程全部移动到阻塞队列中而造成资源浪费。

线程池的本质就是使用了一个线程安全的工作队列连接工作者线程和客户端线程。客户端线程把任务放入工作队列后便返回,而工作者线程则不端的从工作队列中取出工作并执行。当工作队列为空时,工作者线程进入WAITING状态,当有客户端发送任务过来后会通过任意一个工作者线程,随着大量任务的提交,更多的工作者线程被唤醒。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐