您的位置:首页 > 编程语言 > Java开发

[笔记][Java7并发编程实战手册]7. 定制并发类

2015-10-07 20:00 309 查看
[笔记][Java7并发编程实战手册]系列目录

本章内容包括:

定制ThreadPoolExecutor类

实现基于优先级的Executor类

实现ThreadFactory接口生成定制线程

在Executor对象中使用THreadFactory

定制运行在定时线程池中的任务

通过实现ThreadFactory接口为Fork/Join框架生成定制线程

定制运行在Fork/Join框架中的任务

实现定制Lock类

实现基于优先级的传输队列

实现自己的原子对象

7.1.简介

Java并发API提供了大量接口和类来实现并发应用程序,包含了底层机制和高层机制:

底层机制:Thread类、Runnable接口或Callable接口、synchronized关键字

高层机制:Executor框架和 Fork/Join (
Java7新增


然而这并没有什么卵用,某些时候还是不能满足我们的开发需求.

这时,我们就需要基于Java提供的类和api来实现自己的定制并发工具.

实现一个接口以拥有接口定义的功能,例如:ThreadFactory

覆盖类的一些方法,改变这些方法的行为,来满足需求,例如:覆盖Thread类的run方法,它默认什么都不做,可以用来覆盖以提供预期的功能.

在本章中,我们将学习如何改变一些Java并发API的行为,而不需要从头设计一个并发框架.

getCompletedTaskCount() 已执行任务数量

getActiveCount() 正在执行任务数量

getQueue().size() 未执行任务数量,在构造的时候 new LinkedBlockingDeque()) 的阻塞队列(一个基于已链接节点的、任选范围的阻塞双端队列。),用于保存待执行submit提交的任务.被执行/执行过的任务会从此列表移除

7.2.定制ThreadPoolExecutor类 ##

  Executor框架是一种将线程的创建和执行分离的机制.它基于Executor和ExecutorService接口,以及这两个接口的实现类ThreadPoolExecutor展开.Executor有一个内部线程池,并提供了将任务传递到池中线程以获得执行的方法.可传递的任务有如下两种

通过Runnable接口实现的任务,它不返回结果.

通过Callable接口实现的任务,它返回结果.

  在这两种情况下,只需要传递任务到执行器,执行器即可使用线程池中的线程活新创建的线程来执行任务.执行器也决定了任务执行的时间.

  本章将学习如何覆盖ThreadPollExecutor,通过示例来计算任务在执行器中执行的时间,执行结束后在控制台输出有关执行器的统计信息.

示例

场景描述:本示例继承ThreadPoolExecutor,并覆盖其中的几个方法.来达到定制自己需要的 计算任务执行的耗时 和 打印任务信息.

/**
* Created by zhuqiang on 2015/10/3 0003.
*/
public class Client {
public static void main(String[] args) throws ExecutionException, InterruptedException {
MyExecutor myExecutor = new MyExecutor(2, 4, 1000, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<Runnable>());
List<Future<String>> results = new ArrayList<>();
//提交10个任务
for (int i = 0; i < 10; i++) {
Future<String> result = myExecutor.submit(new SleepTask());
results.add(result);
}

System.out.println("********  获取前5个任务结果 **********");
for (int i = 0; i < 5; i++) {
String s = results.get(i).get();
System.out.printf("前:%s/%s,结果:%s\n", i, 5, s);
}
myExecutor.shutdown();
System.out.println("********  获取后5个任务结果 **********");
for (int i = 5; i < 10; i++) {
String s = results.get(i).get();
System.out.printf("后:%s/%s,结果:%s\n", i,10, s);
}
myExecutor.awaitTermination(1,TimeUnit.DAYS);
System.out.println("main:任务执行完成");
}
}

class MyExecutor extends ThreadPoolExecutor{
private ConcurrentHashMap<String,Date> startTimes = new ConcurrentHashMap<String, Date>();

/**
*
* @param corePoolSize 池中所保存的线程数,包括空闲线程。
* @param maximumPoolSize 池中允许的最大线程数
* @param keepAliveTime 当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。
* @param unit 参数的时间单位。
* @param workQueue 执行前用于保持任务的队列。此队列仅保持由 execute 方法提交的 Runnable 任务。
*/
public MyExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}

//按过去执行已提交任务的顺序发起一个有序的关闭,但是不接受新任务。
@Override
public void shutdown() {
System.out.printf("MyExecutor:shutdown关闭执行器开始\n");
System.out.printf("MyExecutor:shutdown已执行任务数量%d\n",this.getCompletedTaskCount());
System.out.printf("MyExecutor:shutdown正在执行的任务数量:%d\n",this.getActiveCount());
System.out.printf("MyExecutor:shutdown未执行的任务数量:%d\n",this.getQueue().size());

super.shutdown();
}
//尝试停止所有的活动执行任务、暂停等待任务的处理,并返回等待执行的任务列表。
@Override
public List<Runnable> shutdownNow() {
System.out.printf("MyExecutor:shutdownNow关闭执行器开始\n");
System.out.printf("MyExecutor:shutdownNow已执行任务数量%d\n",this.getCompletedTaskCount());
System.out.printf("MyExecutor:shutdownNow正在执行的任务数量:%d\n",this.getActiveCount());
System.out.printf("MyExecutor:shutdownNow未执行的任务数量:%d\n",this.getQueue().size());
return super.shutdownNow();
}

//任务执行前
@Override
protected void beforeExecute(Thread t, Runnable r) {
System.out.printf("MyExecutor:beforeExecute任务 %s 将开始执行,任务hashCode=%s\n",t.getName(),r.hashCode());
startTimes.put(String.valueOf(r.hashCode()), new Date());
super.beforeExecute(t, r);
}

//任务执行完成后
@Override
protected void afterExecute(Runnable r, Throwable t) {
Future<?> result = (Future<?>)r;
super.afterExecute(r, t);
try {
System.out.printf("afterExecute:任务结果:%s\n",result.get());
Date end = new Date();
Date start = startTimes.remove(String.valueOf(r.hashCode()));
System.out.printf("afterExecute:任务执行耗时:%s毫秒\n",end.getTime() - start.getTime());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}

//任务模拟类
class SleepTask implements Callable<String>{

@Override
public String call() throws Exception {
TimeUnit.SECONDS.sleep(2);
return new Date().toString();
}
}


某一次的运行结果

********  获取前5个任务结果 **********
MyExecutor:beforeExecute任务 pool-1-thread-1 将开始执行,任务hashCode=533622610
MyExecutor:beforeExecute任务 pool-1-thread-2 将开始执行,任务hashCode=1123936020
afterExecute:任务结果:Sat Oct 03 12:34:01 CST 2015
afterExecute:任务结果:Sat Oct 03 12:34:01 CST 2015
前:0/5,结果:Sat Oct 03 12:34:01 CST 2015
前:1/5,结果:Sat Oct 03 12:34:01 CST 2015
afterExecute:任务执行耗时:2008毫秒
afterExecute:任务执行耗时:2008毫秒
MyExecutor:beforeExecute任务 pool-1-thread-1 将开始执行,任务hashCode=765950271
MyExecutor:beforeExecute任务 pool-1-thread-2 将开始执行,任务hashCode=1278873915
afterExecute:任务结果:Sat Oct 03 12:34:03 CST 2015
前:2/5,结果:Sat Oct 03 12:34:03 CST 2015
afterExecute:任务执行耗时:2000毫秒
MyExecutor:beforeExecute任务 pool-1-thread-1 将开始执行,任务hashCode=1640129195
afterExecute:任务结果:Sat Oct 03 12:34:03 CST 2015
前:3/5,结果:Sat Oct 03 12:34:03 CST 2015
afterExecute:任务执行耗时:2001毫秒
MyExecutor:beforeExecute任务 pool-1-thread-2 将开始执行,任务hashCode=1208323172
afterExecute:任务结果:Sat Oct 03 12:34:05 CST 2015
前:4/5,结果:Sat Oct 03 12:34:05 CST 2015
afterExecute:任务执行耗时:2001毫秒
MyExecutor:shutdown关闭执行器开始
MyExecutor:beforeExecute任务 pool-1-thread-1 将开始执行,任务hashCode=1050548827
afterExecute:任务结果:Sat Oct 03 12:34:05 CST 2015
afterExecute:任务执行耗时:2001毫秒
MyExecutor:beforeExecute任务 pool-1-thread-2 将开始执行,任务hashCode=218569472
MyExecutor:shutdown已执行任务数量5
MyExecutor:shutdown正在执行的任务数量:2
MyExecutor:shutdown未执行的任务数量:2
********  获取后5个任务结果 **********
后:5/10,结果:Sat Oct 03 12:34:05 CST 2015
afterExecute:任务结果:Sat Oct 03 12:34:07 CST 2015
后:6/10,结果:Sat Oct 03 12:34:07 CST 2015
afterExecute:任务执行耗时:2001毫秒
MyExecutor:beforeExecute任务 pool-1-thread-1 将开始执行,任务hashCode=328181862
afterExecute:任务结果:Sat Oct 03 12:34:07 CST 2015
后:7/10,结果:Sat Oct 03 12:34:07 CST 2015
afterExecute:任务执行耗时:2001毫秒
MyExecutor:beforeExecute任务 pool-1-thread-2 将开始执行,任务hashCode=544180467
afterExecute:任务结果:Sat Oct 03 12:34:09 CST 2015
后:8/10,结果:Sat Oct 03 12:34:09 CST 2015
afterExecute:任务结果:Sat Oct 03 12:34:09 CST 2015
afterExecute:任务执行耗时:2000毫秒
afterExecute:任务执行耗时:2000毫秒
后:9/10,结果:Sat Oct 03 12:34:09 CST 2015
main:任务执行完成


7.3.实现基于优先级的Executor类

  Executor框架是在JDK1.5出现的,它的出现只需要实现任务并将它们传递到执行器中,然后执行器将负责创建执行任务的线程,并执行这些线程.

  执行器内部使用一个阻塞式队列存放等待执行的任务,并按任务到达执行时的顺序进行存放.另一个可行的替代方案是使用优先级对象存放新的任务,这样,如果有高优先级的新任务到达执行器,那么将会优先执行.

  本节将学习如何实现一个执行器.

实现优先级Executor类其实很简单:

先来看看ThreadPoolExecutor的构造.4个构造里面都有一个参数, BlockingQueue队列,我们在使用Executors使用创建线程池的时候,其实他们返回的线程池,也是通过 ThreadPoolExecutor构造传递不同的队列和不同的参数来构造的线程池实例.看下面的图:其中PriorityBlockingQueue是一个优先级队列,在 6.并发集合里面讲到过. 实际就是做到以下几点点就能实现优先级的Executor效果了



1. 使用 PriorityBlockingQueue 队列

2. 传递的任务.要实现Comparable接口,以便使用自己的优先级逻辑

3. 这里传入的队列 装的不是用来存储 我们传入的 runnable任务,而是用来存储执行器 把我们传入的逻辑任务再包装了一层的 执行器任务对象(在后面7.6章节中能看到)

其他信息

在并发集合中有一个比较有趣的DelayQueue队列,只返回到期的元素.可以使用ScheduledThreadPoolExecutor类定制自己的任务周期执行类.

示例

场景描述:

/**
* Created by zhuqiang on 2015/10/3 0003.
*/
public class Client {
public static void main(String[] args) {
ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 4, 1000, TimeUnit.SECONDS, new PriorityBlockingQueue<Runnable>());
ArrayList<PriorityTask> list = new ArrayList<>();
for (int i = 0; i < 8; i++) {
pool.execute(new PriorityTask(i,"Task"+i));
}
pool.shutdown();
}
}

class PriorityTask implements Runnable, Comparable<PriorityTask> {
private int priority; //优先级
private String name; //名称

public PriorityTask(int priority, String name) {
this.priority = priority;
this.name = name;
}

public int getPriority() {
return priority;
}

public void setPriority(int priority) {
this.priority = priority;
}

@Override
public int compareTo(PriorityTask o) {
return this.getPriority() < o.getPriority() ? 1 : this.getPriority() > o.getPriority() ? -1 : 0;
}

@Override
public void run() {
try {
System.out.printf("name=%s,priority=%s\n",name,priority);
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}


某一次运行结果

name=Task1,priority=1
name=Task0,priority=0
name=Task6,priority=6
name=Task7,priority=7
name=Task5,priority=5
name=Task4,priority=4
name=Task3,priority=3
name=Task2,priority=2


结果说明

由于PriorityBlockingQueue.poll() 会获取并移除队列中已有的元素的优先级最大的哪一个.所以在这里多添加了几个元素.能让队列有足够的时间存储更多的任务,然后在执行任务的时候,就能看到优先级高的被优先执行了

7.4.实现ThreadFactory接口生成定制线程

  工厂模式(Factory Pattern):在面向对象编程中是一个应用广泛的设计模式,它是一种创建模式(Creational Pattern),目标是创建一个类,并通过这个类创建一个或多个类的对象.当创建一个类的对象时,使用工厂类而不是new操作符.

工厂模式的好处:

对象创建集中化

改变对象的创建方式变得很容易

可以针对限定资源限制创建对象的数量

  例如:通过工厂模式生成了一个类型的N个对象,就很容易获得创建这些对象的统计数据.

  在Java中,Executor、Fork/Join 都使用了线程工厂来创建线程,Executors提供了大量的方法来创建不同类型的Executor对象.

本节将学习,继承Thread类,实现一些新的功能,并实现一个线程工厂来生成这个新的线程类对象.

总结

implements ThreadFactory 实现唯一个newThread方法,创建自定义线程工厂

Executors.defaultThreadFactory() 能获取一个最基本的线程工厂.它生成的线程对象都属于同一个线程租对象.

示例

场景描述:本示例实现的是: 计算任务的运行时间,使用定制线程工厂来创建统一的线程名称.

/**
* Created by zhuqiang on 2015/10/3 0003.
*/
public class Client {
public static void main(String[] args) throws InterruptedException {
MyThreadFactory mtf = new MyThreadFactory("定制线程");
Thread t1 = mtf.newThread(new MyTask());
Thread t2 = mtf.newThread(new MyTask());

t1.start();
t2.start();
t1.join();
t2.join();
System.out.println(t1);
System.out.println(t2);
//还可以使用 线程池来使用我们定制的类
ExecutorService es = Executors.newCachedThreadPool(mtf);
MyTask task = new MyTask();
es.execute(task);
es.shutdown();
es.awaitTermination(1, TimeUnit.DAYS);
}
}

// 定制线程类,实现计算时间,和线程信息
class MyThread extends Thread{
private Date createDate; //线程创建时间
private Date startDate; //任务开始时间
private Date endDate; //任务结束时间

public MyThread(Runnable target,String name){
super(target,name);
createDate = new Date();
}
@Override
public void run() {
startDate = new Date();
super.run();
endDate = new Date();
}

@Override
public String toString() {
StringBuffer sb = new StringBuffer();
sb.append("createDate:").append(createDate).append(",")
.append("startDate:").append(startDate).append(",")
.append("endDate:").append(endDate).append(";")
.append("任务耗时:").append(endDate.getTime()-startDate.getTime()).append("毫秒");
return sb.toString();
}
}

//定制线程工厂类
// 实现 创建我们自定义的 线程类,和给该线程赋值自定义的线程名
class MyThreadFactory implements ThreadFactory{
private String prefix;
private int count;
public MyThreadFactory(String prefix) {
this.prefix = prefix;
}

@Override
public Thread newThread(Runnable r) {
MyThread myThread = new MyThread(r, prefix + "-" + count);
count++;
return myThread;
}
}

class MyTask implements Runnable{

@Override
public void run() {
long time = (long)(Math.random() * 5);
try {
System.out.printf("%s\n",Thread.currentThread().getName());
TimeUnit.SECONDS.sleep(time);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}


某一次运行结果

定制线程-1
定制线程-0
createDate:Sat Oct 03 15:33:25 CST 2015,startDate:Sat Oct 03 15:33:25 CST 2015,endDate:Sat Oct 03 15:33:26 CST 2015;任务耗时:1019毫秒
createDate:Sat Oct 03 15:33:25 CST 2015,startDate:Sat Oct 03 15:33:25 CST 2015,endDate:Sat Oct 03 15:33:27 CST 2015;任务耗时:2020毫秒
定制线程-2


结果说明

1. 使用简单粗暴的方式,创建线程并运行了任务.能看到线程的信息

2. 使用线程池没能获取到 线程对象.所以就没有打印出线程任务的运行信息

7.5.在Executor对象中使用ThreadFactory

没必要再写了,在实现ThreadFactory接口生成定制线程中已经演示了.

7.6.定制运行在定时线程池中的任务

定时线程池(Scheduled Thread Pool):是Executor框架基本线程池的扩展,允许一段时间后定时执行任务,ScheduledThreadPoolExecutor类不仅实现了这个功能,还运行执行以下两类任务:

延迟任务(Delayed Task):这类任务在一段时间后仅执行一次

周期性任务(Periodic Task):这类任务在一段延迟时间后周期性地执行.

定制线程须了解的知识

延迟任务:能够执行实现Callable和Runnable接口的两类对象,

周期任务:只能执行实现Runnable接口的对象.

所有定时线程池执行的任务(该任务 和 我通过执行器提交的任务 不是一个意思)都必须实现RunnableScheduledFuture接口

通过以上3点,我们要实现定制定时线程池中的任务的思路就是:

自定义线程池执行器Task,并实现 RunnableScheduledFuture接口(参考 默认的 ScheduledFutureTask 实现类,是一个私有的内部类)

想办法 让线程池执行器使用我们自己的 自定义线程池执行器Task

这一小节感觉太难了,光是根据api确实是没有办法写出来的.太佩服作者了,只有去阅读跟踪源代码,才能知道关键点,才能对症下药的偷梁换柱.

总结

通过以下的演示,延迟和周期的执行方式大概的了解点:

延迟执行,是一个基础

周期执行是在 延迟执行上,通过刷新间隔时间,再次把执行器task添加到执行器中,来达到周期执行的效果

延迟执行和周期执行的实现其实也是一个Runnable,一个执行器task相当于是一个commond命令,用来包装我们提交的业务task.并内部管理执行器task来管理执行器的task生命周期.

示例

场景描述:以下示例讲述了:怎么让执行器使用我们定制的执行器task来运行我们提交的任务.

/**
* Created by zhuqiang on 2015/10/3 0003.
*/
public class Client {
public static void main(String[] args) throws InterruptedException {
MyScheduledThreadPoolExecutor pool = new MyScheduledThreadPoolExecutor(2);
pool.schedule(new Task(),1,TimeUnit.SECONDS);  //一秒后执行一个延迟任务
TimeUnit.SECONDS.sleep(3);
pool.scheduleAtFixedRate(new Task(), 1, 3, TimeUnit.SECONDS); // 一秒后执行延迟任务,并且每隔3秒周期执行
TimeUnit.SECONDS.sleep(10);
pool.shutdown();

}
}

// 1 : 实现定制的 执行器task类,外部是不会接触到该类的
// 供 周期任务执行器 使用的 任务类(要注意的是,这个任务并不是 我们 要写的任务逻辑中执行的任务.是线程池需要的任务)
// 在周期执行器里面的任务都要实现 RunnableScheduledFuture 接口(RunnableScheduledFuture 继承了 RunnableFuture)
// 而 FutureTask 实现 了 RunnableFuture,因此 我们继承 FutureTask类,而不需要去实现他的一些方法. 参考 默认的 ScheduledFutureTask 实现类,是一个私有的内部类
class MyScheduledTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V>{

private RunnableScheduledFuture<V> task; // 执行器的原生task任务对象. 我们写该类就是为了包装它,里面包含了 客户端提交的 runnable逻辑任务对象
private ScheduledThreadPoolExecutor pool; //线程池,用来考虑 是否被关闭了,如果被关闭.那么周期任务就不应该再加入到池里面
private long startDate;  //到期执行时间
private long period; //周期间隔时间
public MyScheduledTask(Runnable runnable, V result,RunnableScheduledFuture<V> task,ScheduledThreadPoolExecutor pool) {
super(runnable, result);
this.task = task;
this.pool = pool;
}

//是否是周期性任务
@Override
public boolean isPeriodic() {
return task.isPeriodic();
}

@Override
public long getDelay(TimeUnit unit) {
if(isPeriodic()){ //如果是周期任务
if(startDate == 0){
return task.getDelay(unit);
}else {
Date now = new Date();
long delay = startDate - now.getTime();
return unit.convert(delay,TimeUnit.MILLISECONDS);
}

}else {
return task.getDelay(unit);
}
}

@Override
public int compareTo(Delayed o) {
return task.compareTo(o);
}

@Override
public void run() {
if(isPeriodic() && (!pool.isShutdown())){ //如果是周期性任务,并且执行器没有被关闭.则更新下一次执行任务的时间,再增加到线程池中
Date now = new Date();
startDate = now.getTime() + period;
pool.getQueue().add(this);
}
System.out.printf("Pre-MyScheduledTask:%s\n",new Date());
System.out.printf("MyScheduledTask:是否是周期性任务:%s\n",this.isPeriodic());
super.runAndReset();
System.out.printf("Post-MyScheduledTask:%s\n",new Date());
}

public void setPeriod(long period) {
this.period = period;
}
}

//2:定制 周期执行器,偷梁换柱,让执行器使用我们定制的执行器task类
class MyScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor{

public MyScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize);
}

//修改或替换用于执行 runnable 的任务。此方法可重写用于管理内部任务的具体类。默认实现返回给定任务。
@Override
protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable, RunnableScheduledFuture<V> task) {
MyScheduledTask<V> result = new MyScheduledTask<V>(runnable, null, task, this);
return  result;
}

@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
// 父类会把 我们传入的 任务属性,处理成 执行器的 task ,再调用decorateTask方法,获取 执行器的task(而默认的decorateTask方法就是返回原生task)
// decorateTask 返回的task里面已经包含了 我们提交的runnable任务类,所以我们要 把这个执行器的task包装成我们自己的 MyScheduledTask 执行器任务类
ScheduledFuture<?> task = super.scheduleAtFixedRate(command, initialDelay, period, unit);
MyScheduledTask myTask = (MyScheduledTask) task;
// 通过 decorateTask  后(返回的其实是我们的 task任务了)强制转换成 我们定制的 task后,时间隔 需要转换成,task中实现的单位时间
myTask.setPeriod(TimeUnit.MILLISECONDS.convert(period,unit));
return myTask;
}

}

//任务类
class Task implements Runnable{

@Override
public void run() {
System.out.println("任务开始");
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务结束");
}
}


某一次运行结果

Pre-MyScheduledTask:Sat Oct 03 17:30:46 CST 2015
MyScheduledTask:是否是周期性任务:false
任务开始
任务结束
Post-MyScheduledTask:Sat Oct 03 17:30:48 CST 2015
Pre-MyScheduledTask:Sat Oct 03 17:30:49 CST 2015
MyScheduledTask:是否是周期性任务:true
任务开始
任务结束
Post-MyScheduledTask:Sat Oct 03 17:30:51 CST 2015
Pre-MyScheduledTask:Sat Oct 03 17:30:52 CST 2015
MyScheduledTask:是否是周期性任务:true
任务开始
任务结束
Post-MyScheduledTask:Sat Oct 03 17:30:54 CST 2015
Pre-MyScheduledTask:Sat Oct 03 17:30:55 CST 2015
MyScheduledTask:是否是周期性任务:true
任务开始
任务结束
Post-MyScheduledTask:Sat Oct 03 17:30:57 CST 2015


7.7.通过实现ThreadFactory接口为Fork/Join框架生成定制线程

  Java7中最有趣的特性之一就是Fork/Join框架,它是Executor和ExecutorService接口的实现,允许我们执行Callable 和 Runnable任务.而不需要去关注执行这些任务的具体线程.

这个执行器用于执行任务可以拆分成更小任务体的任务,它的主要组件如下:

一种特殊类型任务,由ForkJoinTask类来实现.

两种操作,fork将每一个任务拆分为多个子任务,join操作等待这些子任务结束.

工作窃取算法(work-Stealing Algorithm):用来对线程池的使用进行优化,当一个任务等待它的子任务时,执行这个任务的线程可以被用来执行其他任务.

Fork/Join框架的主要类是ForkJoinPool类,从内部实现来说,它有下面两个元素:

一个任务队列,存放的是等待被执行的任务

一个执行这些任务的线程池.

本小节将学习:如何实现一个定制的工作线程,他被ForkJoinPool类使用.此外还将学习如何通过工厂模式来使用它.

总结

实现定制的工作线程需要两个步骤:

extends ForkJoinWorkerThread 编写自定义的工作线程

使用自定义的工作线程,需要自定义的工作线程工厂来创建implements ForkJoinPool.ForkJoinWorkerThreadFactory

示例

场景描述:

/**
* Created by zhuqiang on 2015/10/3 0003.
*/
public class Client {
public static void main(String[] args) throws InterruptedException, ExecutionException {
// 并行工作线程数量,线程工厂类,处理线程内部出现错误的时候的处理类,默认null,最后一个参数:我不知道.默认值为false
ForkJoinPool pool = new ForkJoinPool(4, new MyWorkerThreadFactory(), null, false);

int[] arr = new int[100];
for (int i = 0; i < arr.length; i++) {
arr[i] = 1;
}
MyRecursiveTask task = new MyRecursiveTask(arr, 0, arr.length);
pool.execute(task);

pool.shutdown();
pool.awaitTermination(1, TimeUnit.DAYS);
System.out.println("结果:" + task.get());
}
}

/**
* api说:该类是用来在ForkJoinPool中管理的,如果需要使用此类的实现,还需要提供一个自定义ForkJoinPool.ForkJoinWorkerThreadFactory ForkJoinPool使用它。
*/
class MyWorkerThread extends ForkJoinWorkerThread{

private static ThreadLocal<Integer> taskCounter = new ThreadLocal<>(); //计数器
protected MyWorkerThread(ForkJoinPool pool) {
super(pool);
}

//工作线程开始初始化
@Override
protected void onStart() {
super.onStart();
System.out.printf("MyWorkerThread初始化id=:%s\n", getId());
taskCounter.set(0);
}

//工作线程完成,结束: 每当一个工作线程正常结束 或则 异常 则会调用该方法,exception有值则标识抛出了异常,没有值,则表示正常结束
@Override
protected void onTermination(Throwable exception) {
super.onTermination(exception);
System.out.printf("MyWorkerThread结束id=:%s,计数器:%s\n", getId(), taskCounter.get());
}
// 增加任务,就把计数器加1
public void addTask(){
int i = taskCounter.get().intValue();
i++;
taskCounter.set(i);
}

public int getTaskCount(){
return taskCounter.get();
}
}

// 工作线程工厂
class MyWorkerThreadFactory implements ForkJoinPool.ForkJoinWorkerThreadFactory{

@Override
public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
return new MyWorkerThread(pool);
}
}

class MyRecursiveTask extends RecursiveTask<Integer>{
private int[] arr; // 资源
private int start,end; //起始索引

public MyRecursiveTask(int[] arr, int start, int end) {
this.arr = arr;
this.start = start;
this.end = end;
}

@Override
protected Integer compute() {
Integer result = 0;
MyWorkerThread myWorkerThread = (MyWorkerThread) Thread.currentThread();

if(end -start < 20){
result = hander();
myWorkerThread.addTask();
System.out.printf("id=%s,start=%s,end=%s\n",myWorkerThread.getId(),start,end);
}else{ //拆分为更多子任务
int mid =  (start + end) / 2;
MyRecursiveTask mt1 = new MyRecursiveTask(arr, start, mid);
MyRecursiveTask mt2 = new MyRecursiveTask(arr, mid, end);
invokeAll(mt1,mt2);
try {
result = mt1.get() + mt2.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}

return result;
}

private Integer hander(){
Integer result = 0;
for (int i = start; i < end; i++) {
result+=arr[i];
}
try {
TimeUnit.MILLISECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
return result;
}
}


某一次运行结果

MyWorkerThread初始化id=:14
MyWorkerThread初始化id=:15
MyWorkerThread初始化id=:16
MyWorkerThread初始化id=:17
id=16,start=25,end=37
id=15,start=50,end=62
id=14,start=0,end=12
id=17,start=75,end=87
Disconnected from the target VM, address: '127.0.0.1:7283', transport: 'socket'
id=16,start=37,end=50
id=14,start=12,end=25
id=15,start=62,end=75
id=17,start=87,end=100
MyWorkerThread结束id=:15,计数器:2
MyWorkerThread结束id=:17,计数器:2
MyWorkerThread结束id=:16,计数器:2
MyWorkerThread结束id=:14,计数器:2
结果:100


结果说明

1. 结果中出现的ip信息,不用理会,是idea的运行信息.

2. 从结果可以看到,有4个工作线程,和我构造传入的4相符合

3. 每个线程的计数器,是用ThreadLocal来保持的,他们相加 等于 8.也就是说任务被拆分成了8个任务来执行,从打印出来的 每个任务负责的索引区域.也能看出来是8个任务

7.8.定制运行在Fork/Join框架中的任务

  Executor框架分离了任务的创建和执行,在这个框架下只要实现Runnable和Executor对象,并将Runnable对象发送给执行器即可,这样,执行器创建执行这些任务线程,并对其进行管理直到线程终止.

  Java7中Fork/Join框架提供一种特殊形式的执行器,把一个任务拆分成若干个子任务,并还提供了工作窃取算法来提高性能.

Fork/join框架的主类是ForkJoinPool类.从内部来讲,它有下面两个元素:

一个任务队列,存放的是等待被执行的任务

一个执行这些任务的线程池

默认情况下,ForkJoinPool类执行的任务是ForkJoinTask类的对象,我们也可以传递Runnable和Callable对象进去,但是他们不会李勇Fork/Join框架的优势,一般将ForkJoinTask的两种子类传递进去:

RecursiveAction : 用于任务不返回结果的情况

RecursiveTask : 用于任务返回结果的情况

  在本节我们将学习,继承ForkJoinTask类来实现我们定制的任务.我们可以在来统计任务的运行时间,输出任务日志,获取任务中使用到的资源等等操作.

实现自定义工作任务的方法就是继承 ForkJoinTask,从而 需要按规定实现以下几个方法:

getRawResult : 用来获取任务的结果,当任务不返回任何结果时,方法必须返回null

setRawResult : 用来设置任务的结果,当任务不返回任何结果时,方法为空

exec : 实现任务的逻辑.像RecursiveTask和RecursiveAction都自定义了抽象方法compute供子类实现,那么我们也可以仿照,在这个方法里面委托compute方法.从而实现了一层代理.就能在这些方法中 实现我们自定义的一些功能.

示例

场景描述:只是演示了,使用怎么自定义自己的workTask,使用自定义的workTask来实现之前实现过的示例,把一个数组中的数值全都+1,这里实现的是 不返回结果的 task.如果要返回结果的.同样的可以参考RecursiveTask 类

/**
* Created by zhuqiang on 2015/10/7 0007.
*/
public class Client {
public static void main(String[] args) {
int[] arr = new int[100];
ForkJoinPool poll = new ForkJoinPool();
Task t = new Task("我的任务", arr, 0, arr.length);
poll.execute(t);

t.join();

// 检测任务中 不为0的元素
for (int i = 0; i < arr.length; i++) {
if(arr[i] == 0 ){
System.out.printf("任务处理错误的元素:%s,索引:%s\n",arr[i],i);
}
}

poll.shutdown();
System.out.println("Main:结束");
}
}

/**
* 实现 不返回任务结果的自定义 任务类,参考 ForkJoinTask 类的实现
*/
abstract class MyWorkTask extends ForkJoinTask<Void>{
private String name;  //任务名称

public MyWorkTask(String name) {
this.name = name;
}

// 当任务不返回任何结果的时候,必须返回null
@Override
public Void getRawResult() {
return null;
}

//当任务不返回任何结果时,方法体为空
@Override
protected void setRawResult(Void value) {

}

// 主方法类,
@Override
protected boolean exec() {
Date startTime = new Date();
compute();  //可以看到 我们委托了自己的 compute方法,这里相当于代理了一层.就能在前后处理一些事情
Date endTime = new Date();
System.out.printf("MyWorkTask:%s,任务耗时%s毫秒.\n",name,endTime.getTime()-startTime.getTime());
return true;
}

/** 任务逻辑方法*/
protected abstract void compute();

public String getName() {
return name;
}
}

// 声明一个任务类,继承我们自定义的work任务
class Task extends MyWorkTask{
private int[] arr;
private int start;
private int end;

public Task(String name, int[] arr, int start, int end) {
super(name);
this.arr = arr;
this.start = start;
this.end = end;
}

// 只是把这个数组里面的每个数字 都自增了1
@Override
protected void compute() {
if(end - start > 40){
int mid = (end + start)/2;
Task t1 = new Task(this.getName() + "1", arr, start, mid);
Task t2 = new Task(this.getName() + "2", arr, mid, end);

invokeAll(t1,t2);
}else {
for (int i = start; i < end; i++) {
arr[i]++;
}
System.out.printf("name=%s,start=%s,end=%s\n",this.getName(),start,end);
}
}
}


某一次运行结果

name=我的任务11,start=0,end=25
name=我的任务22,start=75,end=100
name=我的任务21,start=50,end=75
name=我的任务12,start=25,end=50
MyWorkTask:我的任务22,任务耗时18毫秒.
MyWorkTask:我的任务21,任务耗时18毫秒.
MyWorkTask:我的任务11,任务耗时19毫秒.
MyWorkTask:我的任务2,任务耗时19毫秒.
MyWorkTask:我的任务12,任务耗时18毫秒.
MyWorkTask:我的任务1,任务耗时20毫秒.
MyWorkTask:我的任务,任务耗时20毫秒.
Main:结束


结果说明

之前其实一直不是很直观的了解他的任务是怎么运行的一个机制,现在可以看出来,递归创建的任务.都是底层的任务在运行,可以看到 实际运算的任务是 最底层的2位数的任务名称, 您可以尝试把任务分得更小,就能看明白了.

这里也能想到 工作窃取算法的一点原理了. 既然有这么多的上层任务都在等待子任务的执行,那么就完全有可能利用这些工作线程去执行实际的任务.然而就能避免创建更多的线程.从而提高了性能.

7.9.实现定制Lock类

  锁是Java并发API提供的最基本的同步机制之一,它用来保护代码的临界区(Critical Section),所以同一时间只有一个线程能执行临界区代码,它提供了以下两种操作:

lock: 当要访问临界区代码时调用这个操作.如果另一个线程正在运行临界区代码,其他线程被阻塞直到被访问临界区的锁唤醒.

unlock:在临界区代码结尾调用这个操作,以运行其他线程来访问这部分临界区代码.

例如ReentrantLock就是一个可重入的互斥锁 Lock,它具有与使用 synchronized 方法和语句所访问的隐式监视器锁相同的一些基本行为和语义,但功能更强大。在本节将学习如何自定义Lock对象.

AbstractQueuedSynchronizer:用来实现带有锁或信号特性的同步机制,提供操作来对临界区代码的访问进行控制,并对等待访问临界区代码的阻塞线程队列进行管理.

示例

场景描述: 这个示例本人说不出什么东西,因为 在MyLock中,实现的所有的方法,为什么要调用aqs中的对应的方法,完全不知道是为什么.另外AbstractQueuedLongSynchronizer也是一个同步机制,只是使用了long属性来报错线程的状态.

/**
* Created by zhuqiang on 2015/10/7 0007.
*/
public class Client {
public static void main(String[] args) throws InterruptedException {
MyLock lock = new MyLock();
for (int i = 0; i < 5; i++) {
new Thread(new Task(lock,"Task"+i)).start();
}

//尝试获取锁,如果获取不到就等待1秒后继续获取
boolean flag = false;
do{
flag = lock.tryLock();
if(!flag){
System.out.println("未获取到锁");
TimeUnit.SECONDS.sleep(1);
}
}while (!flag);

System.out.println("Main 结束");
lock.unlock();
}
}

class MyAbstractQueuedSynchronizer extends AbstractQueuedSynchronizer{
private AtomicInteger state = new AtomicInteger(0);

// 当访问临界区的代码时,调用这个方法.如果访问成功,返回true
@Override
protected boolean tryAcquire(int arg) {
//        getState();  //也可以使用 该方法来 获取锁的状态
return state.compareAndSet(0,1);  //改变状态,利用原子变量来 操作,当 锁未被获取的时候,才返回true
}

// 释放对临界区代码的访问,调用这个方法.如果释放成功,返回true
@Override
protected boolean tryRelease(int arg) {
//        setState(0); 设置 锁的状态
return state.compareAndSet(1,0); //改变状态
}
}


某一次运行结果

Thread-0,name=Task0
未获取到锁
未获取到锁
Thread-0,name=Task0,休眠了两秒
Thread-1,name=Task1
未获取到锁
未获取到锁
Thread-1,name=Task1,休眠了两秒
Thread-2,name=Task2
未获取到锁
未获取到锁
Thread-2,name=Task2,休眠了两秒
未获取到锁
Thread-3,name=Task3
未获取到锁
Thread-3,name=Task3,休眠了两秒
Thread-4,name=Task4
未获取到锁
未获取到锁
未获取到锁
Thread-4,name=Task4,休眠了两秒
Main 结束


7.10.实现基于优先级的传输队列

在Java7中提供了几种用于并发应用程序的数据结构.我们要重点关注以下几种:

LinkedTransferQueue: 一个基于链表的无界队列,采用先进先出原则. 适用于拥有生产者-消费者结构的场景中. 提供并发先关的一些操作,例如:当队列为空时,可以阻塞直到有可用的元素为止.

PriorityBlockingQueue:一个无界的阻塞队列,元素按顺序存储(不可有null),这些元素必须实现Comparable接口,并实现接口中的compareTo()方法

本节将学习如何实现一个数据接口,用来解决数据结构中的元素是按优先级排序的生产者/消费者问题,优先级高的先被处理.

示例

打算放弃学习这一小节了,感觉太难了.照葫芦画瓢也不明白为什么要这样做,主要是这个多线程之间的机制搞不明白.很多东西就没法理解.

7.11.实现自己的原子对象

 从Java5开始就引入了原子变量(Atomic Variable),它tugi不过对于对单个变量的原子操作.当线程使用原子变量执行操作时,类的实现包括了一个机制来检查是否在但不内结束.简单来讲,就是操作获取变量值,然后通过本地变量来改变值,接着尝试改旧值为新值,如果旧值未变,则执行改变,否则,方法重新执行.

本小节将学习如何继承一个原子对象和如何实现两个遵守原子对象机制保证所有操作在但不内结束的方法. 

示例

场景描述: 使用原子变量 来模拟一个停车场,把车位的占用和空出作为两个原子操作.

carIn: 与停车场现有汽车数量与最大停车场数想比较,如果相等,则车位满了.返回flase,否则,使用下面的原子操作:

将原子对象的值赋值给一个本地变量

将本地变量值增加1 作为新值,并把这个值赋给另一个不同的变量.

使用cas方法尝试使用新值替换旧值,如果返回true,作为参数的旧值就是当前计数器内的值,所以计数器值将发生改变 ,如果返回false,则作为参数的旧值已不是当前内部计数器的值(另一个线程已修改过它),所以这个操作就不是以原子操作方式执行的.操作将重新开始.

原来如此,以前一直不知道源码中 类似这种for循环的方式组合cas函数的操作是什么意思.现在终于明白了.就是一直尝试把这个操作作为原子操作.

/**
* Created by zhuqiang on 2015/10/7 0007.
*/
public class Clinet {
public static void main(String[] args) throws InterruptedException {
ParkingCounter pc = new ParkingCounter(2);
Thread t1 = new Thread(new Task1(pc));
Thread t2 = new Thread(new Task2(pc));

t1.start();
t2.start();

t1.join();
t2.join();

System.out.println("Main:" + pc.get());
}
}

class ParkingCounter extends AtomicInteger{
private int maxNumber; //存储停车场中最大可停放汽车的数量

public ParkingCounter(int maxNumber) {
//        set(0);// 设置初始值为0
this.maxNumber = maxNumber;
}

/**
* 车位被占
* 如果停车场未满,则改变计数器的值: 使用cas方式来改变,如果修改不成功则一直尝试.
* */
public boolean carIn(){
for(;;){
int value = get();
if(value == maxNumber){
System.out.printf("%s,停车场已满,%s/%s\n",Thread.currentThread().getName(),value,maxNumber);
return false;
}else{
int newValue = value + 1;
boolean changed = compareAndSet(value, newValue);
if(changed){
System.out.printf("%s,in,%s/%s\n",Thread.currentThread().getName(),newValue,maxNumber);
return true;
}
}
}
}

/**
* 车辆开出
* @return
*/
public boolean carOut(){
for(;;){
int value = get();
if(value == 0){
System.out.printf("%s,停车场中暂无车辆,%s/%s\n",Thread.currentThread().getName(),value,maxNumber);
return false;
}else{
int newValue = value - 1;
boolean changed = compareAndSet(value, newValue);
if(changed){
System.out.printf("%s,out,%s/%s\n",Thread.currentThread().getName(),newValue,maxNumber);
return true;
}
}
}
}
}

class Task1 implements  Runnable{
private ParkingCounter pc;

public Task1(ParkingCounter pc) {
this.pc = pc;
}

@Override
public void run() {
pc.carIn();
pc.carOut();
pc.carIn();
pc.carIn();
pc.carOut();
pc.carIn();
pc.carOut();
}
}

class Task2 implements  Runnable{
private ParkingCounter pc;

public Task2(ParkingCounter pc) {
this.pc = pc;
}

@Override
public void run() {
pc.carOut();
pc.carOut();
pc.carIn();
pc.carIn();
pc.carOut();
}
}


某一次运行结果

Thread-0,in,1/2
Thread-1,停车场中暂无车辆,0/2
Thread-1,停车场中暂无车辆,0/2
Thread-0,out,0/2
Thread-1,in,1/2
Thread-0,in,2/2
Thread-0,停车场已满,2/2
Thread-0,out,1/2
Thread-1,停车场已满,2/2
Thread-0,in,2/2
Thread-0,out,0/2
Thread-1,out,1/2
Main:0
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: