您的位置:首页 > 编程语言 > Java开发

Java自定义线程池和线程总数控制

2017-03-10 14:35 525 查看
1 概述

池化是常见的思想,线程池是非常典型的池化的实现,《Java并发编程实战》也大篇幅去讲解了Java中的线程池。本文实现一个简单的线程池。

2 核心类

【1】接口定义

[java] view
plain copy

 print?





public interface IThreadPool<Job extends Runnable> {  

    /** 

     * 关闭线程池 

     */  

    public void shutAlldown();  

  

    /** 

     * 执行任务 

     *  

     * @param job 任务 

     */  

    public void execute(Job job);  

  

    /** 

     * 添加工作者 

     *  

     * @param addNum 添加数 

     */  

    public void addWorkers(int addNum);  

  

    /** 

     * 减少工作者 

     *  

     * @param reduceNum 减少数目 

     */  

    public void reduceWorkers(int reduceNum);  

}  

【2】实现类

线程池的核心是维护了1个任务列表和1个工作者列表。

[java] view
plain copy

 print?





import java.util.ArrayList;  

import java.util.Collections;  

import java.util.LinkedList;  

import java.util.List;  

  

public class XYThreadPool<Job extends Runnable> implements IThreadPool<Job> {  

  

    // 默认线程数  

    private static int DEAFAULT_SIZE = 5;  

    // 最大线程数  

    private static int MAX_SIZE = 10;  

  

    // 任务列表  

    private LinkedList<Job> tasks = new LinkedList<Job>();  

    // 工作线程列表  

    private List<Worker> workers = Collections  

            .synchronizedList(new ArrayList<Worker>());  

  

    /** 

     * 默认构造函数 

     */  

    public XYThreadPool() {  

        initWokers(DEAFAULT_SIZE);  

    }  

  

    /** 

     * 执行线程数 

     *  

     * @param threadNums 线程数 

     */  

    public XYThreadPool(int workerNum) {  

        workerNum = workerNum <= 0 ? DEAFAULT_SIZE  

                : workerNum > MAX_SIZE ? MAX_SIZE : workerNum;  

        initWokers(workerNum);  

    }  

  

    /** 

     * 初始化线程池 

     *  

     * @param threadNums 线程数 

     */  

    public void initWokers(int threadNums) {  

        for (int i = 0; i < threadNums; i++) {  

            Worker worker = new Worker();  

            worker.start();  

            workers.add(worker);  

        }  

        // 添加关闭钩子  

        Runtime.getRuntime().addShutdownHook(new Thread() {  

            public void run() {  

                shutAlldown();  

            }  

        });  

    }  

  

    @Override  

    public void shutAlldown() {  

        for (Worker worker : workers) {  

            worker.shutdown();  

        }  

    }  

  

    @Override  

    public void execute(Job job) {  

        synchronized (tasks) {  

            // 提交任务就是将任务对象加入任务队列,等待工作线程去处理  

            tasks.addLast(job);  

            tasks.notifyAll();  

        }  

    }  

  

    @Override  

    public void addWorkers(int addNum) {  

        // 新线程数必须大于零,并且线程总数不能大于最大线程数  

        if ((workers.size() + addNum) <= MAX_SIZE && addNum > 0) {  

            initWokers(addNum);  

        } else {  

            System.out.println("addNum too large");  

        }  

    }  

  

    @Override  

    public void reduceWorkers(int reduceNum) {  

        if ((workers.size() - reduceNum <= 0))  

            System.out.println("thread num too small");  

        else {  

            // 暂停指定数量的工作者  

            int count = 0;  

            while (count != reduceNum) {  

                for (Worker w : workers) {  

                    w.shutdown();  

                    count++;  

                }  

            }  

        }  

    }  

  

    /** 

     * 工作线程 

     */  

    class Worker extends Thread {  

  

        private volatile boolean flag = true;  

  

        @Override  

        public void run() {  

            while (flag) {  

                Job job = null;  

                // 加锁(若只有一个woker可不必加锁,那就是所谓的单线程的线程池,线程安全)  

                synchronized (tasks) {  

                    // 任务队列为空  

                    while (tasks.isEmpty()) {  

                        try {  

                            // 阻塞,放弃对象锁,等待被notify唤醒  

                            tasks.wait();  

                            System.out.println("block when tasks is empty");  

                        } catch (InterruptedException e) {  

                            e.printStackTrace();  

                        }  

                    }  

                    // 不为空取出任务  

                    job = tasks.removeFirst();  

                    System.out.println("get job:" + job + ",do biz");  

                    job.run();  

                }  

            }  

        }  

  

        public void shutdown() {  

            flag = false;  

        }  

    }  

}  

1 当调用wait()方法时线程会放弃对象锁,进入等待此对象的等待锁定池,只有针对此对象调用notify()方法后本线程才进入对象锁定池准备

2 Object的方法:void notify(): 唤醒一个正在等待该对象的线程。void notifyAll(): 唤醒所有正在等待该对象的线程。notifyAll使所有原来在该对象上等待被notify的线程统统退出wait状态,变成等待该对象上的锁,一旦该对象被解锁,它们会去竞争。notify只是选择一个wait状态线程进行通知,并使它获得该对象上的锁,但不惊动其它同样在等待被该对象notify的线程们,当第一个线程运行完毕以后释放对象上的锁,此时如果该对象没有再次使用notify语句,即便该对象已经空闲,其他wait状态等待的线程由于没有得到该对象的通知,继续处在wait状态,直到这个对象发出一个notify或notifyAll,它们等待的是被notify或notifyAll,而不是锁。


3 无需控制线程总数
每调用一次就会创建一个拥有10个线程工作者的线程池。

[java] view
plain copy

 print?





public class TestService1 {  

    public static void main(String[] args) {  

        // 启动10个线程  

        XYThreadPool<Runnable> pool = new XYThreadPool<Runnable>(10);  

        pool.execute(new Runnable() {  

            @Override  

            public void run() {  

                System.out.println("====1 test====");  

            }  

        });   

    }  

}  

  

public class TestService2 {  

    public static void main(String[] args) {  

        // 启动10个线程  

        XYThreadPool<Runnable> pool = new XYThreadPool<Runnable>(10);  

        pool.execute(new Runnable() {  

            @Override  

            public void run() {  

                System.out.println("====2 test====");  

            }  

        });  

    }  

}  


4 控制线程总数
希望在项目中所有的线程调用,都共用1个固定工作者数大小的线程池。

[java] view
plain copy

 print?





import javax.annotation.PostConstruct;  

import org.springframework.stereotype.Component;  

import com.xy.pool.XYThreadPool;  

  

/** 

 * 统一线程池管理类  

 */  

@Component  

public class XYThreadManager {  

  

    private XYThreadPool<Runnable> executorPool;  

  

    @PostConstruct  

    public void init() {  

        executorPool = new XYThreadPool<Runnable>(10);  

    }  

  

    public XYThreadPool<Runnable> getExecutorPool() {  

        return executorPool;  

    }  

}  

  

import org.springframework.beans.factory.annotation.Autowired;  

import org.springframework.stereotype.Service;  

  

@Service("testService3")  

public class TestService3 {  

      

    @Autowired  

    private XYThreadManager threadManager;  

      

    public void test() {  

        threadManager.getExecutorPool().execute(new Runnable() {  

            @Override  

            public void run() {  

                System.out.println("====3 test====");  

            }  

        });  

    }  

}  

  

import org.springframework.beans.factory.annotation.Autowired;  

import org.springframework.stereotype.Service;  

  

@Service("testService4")  

public class TestService4 {  

      

    @Autowired  

    private XYThreadManager threadManager;  

      

    public void test() {  

        threadManager.getExecutorPool().execute(new Runnable() {  

            @Override  

            public void run() {  

                System.out.println("====4 test====");  

            }  

        });  

    }  

}  

  

import org.springframework.context.ApplicationContext;  

import org.springframework.context.support.ClassPathXmlApplicationContext;  

  

public class TestMain {  

  

    @SuppressWarnings("resource")  

    public static void main(String[] args) {  

        ApplicationContext atc = new ClassPathXmlApplicationContext("applicationContext.xml");  

  

        TestService3 t3 = (TestService3) atc.getBean("testService3");  

        t3.test();  

  

        TestService4 t4 = (TestService4) atc.getBean("testService4");  

        t4.test();  

    }  

  

}  
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: