您的位置:首页 > 其它

线程池ThreadPoolExecutor 和 ForkJoinPool 的分析使用

2017-04-18 23:27 477 查看
package com.ai.runner.test.thread;

import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ThreadPoolExecutorForkJoinPoolTest {

public static void main(String args[]){

//threadPoolExecutorTest();
forkJoinPoolTest();
}

/**
*
* 分而治之线程池ForkJoinPool
*
* ForkJoinTask -> RecursiveTask :代表有返回值的任务
* ForkJoinTask -> RecursiveAction:代表没有返回值的任务
*
* @author think
* @ApiDocMethod
* @ApiCode
* @RestRelativeURL
*/
public static void forkJoinPoolTest(){

ForkJoinPool forkJoinPool = new ForkJoinPool();
int num = 3;
List<Integer> list = new ArrayList<Integer>();
for(int i=0;i<=100;i++){
list.add(Integer.valueOf(i));
}
TestRecursiveAction testRecursiveAction = new TestRecursiveAction(new ArrayList<Integer>(list),num);
TestRecursiveTask testRecursiveTask = new TestRecursiveTask(new ArrayList<Integer>(list),num);

forkJoinPool.submit(testRecursiveTask);
forkJoinPool.submit(testRecursiveAction);

try {
System.out.println("TestRecursiveTask计算结果="+testRecursiveTask.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
forkJoinPool.shutdown();
while(true){
if(forkJoinPool.isTerminated()){
break;
}
else{
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
System.out.println("TestRecursiveAction计算结果="+TestRecursiveAction.total);
System.out.println("运行完成");
}

static class TestRecursiveAction extends RecursiveAction{

public static AtomicInteger total = new AtomicInteger();

private List<Integer> list;

private int num;

public TestRecursiveAction(List<Integer> list,int num){
setNum(num);
setList(list);
}

/**
*
*/
private static final long serialVersionUID = -8338359510561815732L;

@Override
protected void compute() {
try{

if(list.size() <= num){
System.out.println(Thread.currentThread().getName()+"开始计算");
for(Integer inter : list){
total.addAndGet(inter.intValue());
}
}
else{

List<Integer> hlist = new ArrayList<Integer>(list.subList(0, list.size()/2));
List<Integer> llist = new ArrayList<Integer>(list.subList(list.size()/2,list.size()));
TestRecursiveAction htask = new TestRecursiveAction(hlist,num);
TestRecursiveAction ltask = new TestRecursiveAction(llist,num);
htask.fork();
ltask.fork();
}

}
catch(Exception e){
e.printStackTrace();
}
}

public int getNum() {
return num;
}

public void setNum(int num) {
this.num = num;
}

public List<Integer> getList() {
return list;
}

public void setList(List<Integer> list) {
this.list = list;
}

}

static class TestRecursiveTask extends RecursiveTask<Integer>{

private int num;
private List<Integer> list;

public TestRecursiveTask(List<Integer> list,int num){
setList(list);
setNum(num);
}
/**
*
*/
private static final long serialVersionUID = 1L;

@Override
protected Integer compute() {
try{

if(list.size() <= num){
int total = 0;
System.out.println(Thread.currentThread().getName()+"开始计算");
for(Integer inter : list){
total+=inter.intValue();
}
return total;
}
else{

List<Integer> hlist = new ArrayList<Integer>(list.subList(0, list.size()/2));
List<Integer> llist = new ArrayList<Integer>(list.subList(list.size()/2,list.size()));
TestRecursiveTask htask = new TestRecursiveTask(hlist,num);
TestRecursiveTask ltask = new TestRecursiveTask(llist,num);
htask.fork();
ltask.fork();
//合并结果
return htask.get().intValue()+ltask.get().intValue();

}
}
catch(Exception e){
e.printStackTrace();
}
return null;
}

public int getNum() {
return num;
}

public void setNum(int num) {
this.num = num;
}

public List<Integer> getList() {
return list;
}

public void setList(List<Integer> list) {
this.list = list;
}

}

/**
* 线程池执行获取结果返回的两种种方式:FutureTask;Future
* @author think
* @ApiDocMethod
* @ApiCode
* @RestRelativeURL
*/
public static void threadPoolExecutorTest(){

BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(1);

//当queue满后并且最大线程数都忙碌处理任务时;则无法submit新task 此时则抛出RejectedExecutionException
ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 30, TimeUnit.SECONDS, queue);

//继承RejectedExecutionHandler 去处理无法提交的任务
//ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 5, 2, TimeUnit.SECONDS, queue, new RejectHandle());

//Executors 创建线程池工具类
//ExecutorService pool = Executors.newFixedThreadPool(2);

List<Future<String>> tasks = new ArrayList<Future<String>>();

FutureTask<String> ctask = new  FutureTask<String>(new Callable<String>() {
@Override
public String call() throws Exception {

try{
System.out.println(new Timestamp(System.currentTimeMillis())+":"+Thread.currentThread().getName()+"开始运行");
Thread.sleep(5000);
}
catch(Exception e){
e.printStackTrace();
}
finally{
System.out.println(new Timestamp(System.currentTimeMillis())+":"+Thread.currentThread().getName()+"运行结束");
}
return Thread.currentThread().getName();
}

});

pool.submit(ctask);
tasks.add(ctask);

FutureTask<String> rtask = new  FutureTask<String>(new Runnable() {
@Override
public void run() {
try{
System.out.println(new Timestamp(System.currentTimeMillis())+":"+Thread.currentThread().getName()+"开始运行");
Thread.sleep(5000);
}
catch(Exception e){
e.printStackTrace();
}
finally{
System.out.println(new Timestamp(System.currentTimeMillis())+":"+Thread.currentThread().getName()+"运行结束");
}

}

}, "complete");

pool.submit(rtask);
tasks.add(rtask);

Future<String> future = pool.submit(new Callable<String>() {
@Override
public String call() throws Exception {
try{
System.out.println(new Timestamp(System.currentTimeMillis())+":"+Thread.currentThread().getName()+"开始运行");
Thread.sleep(5000);
}
catch(Exception e){
e.printStackTrace();
}
finally{
System.out.println(new Timestamp(System.currentTimeMillis())+":"+Thread.currentThread().getName()+"运行结束");
}
return Thread.currentThread().getName();
}
});
tasks.add(future);

final CountDownLatch latch = new CountDownLatch(tasks.size());

for(int i=0;i<tasks.size();i++){
final Future<String> ft = tasks.get(i);
new Thread(){

@Override
public void run() {
try {
System.out.println(ft.get());
} catch (InterruptedException e) {

e.printStackTrace();
} catch (ExecutionException e) {

e.printStackTrace();
}
finally{
latch.countDown();
}
}

}.start();
}
try {
latch.await();
} catch (InterruptedException e) {

e.printStackTrace();
}

pool.shutdown();

System.out.println("运行结束");
}

static class RejectHandle implements RejectedExecutionHandler{

@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {

//todo
}

}
}



                                            
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: