线程池ThreadPoolExecutor 和 ForkJoinPool 的分析使用
2017-04-18 23:27
477 查看
package com.ai.runner.test.thread; import java.sql.Timestamp; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.Future; import java.util.concurrent.FutureTask; import java.util.concurrent.RecursiveAction; import java.util.concurrent.RecursiveTask; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; public class ThreadPoolExecutorForkJoinPoolTest { public static void main(String args[]){ //threadPoolExecutorTest(); forkJoinPoolTest(); } /** * * 分而治之线程池ForkJoinPool * * ForkJoinTask -> RecursiveTask :代表有返回值的任务 * ForkJoinTask -> RecursiveAction:代表没有返回值的任务 * * @author think * @ApiDocMethod * @ApiCode * @RestRelativeURL */ public static void forkJoinPoolTest(){ ForkJoinPool forkJoinPool = new ForkJoinPool(); int num = 3; List<Integer> list = new ArrayList<Integer>(); for(int i=0;i<=100;i++){ list.add(Integer.valueOf(i)); } TestRecursiveAction testRecursiveAction = new TestRecursiveAction(new ArrayList<Integer>(list),num); TestRecursiveTask testRecursiveTask = new TestRecursiveTask(new ArrayList<Integer>(list),num); forkJoinPool.submit(testRecursiveTask); forkJoinPool.submit(testRecursiveAction); try { System.out.println("TestRecursiveTask计算结果="+testRecursiveTask.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } forkJoinPool.shutdown(); while(true){ if(forkJoinPool.isTerminated()){ break; } else{ try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } System.out.println("TestRecursiveAction计算结果="+TestRecursiveAction.total); System.out.println("运行完成"); } static class TestRecursiveAction extends RecursiveAction{ public static AtomicInteger total = new AtomicInteger(); private List<Integer> list; private int num; public TestRecursiveAction(List<Integer> list,int num){ setNum(num); setList(list); } /** * */ private static final long serialVersionUID = -8338359510561815732L; @Override protected void compute() { try{ if(list.size() <= num){ System.out.println(Thread.currentThread().getName()+"开始计算"); for(Integer inter : list){ total.addAndGet(inter.intValue()); } } else{ List<Integer> hlist = new ArrayList<Integer>(list.subList(0, list.size()/2)); List<Integer> llist = new ArrayList<Integer>(list.subList(list.size()/2,list.size())); TestRecursiveAction htask = new TestRecursiveAction(hlist,num); TestRecursiveAction ltask = new TestRecursiveAction(llist,num); htask.fork(); ltask.fork(); } } catch(Exception e){ e.printStackTrace(); } } public int getNum() { return num; } public void setNum(int num) { this.num = num; } public List<Integer> getList() { return list; } public void setList(List<Integer> list) { this.list = list; } } static class TestRecursiveTask extends RecursiveTask<Integer>{ private int num; private List<Integer> list; public TestRecursiveTask(List<Integer> list,int num){ setList(list); setNum(num); } /** * */ private static final long serialVersionUID = 1L; @Override protected Integer compute() { try{ if(list.size() <= num){ int total = 0; System.out.println(Thread.currentThread().getName()+"开始计算"); for(Integer inter : list){ total+=inter.intValue(); } return total; } else{ List<Integer> hlist = new ArrayList<Integer>(list.subList(0, list.size()/2)); List<Integer> llist = new ArrayList<Integer>(list.subList(list.size()/2,list.size())); TestRecursiveTask htask = new TestRecursiveTask(hlist,num); TestRecursiveTask ltask = new TestRecursiveTask(llist,num); htask.fork(); ltask.fork(); //合并结果 return htask.get().intValue()+ltask.get().intValue(); } } catch(Exception e){ e.printStackTrace(); } return null; } public int getNum() { return num; } public void setNum(int num) { this.num = num; } public List<Integer> getList() { return list; } public void setList(List<Integer> list) { this.list = list; } } /** * 线程池执行获取结果返回的两种种方式:FutureTask;Future * @author think * @ApiDocMethod * @ApiCode * @RestRelativeURL */ public static void threadPoolExecutorTest(){ BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(1); //当queue满后并且最大线程数都忙碌处理任务时;则无法submit新task 此时则抛出RejectedExecutionException ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 30, TimeUnit.SECONDS, queue); //继承RejectedExecutionHandler 去处理无法提交的任务 //ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 5, 2, TimeUnit.SECONDS, queue, new RejectHandle()); //Executors 创建线程池工具类 //ExecutorService pool = Executors.newFixedThreadPool(2); List<Future<String>> tasks = new ArrayList<Future<String>>(); FutureTask<String> ctask = new FutureTask<String>(new Callable<String>() { @Override public String call() throws Exception { try{ System.out.println(new Timestamp(System.currentTimeMillis())+":"+Thread.currentThread().getName()+"开始运行"); Thread.sleep(5000); } catch(Exception e){ e.printStackTrace(); } finally{ System.out.println(new Timestamp(System.currentTimeMillis())+":"+Thread.currentThread().getName()+"运行结束"); } return Thread.currentThread().getName(); } }); pool.submit(ctask); tasks.add(ctask); FutureTask<String> rtask = new FutureTask<String>(new Runnable() { @Override public void run() { try{ System.out.println(new Timestamp(System.currentTimeMillis())+":"+Thread.currentThread().getName()+"开始运行"); Thread.sleep(5000); } catch(Exception e){ e.printStackTrace(); } finally{ System.out.println(new Timestamp(System.currentTimeMillis())+":"+Thread.currentThread().getName()+"运行结束"); } } }, "complete"); pool.submit(rtask); tasks.add(rtask); Future<String> future = pool.submit(new Callable<String>() { @Override public String call() throws Exception { try{ System.out.println(new Timestamp(System.currentTimeMillis())+":"+Thread.currentThread().getName()+"开始运行"); Thread.sleep(5000); } catch(Exception e){ e.printStackTrace(); } finally{ System.out.println(new Timestamp(System.currentTimeMillis())+":"+Thread.currentThread().getName()+"运行结束"); } return Thread.currentThread().getName(); } }); tasks.add(future); final CountDownLatch latch = new CountDownLatch(tasks.size()); for(int i=0;i<tasks.size();i++){ final Future<String> ft = tasks.get(i); new Thread(){ @Override public void run() { try { System.out.println(ft.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } finally{ latch.countDown(); } } }.start(); } try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } pool.shutdown(); System.out.println("运行结束"); } static class RejectHandle implements RejectedExecutionHandler{ @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { //todo } } }
相关文章推荐
- Java多线程之ForkJoinTask,ForkJoinPool介绍及使用
- [Java并发编程(二)] 线程池 FixedThreadPool、CachedThreadPool、ForkJoinPool?为后台任务选择合适的 Java executors
- 0041 Java学习笔记-多线程-线程池、ForkJoinPool、ThreadLocal
- 【转】线程及同步的性能 - 线程池 / ThreadPoolExecutors / ForkJoinPool
- ForkJoinPool 源码分析
- QT线程池实验研究与分析(QThread与和QThreadPool + QRunnable使用上的区别)
- ForkJoinPool的使用
- (四)juc线程高级特性——线程池 / 线程调度 / ForkJoinPool
- ForkJoinPool 使用的错误写法
- 【转】线程及同步的性能 - 线程池 / ThreadPoolExecutors / ForkJoinPool
- Java多线程 -- JUC包源码分析19 -- ForkJoinPool/ForkJoinTask
- Java7之线程池ForkJoinPool
- 使用阻塞队列批量导入与使用forkjoinPool框架的导入对比
- GlusterFS之内存池(mem-pool)使用实例分析
- 聊聊并发(三)Java线程池的分析和使用
- 使用common-pool实现的一个简单的线程池
- JAVA线程池的分析和使用
- Android SoundPool 的使用以及原理分析
- Java多线程二(Java线程池的分析和使用)
- 聊聊并发(三)――JAVA线程池的分析和使用