Java自定义线程池和线程总数控制
2017-03-10 14:35
525 查看
1 概述
池化是常见的思想,线程池是非常典型的池化的实现,《Java并发编程实战》也大篇幅去讲解了Java中的线程池。本文实现一个简单的线程池。
2 核心类
【1】接口定义
[java] view
plain copy
print?
public interface IThreadPool<Job extends Runnable> {
/**
* 关闭线程池
*/
public void shutAlldown();
/**
* 执行任务
*
* @param job 任务
*/
public void execute(Job job);
/**
* 添加工作者
*
* @param addNum 添加数
*/
public void addWorkers(int addNum);
/**
* 减少工作者
*
* @param reduceNum 减少数目
*/
public void reduceWorkers(int reduceNum);
}
【2】实现类
线程池的核心是维护了1个任务列表和1个工作者列表。
[java] view
plain copy
print?
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
public class XYThreadPool<Job extends Runnable> implements IThreadPool<Job> {
// 默认线程数
private static int DEAFAULT_SIZE = 5;
// 最大线程数
private static int MAX_SIZE = 10;
// 任务列表
private LinkedList<Job> tasks = new LinkedList<Job>();
// 工作线程列表
private List<Worker> workers = Collections
.synchronizedList(new ArrayList<Worker>());
/**
* 默认构造函数
*/
public XYThreadPool() {
initWokers(DEAFAULT_SIZE);
}
/**
* 执行线程数
*
* @param threadNums 线程数
*/
public XYThreadPool(int workerNum) {
workerNum = workerNum <= 0 ? DEAFAULT_SIZE
: workerNum > MAX_SIZE ? MAX_SIZE : workerNum;
initWokers(workerNum);
}
/**
* 初始化线程池
*
* @param threadNums 线程数
*/
public void initWokers(int threadNums) {
for (int i = 0; i < threadNums; i++) {
Worker worker = new Worker();
worker.start();
workers.add(worker);
}
// 添加关闭钩子
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
shutAlldown();
}
});
}
@Override
public void shutAlldown() {
for (Worker worker : workers) {
worker.shutdown();
}
}
@Override
public void execute(Job job) {
synchronized (tasks) {
// 提交任务就是将任务对象加入任务队列,等待工作线程去处理
tasks.addLast(job);
tasks.notifyAll();
}
}
@Override
public void addWorkers(int addNum) {
// 新线程数必须大于零,并且线程总数不能大于最大线程数
if ((workers.size() + addNum) <= MAX_SIZE && addNum > 0) {
initWokers(addNum);
} else {
System.out.println("addNum too large");
}
}
@Override
public void reduceWorkers(int reduceNum) {
if ((workers.size() - reduceNum <= 0))
System.out.println("thread num too small");
else {
// 暂停指定数量的工作者
int count = 0;
while (count != reduceNum) {
for (Worker w : workers) {
w.shutdown();
count++;
}
}
}
}
/**
* 工作线程
*/
class Worker extends Thread {
private volatile boolean flag = true;
@Override
public void run() {
while (flag) {
Job job = null;
// 加锁(若只有一个woker可不必加锁,那就是所谓的单线程的线程池,线程安全)
synchronized (tasks) {
// 任务队列为空
while (tasks.isEmpty()) {
try {
// 阻塞,放弃对象锁,等待被notify唤醒
tasks.wait();
System.out.println("block when tasks is empty");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 不为空取出任务
job = tasks.removeFirst();
System.out.println("get job:" + job + ",do biz");
job.run();
}
}
}
public void shutdown() {
flag = false;
}
}
}
1 当调用wait()方法时线程会放弃对象锁,进入等待此对象的等待锁定池,只有针对此对象调用notify()方法后本线程才进入对象锁定池准备
2 Object的方法:void notify(): 唤醒一个正在等待该对象的线程。void notifyAll(): 唤醒所有正在等待该对象的线程。notifyAll使所有原来在该对象上等待被notify的线程统统退出wait状态,变成等待该对象上的锁,一旦该对象被解锁,它们会去竞争。notify只是选择一个wait状态线程进行通知,并使它获得该对象上的锁,但不惊动其它同样在等待被该对象notify的线程们,当第一个线程运行完毕以后释放对象上的锁,此时如果该对象没有再次使用notify语句,即便该对象已经空闲,其他wait状态等待的线程由于没有得到该对象的通知,继续处在wait状态,直到这个对象发出一个notify或notifyAll,它们等待的是被notify或notifyAll,而不是锁。
3 无需控制线程总数
每调用一次就会创建一个拥有10个线程工作者的线程池。
[java] view
plain copy
print?
public class TestService1 {
public static void main(String[] args) {
// 启动10个线程
XYThreadPool<Runnable> pool = new XYThreadPool<Runnable>(10);
pool.execute(new Runnable() {
@Override
public void run() {
System.out.println("====1 test====");
}
});
}
}
public class TestService2 {
public static void main(String[] args) {
// 启动10个线程
XYThreadPool<Runnable> pool = new XYThreadPool<Runnable>(10);
pool.execute(new Runnable() {
@Override
public void run() {
System.out.println("====2 test====");
}
});
}
}
4 控制线程总数
希望在项目中所有的线程调用,都共用1个固定工作者数大小的线程池。
[java] view
plain copy
print?
import javax.annotation.PostConstruct;
import org.springframework.stereotype.Component;
import com.xy.pool.XYThreadPool;
/**
* 统一线程池管理类
*/
@Component
public class XYThreadManager {
private XYThreadPool<Runnable> executorPool;
@PostConstruct
public void init() {
executorPool = new XYThreadPool<Runnable>(10);
}
public XYThreadPool<Runnable> getExecutorPool() {
return executorPool;
}
}
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service("testService3")
public class TestService3 {
@Autowired
private XYThreadManager threadManager;
public void test() {
threadManager.getExecutorPool().execute(new Runnable() {
@Override
public void run() {
System.out.println("====3 test====");
}
});
}
}
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service("testService4")
public class TestService4 {
@Autowired
private XYThreadManager threadManager;
public void test() {
threadManager.getExecutorPool().execute(new Runnable() {
@Override
public void run() {
System.out.println("====4 test====");
}
});
}
}
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class TestMain {
@SuppressWarnings("resource")
public static void main(String[] args) {
ApplicationContext atc = new ClassPathXmlApplicationContext("applicationContext.xml");
TestService3 t3 = (TestService3) atc.getBean("testService3");
t3.test();
TestService4 t4 = (TestService4) atc.getBean("testService4");
t4.test();
}
}
池化是常见的思想,线程池是非常典型的池化的实现,《Java并发编程实战》也大篇幅去讲解了Java中的线程池。本文实现一个简单的线程池。
2 核心类
【1】接口定义
[java] view
plain copy
print?
public interface IThreadPool<Job extends Runnable> {
/**
* 关闭线程池
*/
public void shutAlldown();
/**
* 执行任务
*
* @param job 任务
*/
public void execute(Job job);
/**
* 添加工作者
*
* @param addNum 添加数
*/
public void addWorkers(int addNum);
/**
* 减少工作者
*
* @param reduceNum 减少数目
*/
public void reduceWorkers(int reduceNum);
}
【2】实现类
线程池的核心是维护了1个任务列表和1个工作者列表。
[java] view
plain copy
print?
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
public class XYThreadPool<Job extends Runnable> implements IThreadPool<Job> {
// 默认线程数
private static int DEAFAULT_SIZE = 5;
// 最大线程数
private static int MAX_SIZE = 10;
// 任务列表
private LinkedList<Job> tasks = new LinkedList<Job>();
// 工作线程列表
private List<Worker> workers = Collections
.synchronizedList(new ArrayList<Worker>());
/**
* 默认构造函数
*/
public XYThreadPool() {
initWokers(DEAFAULT_SIZE);
}
/**
* 执行线程数
*
* @param threadNums 线程数
*/
public XYThreadPool(int workerNum) {
workerNum = workerNum <= 0 ? DEAFAULT_SIZE
: workerNum > MAX_SIZE ? MAX_SIZE : workerNum;
initWokers(workerNum);
}
/**
* 初始化线程池
*
* @param threadNums 线程数
*/
public void initWokers(int threadNums) {
for (int i = 0; i < threadNums; i++) {
Worker worker = new Worker();
worker.start();
workers.add(worker);
}
// 添加关闭钩子
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
shutAlldown();
}
});
}
@Override
public void shutAlldown() {
for (Worker worker : workers) {
worker.shutdown();
}
}
@Override
public void execute(Job job) {
synchronized (tasks) {
// 提交任务就是将任务对象加入任务队列,等待工作线程去处理
tasks.addLast(job);
tasks.notifyAll();
}
}
@Override
public void addWorkers(int addNum) {
// 新线程数必须大于零,并且线程总数不能大于最大线程数
if ((workers.size() + addNum) <= MAX_SIZE && addNum > 0) {
initWokers(addNum);
} else {
System.out.println("addNum too large");
}
}
@Override
public void reduceWorkers(int reduceNum) {
if ((workers.size() - reduceNum <= 0))
System.out.println("thread num too small");
else {
// 暂停指定数量的工作者
int count = 0;
while (count != reduceNum) {
for (Worker w : workers) {
w.shutdown();
count++;
}
}
}
}
/**
* 工作线程
*/
class Worker extends Thread {
private volatile boolean flag = true;
@Override
public void run() {
while (flag) {
Job job = null;
// 加锁(若只有一个woker可不必加锁,那就是所谓的单线程的线程池,线程安全)
synchronized (tasks) {
// 任务队列为空
while (tasks.isEmpty()) {
try {
// 阻塞,放弃对象锁,等待被notify唤醒
tasks.wait();
System.out.println("block when tasks is empty");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 不为空取出任务
job = tasks.removeFirst();
System.out.println("get job:" + job + ",do biz");
job.run();
}
}
}
public void shutdown() {
flag = false;
}
}
}
1 当调用wait()方法时线程会放弃对象锁,进入等待此对象的等待锁定池,只有针对此对象调用notify()方法后本线程才进入对象锁定池准备
2 Object的方法:void notify(): 唤醒一个正在等待该对象的线程。void notifyAll(): 唤醒所有正在等待该对象的线程。notifyAll使所有原来在该对象上等待被notify的线程统统退出wait状态,变成等待该对象上的锁,一旦该对象被解锁,它们会去竞争。notify只是选择一个wait状态线程进行通知,并使它获得该对象上的锁,但不惊动其它同样在等待被该对象notify的线程们,当第一个线程运行完毕以后释放对象上的锁,此时如果该对象没有再次使用notify语句,即便该对象已经空闲,其他wait状态等待的线程由于没有得到该对象的通知,继续处在wait状态,直到这个对象发出一个notify或notifyAll,它们等待的是被notify或notifyAll,而不是锁。
3 无需控制线程总数
每调用一次就会创建一个拥有10个线程工作者的线程池。
[java] view
plain copy
print?
public class TestService1 {
public static void main(String[] args) {
// 启动10个线程
XYThreadPool<Runnable> pool = new XYThreadPool<Runnable>(10);
pool.execute(new Runnable() {
@Override
public void run() {
System.out.println("====1 test====");
}
});
}
}
public class TestService2 {
public static void main(String[] args) {
// 启动10个线程
XYThreadPool<Runnable> pool = new XYThreadPool<Runnable>(10);
pool.execute(new Runnable() {
@Override
public void run() {
System.out.println("====2 test====");
}
});
}
}
4 控制线程总数
希望在项目中所有的线程调用,都共用1个固定工作者数大小的线程池。
[java] view
plain copy
print?
import javax.annotation.PostConstruct;
import org.springframework.stereotype.Component;
import com.xy.pool.XYThreadPool;
/**
* 统一线程池管理类
*/
@Component
public class XYThreadManager {
private XYThreadPool<Runnable> executorPool;
@PostConstruct
public void init() {
executorPool = new XYThreadPool<Runnable>(10);
}
public XYThreadPool<Runnable> getExecutorPool() {
return executorPool;
}
}
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service("testService3")
public class TestService3 {
@Autowired
private XYThreadManager threadManager;
public void test() {
threadManager.getExecutorPool().execute(new Runnable() {
@Override
public void run() {
System.out.println("====3 test====");
}
});
}
}
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service("testService4")
public class TestService4 {
@Autowired
private XYThreadManager threadManager;
public void test() {
threadManager.getExecutorPool().execute(new Runnable() {
@Override
public void run() {
System.out.println("====4 test====");
}
});
}
}
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class TestMain {
@SuppressWarnings("resource")
public static void main(String[] args) {
ApplicationContext atc = new ClassPathXmlApplicationContext("applicationContext.xml");
TestService3 t3 = (TestService3) atc.getBean("testService3");
t3.test();
TestService4 t4 = (TestService4) atc.getBean("testService4");
t4.test();
}
}
相关文章推荐
- Java自定义线程池和线程总数控制
- 【Java多线程】写入同一文件,自定义线程池与线程回收利用2
- 【Java多线程】写入同一文件,自定义线程池与线程回收利用
- java concurrent 探秘(经典的线程个数,是否完成等的控制) 线程池
- Java的线程控制方法和线程池
- java concurrent 探秘(经典的线程个数,是否完成等的控制) 线程池
- java线程安全之Executor框架及自定义线程池(十五)
- 【Java多线程】写入同一文件,自定义线程池与线程回收利用
- python自定义线程池控制线程数量
- java.util.concurrent解读,自定义线程工厂,线程池
- Java线程:新特征-线程池
- 线程池,封装使用,实现控制子线程
- (7)java5线程并发库的应用(线程池) 以及在实际项目中的使用。。
- Java线程(五):线程池
- Java线程:新特征-线程池
- (7)java5线程并发库的应用(线程池)
- java基础--线程--线程的基本控制和交互--02
- ThinkInJava杂记--并发控制-02-线程控制
- 【java多线程与并发库】---传统java多线程<5> 线程控制
- Java线程:新特征-线程池