您的位置:首页 > 编程语言 > Java开发

第21章 并发 ——《Thinking in Java》学习笔记

2014-08-12 19:40 375 查看
说道并发,就有种异常高大上的感觉,好像比那些顺序编程有种不可明辨的优越感一样,是的,并发的确更深奥,而且现在全世界都离不开并发程序。

但是什么是并发?并发能做什么?并发如何实现?

---------------------------------------------------------------------------------------------------------------------

并发执行最明显的特征就是使程序的执行速度获得极大的提高,或者是更易于解决问题(博弈,仿真,DP?)

当并行执行的任务彼此开始互相干涉时,实际的并发问题就会接踵而至,并发“具有可论证(能蒙出来的,按照逻辑就应该是这样的)的确定性,但是实际上具有不可确定性(例如突然死锁)。”了解并发能够让我意识到明显正确的程序可能会展示出不正确的行为。

一、并发的多面性

用并发解决的问题大体上分为【速度】 【设计可管理性】两种

1、更快地执行,主要是应对在【单处理器】上程序的性能?(雾?)

2、改进代码设计,例如说仿真和协作多线程。

二、基本的线程机制

并发编程可以使程序划分为多个分离的,独立运行的任务,这些独立任务每一个都将有执行线程来驱动,一个线程就是在线程中的一个单一的顺序控制流。

因此,单个线程(THREAD)可以拥有多个并发执行的任务(TASK),但是你的程序使得每个任务都好像有其自己的CPU一样,底层是切分CPU时钟周期。

所以线程机制是一种建立在透明的,
4000
可拓展程序的方法,如果程序运行的太慢,为机器增添一个CPU就很容易加快程序的运行速度。

1、定义任务

下面是一个典型的实现Runnable的任务。

/**
* @describe 定义多线程任务
* @author Hope6537
* @signdate 2014年7月25日下午6:28:29
*/
public class LiftOff implements Runnable {

protected int countDown = 10;
private static int taskCount = 0;
private final int id = taskCount++;

public LiftOff() {

}

public LiftOff(int countDown) {
super();
this.countDown = countDown;
}

public String status() {
return "#" + id + "(" + (countDown > 0 ? countDown : "LiftOff!") + ")";
}

public void run() {
while (countDown-- > 0) {
System.out.println(status());
Thread.yield();//线程调度器 表示我已经执行完最主要的部分了,可以将控制权移交给别的线程
}
};

}


而经典的驱动任务启动的方法有两种,一种是直接获取Runnable的派生对象,执行Run方法(在Main里),而另一种方法如下,定义Thread对象来进行驱动

public class BasicThreads {
public static void main(String[] args) {
Thread t = new Thread(new LiftOff());
t.start();
System.out.println("Waiting for LiftOff");
}
}
/* Output:
Waiting for LiftOff
#0(9), #0(8), #0(7), #0(6), #0(5), #0(4), #0(3), #0(2), #0(1), #0(Liftoff!),
*/
在输出中我们看到Waiting for LiftOff 先出现在控制台上,start()迅速的返回了,从而使程序流程接着运行下去

此时main()和LiftOff.run()方法是同时执行的,所以很容易看出来他们之间的联系

public class MoreBasicThreads {
public static void main(String[] args) {
for(int i = 0; i < 5; i++)
new Thread(new LiftOff()).start();
System.out.println("Waiting for LiftOff");
}
} /* Output: (Sample)
Waiting for LiftOff
#0(9), #1(9), #2(9), #3(9), #4(9), #0(8), #1(8), #2(8), #3(8), #4(8), #0(7), #1(7), #2(7), #3(7), #4(7), #0(6),
#1(6), #2(6), #3(6), #4(6), #0(5), #1(5), #2(5), #3(5),
#4(5), #0(4), #1(4), #2(4), #3(4), #4(4), #0(3), #1(3), #2(3), #3(3), #4(3), #0(2),
#1(2), #2(2), #3(2), #4(2), #0(1), #1(1), #2(1), #3(1), #4(1),
#0(Liftoff!), #1(Liftoff!), #2(Liftoff!), #3(Liftoff!), #4(Liftoff!),
*///:~


输出说明不同的任务的执行在线程换进换出的时候混到了一起,这是源于线程调度器自动控制的。他会默默的分发线程。

当主函数创建了Thread对象时,他并没有捕获任何对这些对象(Runnable)的引用,每个Thread都注册了自己,而且在他的任务run方法完成之前,垃圾回收器将无法回收他,因此一个线程会创建单独的一个执行线程,对start的调用完成后,它依旧存在。

2、使用 Executor(执行器)

我一直觉得,这东西吧,就像个黑科技一样,这玩意学名叫执行器(为啥我喜欢管它叫线程池),为我们管理Thread对象,从而简化了(?艹)并发编程。Executor在客户端和任务执行之间提供了一个间接层,它将执行任务。下面是它的具体用法。

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* @describe Executor执行器的使用 首选
* @author Hope6537
* @signdate 2014年7月25日下午6:54:36
*/
public class CachedThreadPool {

public static void main(String[] args) {
// 定义一个执行器
ExecutorService exec = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++) {
exec.execute(new LiftOff());
}
// 对shutdown的调用是用于防止新的任务提交给他
exec.shutdown();
}
}

// 但是要注意在线程池中 现有线程在有可能的情况下,都会被自动复用

/**
* @describe 有限个数线程池 上面的出现问题采用它
* @author Hope6537
* @signdate 2014年7月25日下午7:17:03
*/
class FixedThreadPool {
// 使用了有限的线程集来提交任务
public static void main(String[] args) {
// 定义个数
ExecutorService exec = Executors.newFixedThreadPool(5);
for (int i = 0; i < 5; i++) {
// 通过池中获取固定线程
exec.execute(new LiftOff());
}
exec.shutdown();
}
}

/**
* @describe 单一线程池 用于执行连续运行的事务
* @author Hope6537
* @signdate 2014年7月25日下午7:17:53
*/
class SingleThreadExecutor {
public static void main(String[] args) {
ExecutorService exec = Executors.newSingleThreadExecutor();
for (int i = 0; i < 5; i++) {
// 通过池中获取固定线程 它内部有一个悬挂的任务队列
exec.execute(new LiftOff());
}
exec.shutdown();
}
}
shutDown的调用并不是说立刻停止当前的所有任务,而是堵住入口,不让新的任务进来,当现有任务完成之后,exec就会关闭。

我们还有一种实现,可以限制执行器内的任务数量FixedThreadPool 。上面都写了。

而SingleThreadExecutor 是容量仅为1的执行器,对于长期存活的任务来说是很有用的。如果有多个任务同时在SingleThreadExecutor中,他们将会排队等待执行。

3、从任务中产生返回值

Runnable是个独立执行的玩意。而且你看看啊 public void run(){} 什么值也不返回。但是有种变种就是Callable,读起来像是回调函数。它的类型参数表示的从call中返回的值。并且……额……限制挺多的。使用方法如下

package org.hope6537.thinking_in_java.twenty_one;

import java.util.ArrayList;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
* @describe 使用Callable作为线程载体
* @author Hope6537(赵鹏)
* @signdate 2014年7月25日下午7:32:26
*/
public class CallableDemo {

public static void main(String[] args) {
// 声明一个连接池
ExecutorService exec = Executors.newCachedThreadPool();
// Future用Callable返回结果的特定类型进行了参数化
// 可以使用isDone方法查询Future是否已经完成 当任务完成后 会产生一个可以用get()接收到的一个结果
ArrayList<Future<String>> results = new ArrayList<Future<String>>();
for (int i = 0; i < 10; i++) {
results.add(exec.submit(new TaskResult(i)));
}
for (Future<String> fs : results) {
try {
System.out.println(fs.get());
} catch (Exception e) {
System.out.println(e);
return;
} finally {
exec.shutdown();
}
}
}

}

class TaskResult implements Callable<String> {

private int id;

public TaskResult(int id) {
super();
this.id = id;
}

// 从任务中产生返回值
@Override
public String call() throws Exception {
return "result of TaskWithResult" + id;
}

}

/*Output:
* result of TaskWithResult0
result of TaskWithResult1
result of TaskWithResult2
result of TaskWithResult3
result of TaskWithResult4
result of TaskWithResult5
result of TaskWithResult6
result of TaskWithResult7
result of TaskWithResult8
result of TaskWithResult9
*/
submit方法会产生Future对象(这又是啥?),他用Callable返回结果的特定类型进行了参数化,可以用isDone方法来查询Future所附带的任务是否完成。完成后可以调用get()来获取结果,如果未完成,那么get()将阻塞,直到结果就绪。

4、休眠

要是在以前,我这急性子就立马Thread.sleep(123);了,但是Bruce给出了一种看起来更高大上的方法

package org.hope6537.thinking_in_java.twenty_one;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SleepingTask extends LiftOff {

@Override
public void run() {
try {
while (countDown-- > 0) {
System.out.println(status());
// 老版休眠
// Thread.sleep(100);
// 正是这段休眠的事件所以使得事件能够有序的进行
TimeUnit.MILLISECONDS.sleep(100);
}
} catch (Exception e) {
// 异常无法跨线程传递 所以需要各个单位自己解决
System.out.println("Crashed!");
}
}

public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++) {
exec.execute(new SleepingTask());
}
exec.shutdown();
}
}
/*
* #0(9) #2(9) #4(9) #3(9) #1(9) #4(8) #1(8) #3(8) #2(8) #0(8) #3(7) #0(7) #4(7)
* #1(7) #2(7) #3(6) #4(6) #2(6) #1(6) #0(6) #2(5) #3(5) #0(5) #4(5) #1(5) #0(4)
* #4(4) #3(4) #1(4) #2(4) #2(3) #4(3) #1(3) #3(3) #0(3) #3(2) #0(2) #2(2) #4(2)
* #1(2) #0(1) #3(1) #1(1) #4(1) #2(1) #4(LiftOff!) #3(LiftOff!) #1(LiftOff!)
* #2(LiftOff!) #0(LiftOff!)
*/
我们可以看到倒计时都是有序进行的,每个任务在读秒后都会阻塞,然后给其他的任务时间,这样就达成了貌似完美的平衡。

5、优先级

线程的优先级将该线程的重要性传递给了调度器,例如说X二代就比我们这些屁民有优先级是吧。各个机构的小本本都记着呢~调度器也是默默记着呢。调度器将会让优先级的任务多执行,而的任务少执行不会造成死锁这样的情况的。

在大多数情况下,操纵线程优先级都是一种错误,现实也是……下面是个例程演示

package org.hope6537.thinking_in_java.twenty_one;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* @describe 線程優先級實驗
* @author Hope6537
* @signdate 2014年7月26日上午9:51:30
*/
public class SimplePriorities implements Runnable {
private static int countDown = 5;
private volatile double d;
private int priority;

public SimplePriorities(int priority) {
super();
this.priority = priority;
}

@Override
public String toString() {
return Thread.currentThread() + ":" + countDown;
}

@Override
public void run() {
// 优先级在线程执行时被加载 不在构造方法
Thread.currentThread().setPriority(priority);
while (true) {
for (int i = 1; i < 100000; i++) {
d += (Math.PI + Math.E) / (double) i;
if (i % 1000 == 0) {
Thread.yield();
}
}
System.out.println(this);
if (--countDown == 0) {
return;
}
}
}

public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++) {
exec.execute(new SimplePriorities(Thread.MIN_PRIORITY));
}
// 最好只使用这两种
exec.execute(new SimplePriorities(Thread.MAX_PRIORITY));
exec.shutdown();
}
}

/*
* Thread[pool-1-thread-4,1,main]:-22134 Thread[pool-1-thread-1,1,main]:-22135
* Thread[pool-1-thread-2,1,main]:-22136 Thread[pool-1-thread-5,1,main]:-22137
* Thread[pool-1-thread-3,1,main]:-22138 Thread[pool-1-thread-4,1,main]:-22139
* Thread[pool-1-thread-1,1,main]:-22140 Thread[pool-1-thread-3,1,main]:-22141
* Thread[pool-1-thread-5,1,main]:-22142 Thread[pool-1-thread-4,1,main]:-22143
* Thread[pool-1-thread-2,1,main]:-22144 Thread[pool-1-thread-1,1,main]:-22145
* Thread[pool-1-thread-3,1,main]:-22146 Thread[pool-1-thread-4,1,main]:-22147
* Thread[pool-1-thread-5,1,main]:-22148 Thread[pool-1-thread-2,1,main]:-22149
* Thread[pool-1-thread-1,1,main]:-22150 Thread[pool-1-thread-3,1,main]:-22151
* Thread[pool-1-thread-2,1,main]:-22152 Thread[pool-1-thread-4,1,main]:-22153
* Thread[pool-1-thread-1,1,main]:-22154 Thread[pool-1-thread-3,1,main]:-22155
* Thread[pool-1-thread-5,1,main]:-22156 Thread[pool-1-thread-2,1,main]:-22157
* Thread[pool-1-thread-1,1,main]:-22158 Thread[pool-1-thread-3,1,main]:-22159
* Thread[pool-1-thread-5,1,main]:-22160 Thread[pool-1-thread-4,1,main]:-22161
* Thread[pool-1-thread-2,1,main]:-22162 Thread[pool-1-thread-5,1,main]:-22163
* Thread[pool-1-thread-4,1,main]:-22164 Thread[pool-1-thread-1,1,main]:-22165
* Thread[pool-1-thread-3,1,main]:-22166 Thread[pool-1-thread-2,1,main]:-22167
*/
我们可以看到,设定优先级的地方是在run方法的开头部分设定的。不要设置在构造器里

6、让步

Thread,yield()告诉线程调度器,我的活完事了!或者说不着急用,那么具有相同优先级的其他线程就可以运行了。

但是在大体上,对于任何重要的控制或在调整应用时,都不能依赖让步,yield经常被误用。

7、后台线程

后台线程(Daemon)并不是是非常重要的线程,当所有的非后台线程结束时,程序也就终止了,同时终止所有的后台线程。或者说,只要有非后台线程存在,程序就不会终止。

package org.hope6537.thinking_in_java.twenty_one;

import java.util.concurrent.TimeUnit;

/**
* @describe 后台线程实验
* @author Hope6537
* @signdate 2014年7月26日上午9:58:15

*/
public class SimpleDaemons implements Runnable {
@Override
public void run() {
try {
while (true) {
TimeUnit.MILLISECONDS.sleep(100);
System.out.println(Thread.currentThread() + " : " + this);
}
} catch (Exception e) {
System.out.println("Sleeping");
}
}

public static void main(String[] args) throws Exception {
for (int i = 0; i < 10; i++) {
Thread daemon = new Thread(new SimpleDaemons());
daemon.setDaemon(true);
daemon.start();
}
System.out.println("All Daemons Started");
TimeUnit.MILLISECONDS.sleep(200);
// 一旦睡眠时间结束,主线程终止,那么后台线程也将被迫终止
}
}
/*
* All Daemons Started Thread[Thread-2,5,main] :
* org.hope6537.thinking_in_java.twenty_one.SimpleDaemons@33b7b32c
* Thread[Thread-1,5,main] :
* org.hope6537.thinking_in_java.twenty_one.SimpleDaemons@6154283a
* Thread[Thread-5,5,main] :
* org.hope6537.thinking_in_java.twenty_one.SimpleDaemons@6154283a
* Thread[Thread-8,5,main] :
* org.hope6537.thinking_in_java.twenty_one.SimpleDaemons@64726693
* Thread[Thread-3,5,main] :
.....
*/
它创造了显式的线程,以便可以设置他们的后台标志。通过使用工厂模式,可以使Executor运行该线程。
import java.util.concurrent.ThreadFactory;

/**
* @describe 后台线程工厂 可以传递给执行器来执行
* @author Hope6537
* @signdate 2014年7月26日上
19f08
午10:58:45
*/
public class DaemonThreadFactory implements ThreadFactory {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
return t;
}
}
之后

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
* @describe 后台线程工厂驱动实例
* @author Hope6537
* @signdate 2014年7月26日上午11:03:27
*/
public class DaemonFromFactory implements Runnable {
@Override
public void run() {
try {
while (true) {
TimeUnit.MILLISECONDS.sleep(200);
System.out.println(Thread.currentThread() + ":" + this);
}
} catch (Exception e) {
System.out.println("Out");
}
}

public static void main(String[] args) throws InterruptedException {
// 声明由工厂模式构建出来的执行器
ExecutorService exec = Executors
.newCachedThreadPool(new DaemonThreadFactory());
for (int i = 0; i < 10; i++) {
exec.execute(new DaemonFromFactory());
}
System.out.println("All Daemons are started");
TimeUnit.MILLISECONDS.sleep(2000);
// 然后依旧遵循后台线程定理
}
}
我们可以通过isDaemon()方法确认是否为后台线程
同样的。如果一个后台线程创建新线程,那么它创建的所有线程都是后台线程。

8、编码的变体

0、继承自Thread类的创建线程 略……

1、自管理的Runnable

public class SelfManaged implements Runnable {
private int countDown = 5;
private Thread t = new Thread(this);
//实际上应当在Executor中执行任务,而不是在构造方法内
public SelfManaged() { t.start(); }
public String toString() {
return Thread.currentThread().getName() +
"(" + countDown + "), ";
}
public void run() {
while(true) {
System.out.print(this);
if(--countDown == 0)
//出现条件时,终止任务
return;
}
}
public static void main(String[] args) {
for(int i = 0; i < 5; i++)
new SelfManaged();
}
} /* Output:
Thread-0(5), Thread-0(4), Thread-0(3), Thread-0(2), Thread-0(1), Thread-1(5), Thread-1(4), Thread-1(3), Thread-1(2), Thread-1(1), Thread-2(5), Thread-2(4), Thread-2(3), Thread-2(2), Thread-2(1), Thread-3(5), Thread-3(4), Thread-3(3), Thread-3(2), Thread-3(1), Thread-4(5), Thread-4(4), Thread-4(3), Thread-4(2), Thread-4(1),
*///:~


2、还有会玩的熊孩子用内部类隐藏线程代码

package org.hope6537.thinking_in_java.twenty_one;

import java.util.concurrent.*;

/**
* @describe 第一种包装方法 直接继承Thread类,但是这样会存在局限性 无法多重继承
* @author Hope6537
* @signdate 2014年7月26日下午12:53:37
*/
class InnerThread1 {
private int countDown = 5;
private Inner inner;

private class Inner extends Thread {
Inner(String name) {
super(name);
start();//直接调用Start
}

public void run() {
try {
while (true) {
System.out.println(this);
if (--countDown == 0)
return;
sleep(10);
}
} catch (InterruptedException e) {
System.out.println("interrupted");
}
}

public String toString() {
return getName() + ": " + countDown;
}
}
//	在构造方法中声明inner线程类 在inner内直接就会执行start
public InnerThread1(String name) {
inner = new Inner(name);
}
}

/**
* @describe 声明线程对象t 然后在内部进行方法实现,手动调用start启动线程
* @author Hope6537
* @signdate 2014年7月26日下午12:54:12
*/
class InnerThread2 {
private int countDown = 5;
private Thread t;

public InnerThread2(String name) {
t = new Thread(name) {
public void run() {
try {
while (true) {
System.out.println(this);
if (--countDown == 0)
return;
sleep(10);
}
} catch (InterruptedException e) {
System.out.println("sleep() interrupted");
}
}

public String toString() {
return getName() + ": " + countDown;
}
};
t.start();
}
}

/**
* @describe 内部类实现Runnable接口 然后还是在类中生成线程对象再调用start启动
* @author Hope6537
* @signdate 2014年7月26日下午12:54:41
*/
class InnerRunnable1 {
private int countDown = 5;
private Inner inner;

private class Inner implements Runnable {
Thread t;

Inner(String name) {
t = new Thread(this, name);
t.start();
}

public void run() {
try {
while (true) {
System.out.println(this);
if (--countDown == 0)
return;
TimeUnit.MILLISECONDS.sleep(10);
}
} catch (InterruptedException e) {
System.out.println("sleep() interrupted");
}
}

public String toString() {
return t.getName() + ": " + countDown;
}
}

public InnerRunnable1(String name) {
inner = new Inner(name);
}
}

/**
* @describe t直接声明Thread Runnable实现
* @author Hope6537
* @signdate 2014年7月26日下午12:55:18
*/
class InnerRunnable2 {
private int countDown = 5;
private Thread t;

public InnerRunnable2(String name) {
t = new Thread(new Runnable() {
public void run() {
try {
while (true) {
System.out.println(this);
if (--countDown == 0)
return;
TimeUnit.MILLISECONDS.sleep(10);
}
} catch (InterruptedException e) {
System.out.println("sleep() interrupted");
}
}

public String toString() {
return Thread.currentThread().getName() + ": " + countDown;
}
}, name);
t.start();
}
}

/**
* @describe 将线程封装进方法体内
* @author Hope6537
* @signdate 2014年7月26日下午12:55:36
*/
class ThreadMethod {
private int countDown = 5;
private Thread t;
private String name;

public ThreadMethod(String name) {
this.name = name;
}

/**
* @descirbe 该方法线程执行之前返回
*/
public void runTask() {
if (t == null) {
t = new Thread(name) {
public void run() {
try {
while (true) {
System.out.println(this);
if (--countDown == 0)
return;
sleep(10);
}
} catch (InterruptedException e) {
System.out.println("sleep() interrupted");
}
}

public String toString() {
return getName() + ": " + countDown;
}
};
t.start();
}
}
}

/**
* @describe 线程的多种包装方法
* @author Hope6537
* @signdate 2014年7月26日下午12:53:21
*/
public class ThreadVariations {
public static void main(String[] args) {
new InnerThread1("InnerThread1");
new InnerThread2("InnerThread2");
new InnerRunnable1("InnerRunnable1");
new InnerRunnable2("InnerRunnable2");
new ThreadMethod("ThreadMethod").runTask();
}
}
注释都详细说明了……在这就不赘述了。

9、术语

Bruce老师扯道:将要执行的工作时使用“任务(TASK)”,而在引用到驱动任务的具体机制是使用线程(THREAD)

10、加入一个线程

有点像计算机组成原理中的片选和屏蔽。一个线程可以在其他的线程之上调用join()方法,其效果是等待一段时间,直到第二个线程结束(被调用join的线程)才继续执行。

如果一个线程在另一个线程t上调用t.join(),此线程(就是调用线程)将被挂起,直到目标(被调用线程)线程t结束才恢复。

可以加一个时间参数,用来定时返回。而join方法也可以被中断。做法是调用interrupt方法,下面是例程

package org.hope6537.thinking_in_java.twenty_one;

class Sleeper extends Thread {
private int duration;

public Sleeper(String name, int sleepTime) {
super(name);
duration = sleepTime;
start();
}

public void run() {
try {
sleep(duration);
} catch (InterruptedException e) {
System.out.println(getName() + " was interrupted. "
+ "isInterrupted(): " + isInterrupted());
return;
}
System.out.println(getName() + " has awakened");
}

}

class Joiner extends Thread {
private Sleeper sleeper;

public Joiner(String name, Sleeper sleeper) {
super(name);
this.sleeper = sleeper;
start();
}

public void run() {
try {
sleeper.join();
} catch (InterruptedException e) {
System.out.println("Interrupted");
}
System.out.println(getName() + " join completed");
}
}

/**
* @describe 测试Join方法 测试流程
*           声明两个Sleeper线程任务,这两个线程将会休眠一段时间,在这段时间里,Joiner线程内部会调用sleeper
*           .join()方法加入进线程中,并打断Sleeper任务。 当两个Joiner线程结束之后
*           会将控制权返回给Sleeper,此时他们将接着执行睡眠操作,直到时间到达醒来。
* @author Hope6537
* @signdate 2014年7月26日下午1:26:20
*/
public class Joining {
public static void main(String[] args) {
Sleeper sleepy = new Sleeper("Sleepy", 1500), grumpy = new Sleeper(
"Grumpy", 1500);
Joiner dopey = new Joiner("Dopey", sleepy), doc = new Joiner("Doc",
grumpy);
// 他被打断 从而使之后的Joiner迅速完成操作
grumpy.interrupt();
// 而当Sleeper被中断或者自然结束之后,Joiner也会和Sleeper一同结束
}
}
join功能被新的类库构件CyclicBarrier所替代。后面将会说到。

11、创建有响应的(异步?)界面

下面两种方式,

1、关注于运算,不能获取到控制台输入 。

2、把运算放到任务里单独运行,此时就可以在运算的同时监听控制台输入。

实现例程

package org.hope6537.thinking_in_java.twenty_one;

import java.io.IOException;

/**
* @describe 有响应的用户界面
* @author Hope6537
* @signdate 2014年7月26日下午3:44:09
*/
public class ResponsiveUI extends Thread {
// 多线程环境下的共享机制变量
private volatile static double d = 1;

public ResponsiveUI() {
setDaemon(true);
start();
}

@Override
public void run() {
while (d > 0) {
d = d + (Math.PI + Math.E) / d;
}
}

public static void main(String[] args) throws IOException {
//敲下回车发现的确是在后台运行
new ResponsiveUI();
System.in.read();
System.out.println(d);
}

}

/**
* @describe 无响应的?F
* @author Hope6537
* @signdate 2014年7月26日下午3:45:27
*/
class UnresponsiveUI {
// 多线程环境下的共享机制变量
private volatile double d = 1;

public UnresponsiveUI() throws Exception {
while (d > 0) {
d = d + (Math.PI + Math.E) / d;
}
System.in.read();
}
}
12、线程组

Sun公司失败的尝试

13、捕获异常

由于线程的本质特性,使我们无法捕获线程中逃逸的异常

在main的主块放try-catch是没用的,为了解决这个问题,要修改执行器产生线程的方式,代码例程如下

package org.hope6537.thinking_in_java.twenty_one;

import java.util.concurrent.*;

class ExceptionThread2 implements Runnable {
public void run() {
// 一个普通的会抛出异常的线程
Thread t = Thread.currentThread();
System.out.println("run() by " + t);
System.out.println("eh = " + t.getUncaughtExceptionHandler());
throw new RuntimeException();
}
}

/**
* @describe 一个异常捕获器
* @author Hope6537
* @signdate 2014年7月26日下午4:24:49
*/
class MyUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
public void uncaughtException(Thread t, Throwable e) {
System.out.println("caught " + e);
}
}

/**
* @describe 产生线程的工厂
* @author Hope6537
* @signdate 2014年7月26日下午4:26:11
*/
class HandlerThreadFactory implements ThreadFactory {
public Thread newThread(Runnable r) {
System.out.println(this + " creating new Thread");
Thread t = new Thread(r);
System.out.println("created " + t);
// 设置线程的异常捕获器
t.setUncaughtExceptionHandler(new MyUncaughtExceptionHandler());
System.out.println("eh = " + t.getUncaughtExceptionHandler());
return t;
}
}

public class CaptureUncaughtException {
public static void main(String[] args) {
// 在這裡我們就不需要try catch了
ExecutorService exec = Executors
.newCachedThreadPool(new HandlerThreadFactory());
exec.execute(new ExceptionThread2());
// 之前我们设置的都是针对具体情况逐个设计的捕获器
// 下面的是公用共有的异常捕获器
// 他仅仅只在目标线程内没有专有版本的捕获器的时候才针对目标工作
// Thread.setDefaultUncaughtExceptionHandler(new
// MyUncaughtExceptionHandler());
}
} /*
* Output: (90% match) HandlerThreadFactory@de6ced creating new Thread created
* Thread[Thread-0,5,main] eh = MyUncaughtExceptionHandler@1fb8ee3 run() by
* Thread[Thread-0,5,main] eh = MyUncaughtExceptionHandler@1fb8ee3 caught
* java.lang.RuntimeException
*/// :~
根据捕获器的设置,我们能够捕获异常或处理。

上面的示例可以让我们根据具体情况逐个设置处理器,如果要通用设定的话,最好是在Thread类中设定一个静态域,并将这个处理器设为默认的未捕获异常处理器。

import java.util.concurrent.*;

public class SettingDefaultHandler {
public static void main(String[] args) {
Thread.setDefaultUncaughtExceptionHandler(
new MyUncaughtExceptionHandler());
ExecutorService exec = Executors.newCachedThreadPool();
exec.execute(new ExceptionThread());
}


三、共享受限资源

1、不正确的访问资源

下面是一个整数生成器

abstract class IntGenerator {
// 使用布尔型来保证原子性
// 使用volatile关键字来保证可视性
private volatile boolean canceled = false;

public abstract int next();

public void cancel() {
this.canceled = true;
}

public boolean isCanceled() {
return canceled;
}
}
它有一个cancel方法,用该改变canceled布尔值变量,我们知道布尔值变量是具备原子性的。在赋值或者返回值这样的操作没有中断的可能。因此不会看到这个域在执行这些简单操作过程中的中间状态。为了保证可视性,他是volatile的(即不需要编译器进行优化)

之后我们定义了一个EvenChecker类用来测试

/**
* @describe 不正确的共享资源竞争
* @author Hope6537
* @signdate 2014年7月26日下午5:35:59
*/
public class EvenChecker implements Runnable {
private IntGenerator generator;
private final int id;

public EvenChecker(IntGenerator generator, int id) {
super();
this.generator = generator;
this.id = id;
}

@Override
public void run() {
while (!generator.isCanceled()) {
int val = generator.next();
if (val % 2 != 0) {
System.out.println(val + " not even!");
generator.cancel();
}
}
}

public static void test(IntGenerator gp, int count) {
// 在这里启用多个线程同时对这个生成器对象的Int值进行next操作
System.out.println("Press Control-C "
+ "to Exit");
ExecutorService exec = Executors.newCachedThreadPool();
for (int i = 0; i < count; i++) {
// 所以创建了很多個EvenCheck对象
exec.execute(new EvenChecker(gp, i));
}
exec.shutdown();
}

public static void test(IntGenerator gp) {
test(gp, 10);
}
}


/**
* @describe 具体实现生成器next方法的方法
* @author Hope6537
* @signdate 2014年7月26日下午5:31:31
*/
class EvenGenerator extends IntGenerator {

private int currentEvenValue = 0;

@Override
public int next() {
// 我们可以看到递增是根据两次自增长所确定的
// 但是一个线程很有可能在另一个任务执行第一个对值的底层操作之后,第二个操作之前,调用next方法
// 这将会使值处在一个不恰当的状态 在本例子中,即非偶数
++currentEvenValue;
++currentEvenValue;
// 递增也不是原子性的操作,如果我们没有对任务进行保护,那么单一的递增也不是安全的
return currentEvenValue;
}

public static void main(String[] args) {
EvenChecker.test(new EvenGenerator());
}
}


根据run方法体内部的代码,我们可以看到,当数值增长出现奇数的时候,程序就会停止。但是根据我们这些年顺序编程的经验看来,这是不可能的。

但是最后输出也是无情打脸

/*Press Control-C to Exit
7 not even!
9 not even!
5 not even!*/
让我们分析下原因:一个任务有可能在另一个任务执行了对value值的底层操作之后,在第二个递增操作之前,调用了next方法,这使这个值处于非法的状态。

有一点很重要,就是说程序可能会有多个步骤,在执行这些步骤的时候任务可能会被线程机制挂起,如果不保护任务,那么哪些操作可能会向不可预料的方向发展。

2、解决共享资源竞争

上面的示例说明了一个问题,那就是你永远也不知道一个线程何时在运行。对于并发操作,我们需要某种方式来防止两个任务访问相同的资源。也就是上锁

基本上所有的并发模式解决线程冲突问题的时候,都是采用序列化访问共享资源。就是说在某一个时刻,仅允许一个任务访问共享资源。这种机制称为互斥量

Java解决这种问题的方法1

public synchronized void f()
如果某个任务处在一个标记为synchronized的方法调用中。在这个线程从方法返回之前,其他试图访问该类中标有synchronized方法的线程都会被阻塞。

从而防止任务冲突。对于某个对象来说,其所有的synchronized方法共享同一个锁。(另外,将域设为private也很重要,否则synchronized也许无法保护变量)。

那么什么时候同步呢?《Java并发实战》的作者Brian Goetz解释道:

“如果你正在写一个变量,他可能接下来会被另一个线程读取,或者正在读取一个上一次已经被另一个线程写过的变量。那么必须使用同步,并且读/写线程都必须用相同的的锁同步”

每个访问临界共享资源的方法都必须被同步,否则他们就不会正确的工作。

现在让我们修复刚才的EvenGenerator

package org.hope6537.thinking_in_java.twenty_one;
/**
* @describe 使用synchronized来进行上锁操作
* @author Hope6537
* @signdate 2014年7月26日下午8:00:53
*/
public class SynchronizedEvenGenerator extends IntGenerator {
private int currentValue = 0;

@Override
public synchronized int next() {
// 上锁了之后就安全了
++currentValue;
// 对yield的调用提高Value在奇数时上下文切换的可能性 但是上锁了就不可能会有切换
Thread.yield();
++currentValue;
return currentValue;
}

public static void main(String[] args) {
EvenChecker.test(new SynchronizedEvenGenerator());
}
}
Java解决这种问题的方法2

使用显式的Lock对象,和synchronized不同的是,他必须显式的创建,锁定,释放。如下是例程

package org.hope6537.thinking_in_java.twenty_one;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
* @describe 使用显式的Lock锁
* @author Hope6537
* @signdate 2014年7月26日下午8:15:56
*/
public class MutexEvenGenerator extends IntGenerator {

private int currentEvenValue = 0;
private Lock lock = new ReentrantLock();

@Override
public int next() {
lock.lock();
try {
++currentEvenValue;
Thread.yield();
++currentEvenValue;
return currentEvenValue;
} finally {
lock.unlock();
}
}

public static void main(String[] args) {
EvenChecker.test(new MutexEvenGenerator());
}

}
显式Lock在加锁和释放锁的方面,相比synchronized关键字,赋予了我们更灵活的控制力。

3、原子性和易变性

在Java并发编程中,一个不正确的知识就是原子操作不需要线程控制。实际上原子操作是不能被线程调度机制中断的工作。在水平不够的情况下,不应该依赖于原子性。

而我们之前介绍的volatile关键字确保了应用中的可视性。如果我们将一个域声明为volatile,那么只要对这个域进行了写操作,那么所有的读操作都会看到这个修改。

换句话说,它是直接写入到主存中去的。但是用它来做同步也有风险,如果一个域的值依赖于它之前的值时,那么volatile就无用了。

所以在一般情况下,我们优先使用synchronized。

4、原子类

AtomicInteger、AtomicLong、AtomicReference等…基本不用……

一个使用例程

package org.hope6537.thinking_in_java.twenty_one;

import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.*;

/**
* @describe 使用原子整数类进行性能调优
* @author Hope6537
* @signdate 2014年8月7日下午4:13:03
*/
public class AtomicIntegerTest implements Runnable {
private AtomicInteger i = new AtomicInteger(0);

public int getValue() {
return i.get();
}

private void evenIncrement() {
i.addAndGet(2);
}

public void run() {
while (true)
evenIncrement();
}

// 在这里赋值,更改,递增操作都是原子性的
public static void main(String[] args) {
new Timer().schedule(new TimerTask() {
public void run() {
System.err.println("Aborting");
System.exit(0);
}
}, 5000); // Terminate after 5 seconds
ExecutorService exec = Executors.newCachedThreadPool();
AtomicIntegerTest ait = new AtomicIntegerTest();
exec.execute(ait);
while (true) {
int val = ait.getValue();
if (val % 2 != 0) {
System.out.println(val);
System.exit(0);
}
}
}
}


5、临界区

俗称synchronized代码块,格式如下

synchronized(syncObject){
//some actions
}
再进入该代码块之前,必须要获得syncObject对象的锁,如果其他线程获得了锁,那么阻塞,直到他们退出。

通过使用同步代码块,而不是对整个方法进行控制,可以使多个任务访问对象的时间性能获得显著提高。

设想下,如果一个方法带锁,那么它将会被锁在整个对象之外,而没有办法访问例如初始化、通过该对象的某些属性从而由分支选择要走的代码这样的判断区,类似于这样的不需要同步的地方,而是串行访问。

例如说游乐场,有人玩过山车,有人玩跳楼机,有人玩海盗船,而这些人都被堵在游乐场门口排队,只有前面的人玩过了,他才能去玩,即使他们玩的项目不一样。

而有了代码块,关于这些预处理的信息就可以并行进行。而是在每个游乐项目门口排队

具体的代码块使用例程如下

package org.hope6537.thinking_in_java.twenty_one;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.hope6537.thinking_in_java.twenty_one.Pair.PairValuesNotEqualException;

class Pair {
private int x, y;

public int getX() {
return x;
}

public int getY() {
return y;
}

public Pair(int x, int y) {
super();
this.x = x;
this.y = y;
}

public Pair() {
this(0, 0);
}

public void incrementX() {
x++;
}

public void incrementY() {
y++;
}

@Override
public String toString() {
return "Pair [x=" + x + ", y=" + y + "]";
}

public class PairValuesNotEqualException extends Exception {
public PairValuesNotEqualException() {
super("Pair values not equals " + Pair.this);
}
}

public void checkState() throws PairValuesNotEqualException {
if (x != y) {
throw new PairValuesNotEqualException();
}
}
}

/**
* @describe 模板方法设计模式
* @author Hope6537
* @signdate 2014年8月7日下午4:33:58
*/
abstract class PairManager {
AtomicInteger checkCounter = new AtomicInteger(0);
protected Pair p = new Pair();
private List<Pair> storage = Collections
.synchronizedList(new ArrayList<Pair>());

public synchronized Pair getPair() {
return new Pair(p.getX(), p.getY());
}

protected void store(Pair p) {
storage.add(p);
try {
TimeUnit.MILLISECONDS.sleep(50);
} catch (Exception e) {
e.printStackTrace();
}
}

public abstract void increment();
}

class PairManager1 extends PairManager {
/*
* (non-Javadoc)
*
* @see org.hope6537.thinking_in_java.twenty_one.PairManager#increment()
*
* @author:Hope6537使用方法锁的形式
*/
@Override
public synchronized void increment() {
p.incrementX();
p.incrementY();
store(getPair());
}
}

class PairManager2 extends PairManager {
/*
* (non-Javadoc)
*
* @see org.hope6537.thinking_in_java.twenty_one.PairManager#increment()
*
* @author:Hope6537 使用同步控制块
*/

@Override
public void increment() {
Pair temp;
synchronized (this) {
p.incrementX();
p.incrementY();
temp = getPair();
}
store(temp);
}
}

/**
* @describe 用来递增Pair
* @author Hope6537
* @signdate 2014年8月7日下午4:55:06
*/
class PairManipulator implements Runnable {
private PairManager pm;

public PairManipulator(PairManager pm) {
super();
this.pm = pm;
}

@Override
public void run() {
while (true) {
pm.increment();
}
}

@Override
public String toString() {
return "PairManipulator [" + pm.getPair() + " checkCount = "
+ pm.checkCounter.get() + "]";
}
}

/**
* @describe 用来同步检查是否合法Pair
* @author Hope6537
* @signdate 2014年8月7日下午4:55:25
*/
class PairChecker implements Runnable {
/**
* @describe
*/
private PairManager pm;

public PairChecker(PairManager pm) {
super();
this.pm = pm;
}

@Override
public void run() {
while (true) {
// 在每次检查成功 的时候都会递增计数器
pm.checkCounter.incrementAndGet();
try {
pm.getPair().checkState();
} catch (PairValuesNotEqualException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}

public class CriticalSection {

static void testApproaches(PairManager pman1, PairManager pman2) {
ExecutorService exec = Executors.newCachedThreadPool();
PairManipulator pm1 = new PairManipulator(pman1);
PairManipulator pm2 = new PairManipulator(pman2);
PairChecker pcheck1 = new PairChecker(pman1);
PairChecker pcheck2 = new PairChecker(pman2);
exec.execute(pm1);
exec.execute(pm2);
exec.execute(pcheck1);
exec.execute(pcheck2);
try {
TimeUnit.MILLISECONDS.sleep(5000);
} catch (Exception e) {
System.out.println("Sleep interrupted");
}
System.out.println("pm1 : " + pm1 + " \npm2 : " + pm2);
System.exit(0);
}

/*
* 通过这里我们可以看出来 在单位时间内 方法上锁和代码块上锁的线程访问量 在四核八线程的电脑上超级明显的差距
* pm1 : PairManipulator [Pair [x=105, y=105] checkCount = 3227]
* pm2 : PairManipulator [Pair [x=106, y=106] checkCount = 249052283]
* 这正是性能调优的重要性 在安全的情况下,使得线程能更多的访问
*/

public static void main(String[] args) {
testApproaches(new PairManager1(), new PairManager2());
}
}

我们还可以使用显式的Lock对象来创建临界区

package org.hope6537.thinking_in_java.twenty_one;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
* @describe 使用显式的锁来创建临界区
* @author Hope6537
* @signdate 2014年8月7日下午5:07:54
*/
public class ExplicitCriticalSection {
public static void main(String[] args) {
CriticalSection.testApproaches(new ExplicitPairManager1(),
new ExplicitPairManager2());
}
}

class ExplicitPairManager1 extends PairManager {
private Lock lock = new ReentrantLock();

public synchronized void increment() {
lock.lock();
try {
p.incrementX();
p.incrementY();
store(getPair());
} finally {
lock.unlock();
}
}

}

class ExplicitPairManager2 extends PairManager {
private Lock lock = new ReentrantLock();

public void increment() {
Pair temp;
lock.lock();
try {
p.incrementX();
p.incrementY();
temp = getPair();
} finally {
lock.unlock();
}
store(temp);
}
}

 

6、在其他对象上同步

synchronized块必须要给定一个再其上同步的对象,也就是说我们可以来同步别的对象,而非标准用法synchronized(this)。

下面就是例程

package org.hope6537.thinking_in_java.twenty_one;
public class SyncObject {

public static void main(String[] args) {
final DualSynch ds = new DualSynch();
new Thread() {
// 新建线程调用f
public void run() {
ds.f();
}
}.start();
// 而g使用main的线程来调用
ds.g();
}
}

/*
* f()方法与this对象同步,而g()则有一个同步于syncObject的临界块,因此这两个同步是互相独立的。
*/

class DualSynch {
private Object syncObject = new Object();

public synchronized void f() {
for (int i = 0; i < 5; i++) {
System.out.println("f()");
Thread.yield();
}
}

public void g() {
synchronized (syncObject) {
for (int i = 0; i < 5; i++) {
System.out.println("g()");
Thread.yield();
}
}
}
}


DualSync.f()在this同步,而g()有一个在syncObject上同步的同步控制块。因此这两个同步是相互独立的

从输出中我们可以看到,这两个方式在同时运行,因此任何一个方法都没有因为对另一个方法的同步而被阻塞。

/*Output
* g() f() g() f() g() f() g() f() g() f()
*/


7、线程本地存储

有的时候为了防止任务在共享资源上产生冲突的第二种方式是根除对变量的共享。线程本地存储可以为使用相同变量的每个不同线程都创建不同的存储。

例程如下:

package org.hope6537.thinking_in_java.twenty_one;

import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

/**
* @describe 线程本地存储 意义即为在部不同的线程中,面对相同的变量时,生成不同的存储,他们可以使得你可以将状态和线程关联起来
* @author Hope6537
* @signdate 2014年8月7日下午5:54:54
*/
public class ThreadLocalVariableHolder {
// 通常用作静态域存储
private static ThreadLocal<Integer> value = new ThreadLocal<Integer>() {
private Random rand = new Random(47);

protected synchronized Integer initialValue() {
return rand.nextInt(1000);
}
};

// 在ThreadLocal中,只能使用set或者get来访问对象 他是线程安全的,不需要上锁,不会出现竞争条件
public static void increment() {
value.set(value.get() + 1);
}

public static Integer get() {
return value.get();
}

public static void main(String[] args) throws InterruptedException {
ExecutorService exec = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++) {
exec.execute(new Accessor(i));
}
TimeUnit.SECONDS.sleep(1);
exec.shutdownNow();
}
}

class Accessor implements Runnable {
private final int id;

public Accessor(int idn) {
id = idn;
}

@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
ThreadLocalVariableHolder.increment();
System.out.println(this);
Thread.yield();
}
}

@Override
public String toString() {
return "#" + id + " : " + ThreadLocalVariableHolder.get();
}
}

ThreadLocal对象通常当做静态域存储。我们只能通过get或set方式和数据进行交互。注意get和increment方法都不是同步的,因为ThreadLocal保证不会出现竞争。

四、终结任务

1、在睡眠中终结

Bruce给出的仿真程序中,可以计算出每个进入门的通过人次。同时更改公园的总人次。这里面包含了对sleep的调用方法。这是一种终结任务的方式。

package org.hope6537.thinking_in_java.twenty_one;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
* @describe 仿真花园类
* @author Hope6537(赵鹏)
* @signdate 2014年8月7日下午7:13:06
*/
public class OrnamentalGraden {

public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++) {
exec.execute(new Entrance(i));
}
TimeUnit.SECONDS.sleep(3);
Entrance.cancel();
exec.shutdown();
//如果在250毫秒之内线程没有全部完全终止,那么输出
if (!exec.awaitTermination(250, TimeUnit.MILLISECONDS)) {
System.out.println("有些任务没有被终止");
}
System.out.println("总数 :" + Entrance.getTotalCount());
System.out.println("Entrances的总数: " + Entrance.sumEntrances());
}

}

/**
* @describe 使用单个对象来跟踪花园参观者的主计数值
* @author Hope6537
* @signdate 2014年8月7日下午7:25:10
*/
class Count {
private int count = 0;
private Random rand = new Random(47);

/*
* increment和 value方法都是上锁的 用来控制对count域的访问
*/
public synchronized int increment() {
// 使用random对象的方式来进行数字推送
// 如果去掉锁,程序就会立即崩溃
int temp = count;
if (rand.nextBoolean()) {
Thread.yield();
}
return (count = ++temp);
}

public synchronized int value() {
return count;
}
}

/**
* @describe Count对象作为本类的一个静态域来存储
* @author Hope6537
* @signdate 2014年8月7日下午8:06:44
*/
class Entrance implements Runnable {
private static Count count = new Count();
private static List<Entrance> entrances = new ArrayList<Entrance>();
/**
* @describe 待维护的本地值
*/
// 每个对象都在维护这个本地值 包含通过某个特定入口进入参观者的数量 提供了对count的检查
private int number = 0;
private final int id;
// 它只会被读取和赋值 可以安全的操作
private static volatile boolean canceled = false;

public static void cancel() {
canceled = true;
}

public Entrance(int id) {
this.id = id;
entrances.add(this);
}
<span style="white-space:pre">	</span>//在run方法主体中提供对number数据的操作。递增对象的同时进行休眠。
@Override
public void run() {
while (!canceled) {//实际上这里可以换成!Thread.interrupted()
synchronized (this) {
++number;
}
System.out.println(this + " Total:" + count.increment());
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (Exception e) {
System.out.println("Sleep interrupted");
}
}
System.out.println("Stopping" + this);
}

public synchronized int getValue() {
return number;
}

@Override
public String toString() {
return "Entrance [id=" + id + "] " + getValue();
}

public static int getTotalCount() {
return count.value();
}

public static int sumEntrances() {
int sum = 0;
for (Entrance entrance : entrances) {
sum += entrance.getValue();
}
return sum;
}
}
在这里count存储于一个Entrance的静态域中,用作计数。操作它的方法都是带锁的方法,保持其数值的正确性。

同时每个Entrance对象都保有一个number域,用来校验count值。

在Entrance中,canceled是一个volatile布尔标志,所以可以安全的操作它。

在3秒钟后,main函数会发送cancel的信息。然后调用shutDown方法,之后调用awaitTermination方法,他会等待在执行器里的每个任务结束,如果所有任务在超时时间之前全部结束,那么返回true,否之返回false。'

这个方法导致所有的任务退出其run方块。并因此而终止任务。但是本身的Entrance对象还是有效地。所有sumEntrances方法依旧可以返回正常的信息。

2、在阻塞时终结

上面的实例表示了在sleep状态下终结任务的方法,sleep将最终唤醒,将检查cancel标志,从而确定是否要退出线程。在继续说明之前,我们有必要掰扯下线程的生命周期。

1)新建——线程被创建时的状态

2)就绪——只要调度器将时间片分给该线程,他就可以工作,反之就不可以

3)阻塞——因为有某个条件的限制,他无法运行任务。此时调度器将会忽略该线程。直到它回到就绪状态

4)死亡——方式有两种,从run方法返回,或者是被中断

而我们要研究的就是中断的方法。但是在这之前要知道如何让一个线程进入阻塞状态。

1)通过调用sleep方法

2)使用wait方法使线程挂起

3)I/O阻塞,例如正在等待控制台输入。

4)对象锁不可用的状态。即某个线程正在用这个对象的锁,而此时这个线程将会被阻塞。直到对方放弃锁。

而在阻塞时终结的线程的方法唯有两种

1)使线程打破阻塞条件,从而继续运行完成任务,结束线程。

2)中断当前线程,结束任务。

附录:永远不要用stop来终止线程,这将不会释放锁。

3、中断

1)可能会抛出异常的中断

正如我们想象的,当我们打断一个正在运行的任务的时候,更准确的说,是在run方法中间打断的时候,我们可能需要清理一些资源,它更像是抛出的异常。而Java就是这样做的。它会抛出一组异常,我们必须要把它好好地接住(try-catch)。

2)一种额外的解决方法?

Thread类中包含interrupt方法,它可以终止被阻塞的任务。但是在执行器模式下。我们无法操作Thread对象,JavaSE5之后也不建议我们直接操作Thread对象

所以更好的方法是使用Executor上的shutDownNow的方法。它将会中断所持有的全部线程。

但是有的时候我们也许只是想结束一个单一的线程,Java也同样提供了解决方法:如果我们使用执行器来创建并执行任务,那么使用submit()方法而不是executor()方法来执行任务,这个时候我们会获取到(返回)一个Future<?>对象,在其上调用cancel方法可以中断单个线程,下面是示例

package org.hope6537.thinking_in_java.twenty_one;

import java.io.InputStream;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/**
* @describe 睡眠阻塞
* @author Hope6537
* @signdate 2014年8月13日下午12:47:28
*/
class SleepBlocked implements Runnable {
@Override
public void run() {
try {
TimeUnit.MILLISECONDS.sleep(300);
} catch (Exception e) {
System.out.println("Interrupting");
}
System.out.println("Exiting Sleep");
}

}

/**
* @describe IO阻塞 不可中断
* @author Hope6537
* @signdate 2014年8月13日下午12:47:36
*/
class IOBlocked implements Runnable {
private InputStream in;

public IOBlocked(InputStream in) {
this.in = in;

}

@Override
public void run() {
try {
System.out.println("Waiting for Read");
in.read();
} catch (Exception e) {
if (Thread.currentThread().isInterrupted()) {
System.out.println("Interrupting from io");
} else {
throw new RuntimeException(e);
}
}
System.out.println("Exiting IO");
}
}

/**
* @describe 死锁阻塞 不可中断
* @author Hope6537
* @signdate 2014年8月13日下午12:47:42
*/
class SynchronizedBlocked implements Runnable {
// 这个方法永远不会放掉锁
public synchronized void f() {
while (true)
Thread.yield();
}

public SynchronizedBlocked() {
new Thread() {
public void run() {
f();// 被这个线程死锁
}
}.start();
}

public void run() {
System.out.println("Trying to call f()");
f();
System.out.println("Exiting SynchronizedBlocked.run()");
}
}

public class Interrupting {

private static ExecutorService exec = Executors.newCachedThreadPool();

static void test(Runnable r) throws InterruptedException {
// 执行exec中的单个线程 同时获取到该线程的可操作的Future对象
Future<?> f = exec.submit(r);
TimeUnit.MILLISECONDS.sleep(100);
System.out.println("Interrupting " + r.getClass().getName());
// 通过这个持有对象进行单个线程的终止操作
f.cancel(true);
System.out.println("Interrupt sent to" + r.getClass().getName());
}

public static void main(String[] args) throws InterruptedException {
test(new SleepBlocked());
System.out.println("----------------");
test(new SynchronizedBlocked());
System.out.println("----------------");
test(new IOBlocked(System.in));
TimeUnit.SECONDS.sleep(3);
System.out.println("Aborting with System.exit(0)");
System.exit(0);
}

/* 根据输出我们可以看到 我们能够直接中断sleep的阻塞线程 但是没有办法中断I/O阻塞和死锁阻塞?那可咋办呢?
* 对于这类问题 可以进行关闭任务再其上发生阻塞的底层资源
* Interrupting org.hope6537.thinking_in_java.twenty_one.SleepBlocked
* Interrupt sent toorg.hope6537.thinking_in_java.twenty_one.SleepBlocked
* ---------------- Interrupting Exiting Sleep Trying to call f()
* Interrupting org.hope6537.thinking_in_java.twenty_one.SynchronizedBlocked
* Interrupt sent
* toorg.hope6537.thinking_in_java.twenty_one.SynchronizedBlocked
* ---------------- Waiting for Read Interrupting
* org.hope6537.thinking_in_java.twenty_one.IOBlocked Interrupt sent
* toorg.hope6537.thinking_in_java.twenty_one.IOBlocked Aborting with
* System.exit(0)
*/
}
如同注释上说的,我们只能用这种方法中断sleep阻塞,却没有方法中断IO阻塞和死锁阻塞。那么有什么解决方法呢?

4、关闭底层资源以消除阻塞

如题所示,来个釜底抽薪,直接取消条件,让任务进入就绪状态,再中断试试?

package org.hope6537.thinking_in_java.twenty_one;

import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
* @describe 采用关闭底层资源的方法来终止线程
* @author Hope6537
* @signdate 2014年8月8日下午4:48:09
*/
public class CloseResource {

public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
ServerSocket server = new ServerSocket(8080);
InputStream socketInput = new Socket("localhost", 8080)
.getInputStream();
exec.execute(new IOBlocked(socketInput));
exec.execute(new IOBlocked(System.in));
TimeUnit.MILLISECONDS.sleep(300);
System.out.println("关闭所有的线程");
exec.shutdownNow();
//一旦底层资源被关闭,那么线程将会终止阻塞 有个可以感受到的延时差
TimeUnit.SECONDS.sleep(1);
System.out.println("关闭" + socketInput.getClass().getName());
socketInput.close();
TimeUnit.SECONDS.sleep(1);
//但是实际上貌似没关上……额……
System.out.println("关闭" + System.in.getClass().getName());
System.in.close();
}
}
/*
<span style="white-space:pre">	</span> * Waiting for Read Waiting for Read 关闭所有的线程 关闭java.net.SocketInputStream
<span style="white-space:pre">	</span> * Interrupting from io Exiting IO 关闭java.io.BufferedInputStream
<span style="white-space:pre">	</span> * Interrupting from io Exiting IO
<span style="white-space:pre">	</span> */
在shutdownNow被调用之后以及在两个输入流上调用close关闭流的方法的延迟所强调的就是一旦底层资源被关闭,那么任务将会解除阻塞。同时Java的nio通道给出了更多的I/O中断方法,下面是示例

package org.hope6537.thinking_in_java.twenty_one;

import java.net.*;
import java.nio.*;
import java.nio.channels.*;
import java.util.concurrent.*;
import java.io.*;

/**
* @describe 使用NIO的自动响应中断机制
* @author Hope6537
* @signdate 2014年8月8日下午4:53:55
*/
class NIOBlocked implements Runnable {
// 网络流通信通道
private final SocketChannel sc;

public NIOBlocked(SocketChannel sc) {
this.sc = sc;
}

public void run() {
try {
System.out.println("Waiting for read() in " + this);
// 当读取时可能会产生异常?被中断的异常
sc.read(ByteBuffer.allocate(1));
} catch (ClosedByInterruptException e) {
System.out.println("ClosedByInterruptException");
} catch (AsynchronousCloseException e) {
System.out.println("AsynchronousCloseException");
} catch (IOException e) {
throw new RuntimeException(e);
}
System.out.println("Exiting NIOBlocked.run() " + this);
}
}

public class NIOInterruption {
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
ServerSocket server = new ServerSocket(8080);
InetSocketAddress isa = new InetSocketAddress("localhost", 8080);
SocketChannel sc1 = SocketChannel.open(isa);
SocketChannel sc2 = SocketChannel.open(isa);
Future<?> f = exec.submit(new NIOBlocked(sc1));
exec.execute(new NIOBlocked(sc2));
exec.shutdown();
TimeUnit.SECONDS.sleep(1);
// 提供关闭线程的方法
f.cancel(true);
TimeUnit.SECONDS.sleep(1);
// 关闭通道从而解封那个阻塞
sc2.close();
}
}
Waiting for read() in org.hope6537.thinking_in_java.twenty_one.NIOBlocked@79ccc6c8
Waiting for read() in org.hope6537.thinking_in_java.twenty_one.NIOBlocked@730eb2f0
ClosedByInterruptException
Exiting NIOBlocked.run() org.hope6537.thinking_in_java.twenty_one.NIOBlocked@79ccc6c8
AsynchronousCloseException
Exiting NIOBlocked.run() org.hope6537.thinking_in_java.twenty_one.NIOBlocked@730eb2f0
Future中中断单个线程的方法也表演了一下~我们还可以关闭底层资源以释放锁,解除导致的互斥阻塞。嘛……不过这样一般不是必须的。

5、被互斥所阻塞

之间我们就看到了,如果尝试在一个对象上调用带锁方法,而这个对象的锁已经被其他的任务获得,那么本任务将挂起,直到锁可以被获得。而下面的这个实例表示了同一个互斥如何能被同一个任务多次获得

package org.hope6537.thinking_in_java.twenty_one;

/**
* @describe 被互斥所阻塞
* @author Hope6537
* @signdate 2014年8月9日上午9:44:10
*/
public class MultiLock {

public synchronized void f1(int count) {
if (count-- > 0) {
System.out.println("f1() calling f2() with count " + count);
f2(count);
}
}

public synchronized void f2(int count) {
if (count-- > 0) {
System.out.println("f2() calling f1() with count " + count);
f1(count);
}
}

public static void main(String[] args) {
final MultiLock lock = new MultiLock();
new Thread() {
// 关于这个锁的叙述 他在第一次調用的時候就獲取了multilock這個锁 因此用一个任务将会在f2()的调用中再次获取这个锁
// 因此一个任务应该能够调用同一个对象的其他synchronized方法
public void run() {
lock.f1(10);
};
}.start();
}
}
因此一个任务应该能够调用同一个对象的其他synchronized方法!就像在前面的IO阻塞所看到的那样,在任何时刻,如果任务以不可中断的方式所阻塞,那么都会有锁住程序的可能。比起在synchronized的方法所阻塞。ReentrantLock这个显式的锁具备可以被中断的能力。下面是实例。

package org.hope6537.thinking_in_java.twenty_one;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class Interrupting2 {

public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(new Blocked2());
thread.start();
TimeUnit.SECONDS.sleep(1);
System.out.println("Issuing t.interrupt");
// 和I/O不同,interrupt是可以打断被互斥所阻塞的调用
thread.interrupt();
}
}

class BlockedMutex {
// 这种锁具备中断在其上被阻塞的任务的功能
private Lock lock = new ReentrantLock();

public BlockedMutex() {
// 他要获取所创建对象上自身的Lock 并且从不释放这个锁
lock.lock();
}

/**
* @descirbe 所以如果我们试图从第二个任务中调用f方法,不同于创建BlockedMutex这个任务,那麼就會因為對象不可獲得而阻塞
*/
public void f() {
try {
lock.lockInterruptibly();
System.out.println("lock acquired in f()");
} catch (Exception e) {
System.out.println("Interrupting from lock acquisition in f()");
}
}
}

class Blocked2 implements Runnable {
BlockedMutex blockedMutex = new BlockedMutex();

@Override
public void run() {
System.out.println("Waiting for f() in BlockedMutex");
// 方法会在这里停止 因为f不可被访问
blockedMutex.f();
System.out.println("Broken out of blocked call");
}

}
BlockedMutex本身具有一个构造器,他要获取所创建对象上自身的Lock,并用不释放,所以如果试图在第二个任务上调用f,那么将会被阻塞。

通过这个程序,我们可以看到,interrupt()可以打断被互斥所阻塞的调用。(尽管和尽量不要操作Thread原则相悖)

6、检查中断。

传统情况我们可以根据调用interrupted()来检查中断状态。

五、线程之间的协作

正如我们之前所学习的,当我们发现公共资源因为任务竞争产生冲突而导致出现了不正确信息的时候。我们可以用锁来干涉,使任何时刻只有一个任务访问该资源。

当我们解决了这个问题之后,下面要想的就是如何让任务之间彼此协作。不是干涉,而是协调。因为在这类问题中,例如说建筑问题,有人打地基,有人建结构,有人活水泥,等等,我们第一步需要先建房子的地基,然后可以并行的进入到活水泥和建立钢结构的步骤之中。而这两个步骤必须要在混凝土浇筑之前完成。管道必须要在浇筑之前到位。等等等。在这些任务中,有的可以并行执行,而有些必须按照步骤完成。

当任务协作时,关键问题是这些任务的握手,为了实现,我们选择了互斥。

在互斥的情况下。我们能确保只有一个任务可以响应某个信号,这样可以根除其他的竞争条件。还可以将任务挂起,知道外部的条件符合标准,再度执行。

所以如何实现握手问题,便是协作之本。

1、wait和notifyAll

wait可以使线程挂起,等待条件的改变从而执行动作。

而notify和notifyAll可以使被挂起的线程重新回到工作状态。

与sleep不同的是,对于wait而言

1)wait期间的对象锁是释放的

2)可以通过notify等方法,也可以通过延时等方法恢复执行

但是它也有些限制,只能在同步方法或同步代码块中合法调用这个wait或notifyAll方法。

下面是个抛光和打蜡的例程。车需要先打蜡然后再抛光,在涂另一层蜡之前,必须先等待抛光动作完成。同时,抛光任务在打蜡任务完成之前是不能工作的。

下面是例程

package org.hope6537.thinking_in_java.twenty_one;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
* @describe 驱动程序
*/
public class WaxOMatic {
public static void main(String[] args) throws InterruptedException {
Car car = new Car();
ExecutorService exec = Executors.newCachedThreadPool();
exec.execute(new WaxOff(car));
exec.execute(new WaxOn(car));
TimeUnit.SECONDS.sleep(5);
exec.shutdownNow();
}
}

/**
* @describe 车的打蜡,抛光,等待打蜡,等待抛光任务,分别具有挂起和唤醒设定
*/
class Car {

private boolean waxOn = false;

public synchronized void waxed() {
waxOn = true;
notify();
}

public synchronized void buffed() {
waxOn = false;
notify();
}

public synchronized void waitForWaxing() throws InterruptedException {
while (waxOn == false) {
// 将会被挂起该任务 而锁被释放,从而使其他的任务也被操作,安全的改变对象的状态
wait();
}
}

public synchronized void waitForBuffing() throws InterruptedException {
while (waxOn == true) {
wait();
}
}
}

class WaxOn implements Runnable {
private Car car;

public WaxOn(Car car) {
super();
this.car = car;
}

@Override
public void run() {
try {
while (!Thread.interrupted()) {
System.out.println("正在打蜡 Wax On!");
TimeUnit.MILLISECONDS.sleep(300);
car.waxed();
car.waitForBuffing();
}
} catch (Exception e) {
System.out.println("Exit by interrupted");
}
System.out.println("结束打蜡任务 Ending Wax on Task");
}
}

class WaxOff implements Runnable {
private Car car;

public WaxOff(Car car) {
super();
this.car = car;
}

@Override
public void run() {
try {
while (!Thread.interrupted()) {
car.waitForWaxing();
System.out.println("正在抛光 Wax Off");
TimeUnit.MILLISECONDS.sleep(300);
car.buffed();
}
} catch (Exception e) {
System.out.println("Exit by interrupted");
}
System.out.println("结束抛光任务 Ending Wax Off task");
}
}
在这里,处理握手条件的域就是一个单一的布尔属性waxOn 他表示状态。根据它的状态进行任务的分配和处理。所以我们需要用一个检查条件的while循环来包围wait

其本质就是要检查感兴趣的特定条件。并在条件不满足的情况下返回到wait中去。

2、错失的信号

当我们使用挂起-唤醒系统时候。可能会错过某些信号。下面是Bruce给出的一个例子

T1
synchronized(sharedMonitor){
//为T2设置条件信号
sharedMonitor.notify();
}
T2
while(条件信号){
//变更点
synchronized(sharedMonitor){
sharedMonitor.wait()
}
}


当T1发出信号的时候,T2获取到信号同时设置为true进入循环,在执行到变更点的时候,在线程调度器里很有可能切换到了T1,而此时T1将会对信号进行设置。然后调用唤醒。

当T2得以继续执行的时候,它依旧会向下走,并将线程挂起。而此时信号已经发生变化,唤醒信号被错过,而它将会被永远挂起。产生死锁

该问题的解决方案就是防止信号上产生竞争条件。如下

synchronized(sharedMonitor){
while(条件信号){
sharedMonitor.wait()
}
}
如果T1首先执行,当控制给T2的时候,它会发现条件变化,从而不会进入wait

如果T2首先执行,挂起之后返回给T1,这时T1会唤醒T2,没有错过信号。

3、notify和notifyAll

对于这两个方法来说,前者使用的条件非常苛刻。归纳如下

1)对于所有任务来说必须等待相同的条件,当条件发生变化时,只有一个任务从中受益

2)这些限制对所有可能存在的子类总是要起作用的

所以更多的情况下使用notifyAll更合适。

而对于notifyAll来说,当它被特定的锁调用的时候,他仅仅唤醒等待这个锁的任务。例程如下

package org.hope6537.thinking_in_java.twenty_one;

import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class Blocker {
synchronized void waitingCall() {
try {
while (!Thread.interrupted()) {
wait();
System.out.println(Thread.currentThread() + "");
}
} catch (Exception e) {
// Exit this way
}
}

synchronized void prod() {
notify();
}

synchronized void prodAll() {
notifyAll();
}
}

// 每个Task对象都会在blocker中阻塞

class Task implements Runnable {
static Blocker blocker = new Blocker();

@Override
public void run() {
blocker.waitingCall();
}
}

class Task2 implements Runnable {
static Blocker blocker = new Blocker();

@Override
public void run() {
blocker.waitingCall();
}
}

public class NotifyVsNotifyAll {

public static void main(String[] args) throws InterruptedException {
ExecutorService exec = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++) {
exec.execute(new Task());
}
exec.execute(new Task2());
Timer timer = new Timer();
// 所以每個run方法都經由激勵方法交替的調用喚醒和全部喚醒功能
timer.scheduleAtFixedRate(new TimerTask() {
boolean prod = true;

@Override
public void run() {
if (prod) {
System.out.println("notify()");
Task.blocker.prod();
prod = false;
} else {
System.out.println("notifyAll()");
Task.blocker.prodAll();
prod = true;
}
}
}, 400, 400);
TimeUnit.SECONDS.sleep(5);
timer.cancel();
System.out.println("Timer Canceled");
System.out.println("Task2.blokcer.prodAll()");
Task2.blocker.prodAll();
TimeUnit.MILLISECONDS.sleep(500);
System.out.println("Shutting Down");
exec.shutdownNow();
}
// 在输出中我们看到就算是Task2被阻塞,也没有因为Task的唤醒或唤醒全部方法而将其唤醒
// 因为唤醒方法仅仅会在当前锁对象的方式中作用

}
4、生产者-消费者系统

Bruce:考虑这样的一个仿真模型,一个厨师和一个服务员。一个服务员给厨师提供信号,同时在厨师做好菜之前他都会被挂起。而厨师做好菜之后会通知服务员,服务员上菜,然后接着等待。厨师代表生产者,服务员代表消费者。如何实现协作呢?

package org.hope6537.thinking_in_java.twenty_one;

import java.util.concurrent.*;

class Meal {
private final int orderNum;

public Meal(int orderNum) {
this.orderNum = orderNum;
}

public String toString() {
return "餐点序号 " + orderNum;
}
}

class WaitPerson implements Runnable {
private Restaurant restaurant;

public WaitPerson(Restaurant r) {
restaurant = r;
}

public void run() {
try {
while (!Thread.interrupted()) {
synchronized (this) {
// 外部是锁,内部是循环 这样可以保证退出等待循环之前。 条件将得到满足
while (restaurant.meal == null) {
System.out.println("侍者:当前没有食物 等待大厨开工");
wait(); // ... for the chef to produce a meal
}
}
System.out.println("侍者:拿到了 " + restaurant.meal);
// 调用解锁前必须声明同步
synchronized (restaurant.chef) {
restaurant.meal = null;
restaurant.chef.notifyAll(); // Ready for another
}
}
} catch (InterruptedException e) {
System.out.println("侍者:被打断");
}
}
}

class Chef implements Runnable {
private Restaurant restaurant;
private int count = 0;

public Chef(Restaurant r) {
restaurant = r;
}

public void run() {
try {
while (!Thread.interrupted()) {
synchronized (this) {
while (restaurant.meal != null)
wait(); // ... for the meal to be taken
}
if (++count == 10) {
System.out.println("大厨:没有食物了,关门");
// 向exec中的所有项发出中断
restaurant.exec.shutdownNow();
}
System.out.println("大厨:点餐完毕!准备制作! ");
synchronized (restaurant.waitPerson) {
restaurant.meal = new Meal(count);
restaurant.waitPerson.notifyAll();
}
TimeUnit.MILLISECONDS.sleep(100);
}
} catch (InterruptedException e) {
System.out.println("大厨:被打断");
}
}
}

/**
* @describe 生产者-消费者实现,使用单一的地点来存放对象
*/
public class Restaurant {
Meal meal;
ExecutorService exec = Executors.newCachedThreadPool();
WaitPerson waitPerson = new WaitPerson(this);
Chef chef = new Chef(this);

public Restaurant() {
exec.execute(chef);
exec.execute(waitPerson);
}

public static void main(String[] args) {
new Restaurant();
}
}
对于这个实例而言,我们使用了一个单一的信号地点来存放对象。在实际的模型中,我们都会使用队列来进行信号的传递,而使用while外嵌方法将能够使退出循环方法之前,条件能得到满足。
5、使用显式Lock和Condition对象来处理挂起-唤醒业务。
除了使用Wait-NotifyAll之外,还有一种方法能够实现功能。基本类是Condition,使用await挂起,signal和signalAll来唤醒,下面是实例

package org.hope6537.thinking_in_java.twenty_one;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
* @describe 使用显式Lock和Condition对象来处理挂起-唤醒业务。
*/
public class WaxOMatic2 {
public static void main(String[] args) throws InterruptedException {
Car car = new Car();
ExecutorService exec = Executors.newCachedThreadPool();
exec.execute(new WaxOff1(car));
exec.execute(new WaxOn1(car));
TimeUnit.SECONDS.sleep(5);
exec.shutdownNow();
}
}

class Car1 {
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
// false为未打蜡 刚抛光完,true为未抛光 刚打蜡完
boolean waxOn = false;

// 打蜡
public void waxed() {
lock.lock();
try {
waxOn = true;
condition.signalAll();
} finally {
lock.unlock();
}
}

// 拋光
public void buffed() {
lock.lock();
try {
waxOn = false; // Ready for another coat of wax
condition.signalAll();
} finally {
lock.unlock();
}
}

// 抛光完毕等待打蜡
public void waitForWaxing() throws InterruptedException {
lock.lock();
try {
while (waxOn == false)
condition.await();
} finally {
lock.unlock();
}
}

// 打蠟完畢等待拋光
public void waitForBuffing() throws InterruptedException {
lock.lock();
try {
while (waxOn == true)
// 在获取锁的时候才能进行唤醒,等待操作
condition.await();
} finally {
lock.unlock();
}
}
}

class WaxOn1 implements Runnable {
private Car car;

public WaxOn1(Car c) {
car = c;
}

public void run() {
try {
while (!Thread.interrupted()) {
System.out.println("打蜡:正在打蜡!");
TimeUnit.MILLISECONDS.sleep(200);
car.waxed();
car.waitForBuffing();
}
} catch (InterruptedException e) {
System.out.println("打蜡被打断");
}
System.out.println("打蜡:结束打蜡任务");
System.out.println("-----------------");
}
}

class WaxOff1 implements Runnable {
private Car car;

public WaxOff1(Car c) {
car = c;
}

public void run() {
try {
while (!Thread.interrupted()) {
car.waitForWaxing();
System.out.println("抛光:正在抛光!");
TimeUnit.MILLISECONDS.sleep(200);
car.buffed();
}
} catch (InterruptedException e) {
System.out.println("抛光被打断");
}
System.out.println("抛光:结束抛光任务");
System.out.println("------------------");
}
}
显式的锁重要特征之try-finally系统。在构造器中,单一的Lock会产生一个Condition对象,这个对象用于管理任务之间的通信,但是它不包含状态信息,所以我们还是需要做一个标记信号。在使用的时候,要确保获取到这个锁才能调用挂起和唤醒操作。

6、生产者消费者队列

在进行协同操作的时候,我们可以使用同步队列来解决任务协作的问题。如同标准的队列一样,它是按照先进先出的顺序来的,单位时间内只能取出一个项。

如果消费者试图从任务中获取对象,而此时队列无可用项。那么队列可以挂起消费者任务。比起单纯的挂起-唤醒操作要安全的多。

下面一个是LiffOff对象的队列实现,消费者是LiffOutRunner,由阻塞队列推出并运行。

package org.hope6537.thinking_in_java.twenty_one;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;

class LiftOffRunner implements Runnable {

private BlockingQueue<LiftOff> rockets;

public LiftOffRunner(BlockingQueue<LiftOff> rockets) {
super();
this.rockets = rockets;
}

public void add(LiftOff lo) {
try {
rockets.put(lo);
} catch (InterruptedException e) {
System.out.println("在存入队列的时候被打断");
}
}

@Override
public void run() {
try {
while (!Thread.interrupted()) {
LiftOff rocket = rockets.take();
rocket.run();
}
} catch (InterruptedException e) {
System.out.println("在取出数据时被打断");
}
System.out.println("从起飞架退出");
System.out.println("======================");
}
}

public class TestBlockingQueues {
static void getKey() {
try {
new BufferedReader(new InputStreamReader(System.in)).readLine();
} catch (Exception e) {
e.printStackTrace();
}
}

static void getKey(String message) {
System.out.println(message);
getKey();
}

static void test(String msg, BlockingQueue<LiftOff> queue) {
System.out.println(msg);
LiftOffRunner runner = new LiftOffRunner(queue);
Thread t = new Thread(runner);
t.start();
for (int i = 0; i < 5; i++) {
runner.add(new LiftOff(5));
}
getKey(" 按下回车键 ( " + msg + " )");
t.interrupt();
System.out.println("完成" + msg + "任务");
}

public static void main(String[] args) {
// 在使用阻塞队列的情况下可以不使用锁
test("LinkedBlockingQueue", // 无限空间
new LinkedBlockingQueue<LiftOff>());
test("ArrayBlockingQueue", // 修正空间
new ArrayBlockingQueue<LiftOff>(3));
test("SynchronousQueue", // 只有一个
new SynchronousQueue<LiftOff>());
}
}
在使用阻塞队列的情况下可以不使用锁

下面是一个土司的例程,制作吐司,抹黄油,沾果酱,按照顺序来进行的流水线工作。按照任务协同的标准,来对这个仿真程序进行实现。

package org.hope6537.thinking_in_java.twenty_one;

import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
* @describe 使用阻塞队列来实现 做土司-抹黄油-沾果酱-吃掉流程 不使用synchronized和显式Lock
*/
public class ToastOMatic {
public static void main(String[] args) throws Exception {
ToastQueue dryQueue = new ToastQueue(), butteredQueue = new ToastQueue(), finishedQueue = new ToastQueue();
ExecutorService exec = Executors.newCachedThreadPool();
exec.execute(new Toaster(dryQueue));
exec.execute(new Butterer(dryQueue, butteredQueue));
exec.execute(new Jammer(butteredQueue, finishedQueue));
exec.execute(new Eater(finishedQueue));
TimeUnit.SECONDS.sleep(5);
exec.shutdownNow();
}
}

class Toast {
public enum Status {
DRY, BUTTERED, JAMMED
};

private Status status = Status.DRY;

private final int id;

public Toast(int id) {
super();
this.id = id;
}

public void butter() {
status = Status.BUTTERED;
}

public void jam() {
status = Status.JAMMED;
}

public Status getStatus() {
return status;
}

public int getId() {
return id;
}

@Override
public String toString() {
return "土司 [状态=" + status + ", 编号=" + id + "]";
}

}

class ToastQueue extends LinkedBlockingQueue<Toast> {
/**
* @describe
*/
private static final long serialVersionUID = 7496613851118253846L;
}

class Toaster implements Runnable {
private ToastQueue toastQueue;
private int count = 0;
private Random rand = new Random(47);

public Toaster(ToastQueue toastQueue) {
super();
this.toastQueue = toastQueue;
}

@Override
public void run() {
try {
while (!Thread.interrupted()) {
TimeUnit.MILLISECONDS.sleep(100 + rand.nextInt(500));
// 做土司 向队列中添加基础对象
Toast t = new Toast(count++);
System.out.println(t);
toastQueue.put(t);
}
} catch (Exception e) {
System.out.println("做土司被中断");
}
System.out.println("做完土司");
}
}

class Butterer implements Runnable {
//	从队列中获取基础对象
private ToastQueue dryQueue, butteredQueue;

public Butterer(ToastQueue dryQueue, ToastQueue butteredQueue) {
super();
this.dryQueue = dryQueue;
this.butteredQueue = butteredQueue;
}

@Override
public void run() {
try {
while (!Thread.interrupted()) {
Toast t = dryQueue.take();
t.butter();
System.out.println(t);
butteredQueue.put(t);
}
} catch (Exception e) {
System.out.println("抹黄油被中断");
}
System.out.println("抹完黄油");
}
}

class Jammer implements Runnable {
private ToastQueue butteredQueue, finishedQueue;

public Jammer(ToastQueue butteredQueue, ToastQueue finishedQueue) {
super();
this.butteredQueue = butteredQueue;
this.finishedQueue = finishedQueue;
}

@Override
public void run() {
try {
while (!Thread.interrupted()) {
Toast t = butteredQueue.take();
t.jam();
System.out.println(t);
finishedQueue.put(t);
}
} catch (Exception e) {
System.out.println("沾果酱被中断");
}
System.out.println("沾完果酱");
}
}

class Eater implements Runnable {
private ToastQueue finishedQueue;
private int counter = 0;

public Eater(ToastQueue finishedQueue) {
super();
this.finishedQueue = finishedQueue;
}

@Override
public void run() {
try {
while (!Thread.interrupted()) {
Toast t = finishedQueue.take();
if (t.getId() != counter++
|| t.getStatus() != Toast.Status.JAMMED) {
System.out.println(">>>> Error : " + t);
System.exit(0);
} else {
System.out.println("吃掉! " + t);
}
}
} catch (Exception e) {
System.out.println("吃的时候被中断");
}
System.out.println("吃完了~");
}
}
在每个类只和BlockingQueue通信的情况下,类与类之间的耦合大大消除了。仅仅和任务队列有通信。

7、在任务间使用管道进行输入和输出(I/O)

PipedWriter和PipedReader这个东西提供了线程功能的类库以对输入输出功能提供支持。管道基本上是一个阻塞队列的变体。通过输入输出在线程之间提供通信,这基本上是游戏的常用做法,下面是个实验的实例

package org.hope6537.thinking_in_java.twenty_one;

import java.util.concurrent.*;
import java.io.*;
import java.util.*;

/**
* @describe 管道发送信息以实现通信
* @author Hope6537(赵鹏)
* @signdate 2014年8月10日下午1:10:33
* @version 0.9
* @company Changchun University&SHXT
*/
class Sender implements Runnable {
private Random rand = new Random(47);
private PipedWriter out = new PipedWriter();

/**
* @descirbe 返回输出管道
*/
public PipedWriter getPipedWriter() {
return out;
}

public void run() {
try {
while (true)
for (char c = 'A'; c <= 'z'; c++) {
out.write(c);
TimeUnit.MILLISECONDS.sleep(rand.nextInt(500));
}
} catch (IOException e) {
System.out.println(e + " Sender write exception");
} catch (InterruptedException e) {
System.out.println(e + " Sender sleep interrupted");
}
}
}

/**
* @describe 管道获取
*/
class Receiver implements Runnable {
// 注意获取类
private PipedReader in;

/**
* @describe 獲取輸出管道,至關重要
*/
public Receiver(Sender sender) throws IOException {
in = new PipedReader(sender.getPipedWriter());
}

public void run() {
try {
while (true) {
// Blocks until characters are there:
System.out.println("Read: " + (char) in.read() + ", ");
}
} catch (IOException e) {
System.out.println(e + " Receiver read exception");
}
}
}

public class PipedIO {
public static void main(String[] args) throws Exception {
Sender sender = new Sender();//代表了互相通信的任务,它创造了一个Writer,它将数据放在Writer上,
Receiver receiver = new Receiver(sender);//然后receiver在调用read的时候将数据获取到,如果没有数据将会阻塞并挂起
ExecutorService exec = Executors.newCachedThreadPool();
exec.execute(sender);
exec.execute(receiver);
TimeUnit.SECONDS.sleep(4);
exec.shutdownNow();//Reader是可以被中断的,不同于标准I/O
}
} /*
* Output: (65% match) Read: A, Read: B, Read: C, Read: D, Read: E, Read: F,
* Read: G, Read: H, Read: I, Read: J, Read: K, Read: L, Read: M,
* java.lang.InterruptedException: sleep interrupted Sender sleep interrupted
* java.io.InterruptedIOException Receiver read exception
*/// :~


六、死锁

1、哲学家就餐问题

我们都知道哲学家就餐问题,他们面前每人是一根筷子,如果就餐的话就必须要两根筷子,就是说获取左边的或者是右边的加上自己的才可以就餐,换句话说,他们无法同时就餐超过2个人。所以没吃到的就必须等待吃到的(任务)结束或者挂起,才能获取到筷子。

下面是实现例程,筷子类的定义

package org.hope6537.thinking_in_java.twenty_one;

public class Chopstick {
// 表示是被被占用
private boolean taken = false;

public synchronized void take() throws InterruptedException {
// 如果被占用,则挂起当前线程。
while (taken)
wait();
// 和上面的循环无关,不被占用就被当前线程占用
taken = true;
}

public synchronized void drop() {
// 取消当前占用,同时激活当前锁(想要这根筷子的)所阻塞的所有线程,表示可以拿这个筷子了
taken = false;
notifyAll();
}
}
当一个哲学家对象调用这个take的时候,那么就必须等待标志为false的时候才能获取到。当它用完之后,会调用drop来让其他等待这根筷子的哲学家激活,从而让他们获取。

下面是哲学家对象的例程

package org.hope6537.thinking_in_java.twenty_one;

import java.util.concurrent.*;
import java.util.*;

public class Philosopher implements Runnable {
private Chopstick left;
private Chopstick right;
private final int id;
/**
* @describe 思考的时间长度
*/
private final int ponderFactor;
private Random rand = new Random(47);

private void pause() throws InterruptedException {
if (ponderFactor == 0)
// 如果不为零,就思考一段随机事件,否之,立刻返回。
return;
TimeUnit.MILLISECONDS.sleep(rand.nextInt(ponderFactor * 250));
}

public Philosopher(Chopstick left, Chopstick right, int ident, int ponder) {
this.left = left;
this.right = right;
id = ident;
ponderFactor = ponder;
}

public void run() {
try {
while (!Thread.interrupted()) {
System.out.println(this + " " + "思考");
pause();//首先试图思考
// Philosopher becomes hungry
System.out.println(this + " " + "拿起了右边的筷子");
right.take();//先拿右面的
System.out.println(this + " " + "拿起了左边的筷子");
left.take();//然后再拿左面的,如果右面的没拿到,就会一直处于阻塞状态,左边就算能到也不拿,死心眼啊
System.out.println(this + " " + "在吃饭");
pause();//吃了会
//System.out.println(this + " " + "放下了筷子");
right.drop();//依次放下筷子
left.drop();
}
} catch (InterruptedException e) {
System.out.println(this + " " + "被中断");
}
}

public String toString() {
return "哲学家 " + id + "号 ";
}
}
我们知道哲学家是一群死心眼,非得先拿右边的再拿左边的。假设A拿到了右面的筷子(相对于B左面的筷子——他们是圆桌聚餐),当B拿到右面之后再拿左面,发现自己的左面的筷子已经被A所拿走,A左面的被E,E被D,D被C,C被B……结果大家都拿着根单根筷子等着旁面的人放下(释放锁),一群哲学家就这样饿死了,死了,了

我们看看这群倒霉蛋的实际状况吧

import java.util.concurrent.*;

public class DiningPhilosophers {

public static void deadLockingRun(String[] args) throws Exception {

int ponder = 5;
if (args.length > 0)
ponder = Integer.parseInt(args[0]);
int size = 5;
if (args.length > 1)
size = Integer.parseInt(args[1]);
ExecutorService exec = Executors.newCachedThreadPool();
Chopstick[] sticks = new Chopstick[size];
for (int i = 0; i < size; i++)
sticks[i] = new Chopstick();
for (int i = 0; i < size; i++)
// 得到他手边的左手的和右手的筷子的引用
// 他们都试图先获得右边的筷子,然后再获取左边的筷子
exec.execute(new Philosopher(sticks[i], sticks[(i + 1) % size], i,
ponder));
if (args.length == 3 && args[2].equals("timeout"))
TimeUnit.SECONDS.sleep(5);
else {
System.out.println("Press 'Enter' to quit");
System.in.read();
}
exec.shutdownNow();

}

public static void main(String[] args) throws Exception {
// 0思考时间
deadLockingRun(new String[] { "0", "5", "timeout" });
}
}
/*OutPut(省略)
哲学家 3号  拿起了右边的筷子
哲学家 2号  思考
哲学家 0号  拿起了右边的筷子
哲学家 1号  思考
哲学家 4号  拿起了右边的筷子
哲学家 1号  拿起了右边的筷子
哲学家 0号  拿起了左边的筷子
哲学家 2号  拿起了右边的筷子
哲学家 3号  拿起了左边的筷子
哲学家 2号  拿起了左边的筷子
哲学家 1号  拿起了左边的筷子
哲学家 4号  拿起了左边的筷子
哲学家 0号  被中断
哲学家 1号  被中断
哲学家 4号  被中断
哲学家 3号  被中断
哲学家 2号  被中断
*/
每次结果都不一样,但是这群倒霉蛋一个也没吃上饭。那我们如何消除死锁呢?

首先我们必须知道死锁的发生条件

1)互斥条件——任务中使用的资源中至少有一个是不能共享的

2)至少有一个任务它必须持有一个资源,且正在等待获取一个当前被别的任务持有的资源。

3)资源不能被任务抢占——资源必须等待被任务正常释放

4)必须有循环等待——轮回等待,就像是A等E,E被D,D被C,C被B,B等A一样。

要发生死锁的话,四个条件必须同时发生,所以我们只需要破坏就好了!最容易的就是破坏第四个条件。我们可以试着让某些哲学家不那么死心眼,可以试着先左后右

这样就可以防止循环等待的发生

public static void fixedLockingRun(String[] args) throws Exception {
int ponder = 5;
if (args.length > 0)
ponder = Integer.parseInt(args[0]);
int size = 5;
if (args.length > 1)
size = Integer.parseInt(args[1]);
ExecutorService exec = Executors.newCachedThreadPool();
Chopstick[] sticks = new Chopstick[size];
for (int i = 0; i < size; i++)
sticks[i] = new Chopstick();
for (int i = 0; i < size; i++) {
// 通过取消循环等待来解除死锁
if (i < (size - 1))
exec.execute(new Philosopher(sticks[i], sticks[i + 1], i,
ponder));
else
// 改变取筷子方式 不那么死心眼 可以看到顺序倒过来了
exec.execute(new Philosopher(sticks[0], sticks[i], i, ponder));
}
if (args.length == 3 && args[2].equals("timeout"))
TimeUnit.SECONDS.sleep(5);
else {
System.out.println("Press 'Enter' to quit");
System.in.read();
}
exec.shutdownNow();
}

死锁更多上是逻辑上的问题,也就是你得干想才能解决,没啥捷径……真是令人沮丧。

七、新类库的构件

1、CountDownLatch

Bruce的解释是:它被用来同步一个或多个任务,强制等待其他人执行的一组操作完成。

类似于老大做完了之后,裁判告诉他必须等他的队员做完,才能到下一关一样。

任何的在这个对象调用wait方法都将会阻塞。知道他的计数值到达0,其他任务在结束工作的时候,将会减少这个计数值,(目标是0)。

这个对象只能被触发一次,也就是单步并行。

它的典型用法是将一个任务分解成n个互相独立的可解决任务,当每个任务完成时,都会调用这个锁存器上面调用CountDown方法等待问题被解决的任务(已经完事的)在这个锁存器上调用await,将自己拦住,直到计数结束(然后可以进行下一步)

package org.hope6537.thinking_in_java.twenty_one;

//: concurrency/CountDownLatchDemo.java
import java.util.concurrent.*;
import java.util.*;

//Performs some portion of a task:
class TaskPortion implements Runnable {
private static int counter = 0;
private final int id = counter++;
private static Random rand = new Random(47);
private final CountDownLatch latch;

TaskPortion(CountDownLatch latch) {
this.latch = latch;
}

public void run() {
try {
// 工作的同时计数
doWork();
//完成任务了!开始减少计数。
latch.countDown();
} catch (InterruptedException ex) {
// Acceptable way to exit
}
}

public void doWork() throws InterruptedException {
TimeUnit.MILLISECONDS.sleep(rand.nextInt(2000));
System.out.println(this + "completed");
}

public String toString() {
return String.format("%1$-3d ", id);
}
}

// Waits on the CountDownLatch:
class WaitingTask implements Runnable {
private static int counter = 0;
private final int id = counter++;
private final CountDownLatch latch;

WaitingTask(CountDownLatch latch) {
this.latch = latch;
}

public void run() {
try {
//表示已经完事了,在等大家呢!
latch.await();
// 操作被激活之后才输出
System.out.println("Latch barrier passed for " + this);
} catch (InterruptedException ex) {
System.out.println(this + " interrupted");
}
}

public String toString() {
return String.format("WaitingTask %1$-3d ", id);
}
}

/**
* @describe 用来同步一个或多个任务,强制他们等待由其他任务执行的一组操作的完成
*/
public class CountDownLatchDemo {
static final int SIZE = 100;

public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
// All must share a single CountDownLatch object:
CountDownLatch latch = new CountDownLatch(SIZE);
for (int i = 0; i < 10; i++)
exec.execute(new WaitingTask(latch));
for (int i = 0; i < SIZE; i++)
exec.execute(new TaskPortion(latch));
System.out.println("Launched all tasks");
exec.shutdown(); // Quit when all tasks complete
}
}


2、CyclicBarrier

它和上面的很相似,不同的是他可以多次进行计数不局限于单步。它使得所有的任务在它创建的栅栏处列队,然后一致的向前进,再到下一个栅栏出列队

很像游戏里的CheckPoint技术!下面是一个Bruce给出的神奇的赛马程序

package org.hope6537.thinking_in_java.twenty_one;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class Horse implements Runnable {
private static int counter = 0;
private final int id = counter++;
private int strides = 0;
private static Random rand = new Random(47);
private static CyclicBarrier barrier;

public Horse(CyclicBarrier b) {
barrier = b;
}

public synchronized int getStrides() {
return strides;
}

@Override
public void run() {
try {
// 这一次的动作相当于这一秒他跑了多远
while (!Thread.interrupted()) {
synchronized (this) {
// 给出一个0 1 2的随机数 然后加到当前里程数里
strides += rand.nextInt(3);
}
// 在其他的完成之前等待 注意是直接调用对象
barrier.await();
}
} catch (InterruptedException e) {

} catch (BrokenBarrierException e1) {
throw new RuntimeException(e1);
}

}

public Horse(int strides) {
super();
this.strides = strides;
}

@Override
public String toString() {
return "Horse [id=" + id + ", strides=" + strides + "]";
}

/**
* @descirbe 奔跑
*/
public String tracks() {
StringBuilder s = new StringBuilder();
for (int i = 0; i < getStrides(); i++) {
s.append("-");
}
s.append(id);
return s.toString();
}
}

/**
*
* @describe CyclicBarrier的Demo应用 赛马仿真程序
*/
public class HorseRace {

static final int FINISH_LINE = 75;

private List<Horse> horses = new ArrayList<Horse>();

private ExecutorService exec = Executors.newCachedThreadPool();
// 工作机制 创建一组任务,并行的执行工作,然后在进行下一个步骤之前等待
private CyclicBarrier barrier;

public HorseRace(int nHorses, final int pause) {
barrier = new CyclicBarrier(nHorses, new Runnable() {

@Override
public void run() {
StringBuilder s = new StringBuilder();
for (int i = 0; i < FINISH_LINE; i++) {
s.append("X");
}
System.out.println(s);
for (Horse horse : horses) {
System.out.println(horse.tracks());
}
for (Horse horse : horses) {
if (horse.getStrides() >= FINISH_LINE) {
System.out.println(horse + " won!");
exec.shutdownNow();
return;
}
}
try {
TimeUnit.MILLISECONDS.sleep(pause);
} catch (Exception e) {
System.out.println("动作睡眠被打断");
}
}
});
for (int i = 0; i < nHorses; i++) {
Horse horse = new Horse(barrier);
horses.add(horse);
exec.execute(horse);
}
}

public static void main(String[] args) {
int nHorses = 7;
int pause = 200;
new HorseRace(nHorses, pause);
}
}
可以看到向CyclicBarrier提供了一个栅栏动作,它是一个Runnable,当计数值到达0之后自动向前执行。
运行看看吧~很有意思的。

3、DelayQueue

顾名思义,这是一个通过延时的时间进行动态处理的(优先队列)。放置实现了Delayed接口的对象。排在队头的是最紧急的任务(即延时最短)。

package org.hope6537.thinking_in_java.twenty_one;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
* @describe 符合DelayQueue标准的对象需要实现两个接口
*/
class DelayedTask implements Runnable, Delayed {
private static int counter = 0;
private final int id = counter++;
private final int delta;
private final long trigger;
protected static List<DelayedTask> sequence = new ArrayList<DelayedTask>();

public DelayedTask(int delta) {
super();
this.delta = delta;
this.trigger = System.nanoTime()
+ TimeUnit.NANOSECONDS.convert(this.delta,
TimeUnit.MILLISECONDS);
sequence.add(this);<pre name="code" class="java"><span style="white-space:pre">	</span>//它使用TimeUnit的NanoSeconds单位进行计算,对于我们这样的毫秒众来说,就需要处理下了
}@Overridepublic long getDelay(TimeUnit unit) {return unit.convert(trigger - System.nanoTime(), TimeUnit.NANOSECONDS);}@Overridepublic int compareTo(Delayed o) {//没有compareTo如何进行比较呢~笑~DelayedTask that = (DelayedTask) o;if (trigger < that.trigger)
{return -1;}if (trigger > that.trigger) {return 1;}return 0;}@Overridepublic void run() {System.out.println(this + " ");}@Overridepublic String toString() {return String.format("[%1$-4d]", delta) + " Task " + id;}public String summary() {return "(" + id +
":" + delta + ")";} //这个嵌套的类提供了一种关闭所有事务的途径,当它被放置在队尾的时候。他就能发挥作用public static class EndSentinel extends DelayedTask {private ExecutorService exec;
public EndSentinel(int delta, ExecutorService exec) {super(delta);this.exec = exec;}@Overridepublic void run() {for (DelayedTask pt : sequence) {System.out.println(pt.summary() + " ");}System.out.println();System.out.println(this + " Calling ShutdownNow()");exec.shutdownNow();}}}/**
* @describe 使优先队列里面装载的Delayed实现类线程执行 */class DelayedTaskConsumer implements Runnable {private DelayQueue<DelayedTask> q;public DelayedTaskConsumer(DelayQueue<DelayedTask> q) {super();this.q = q;}@Overridepublic void run() {try {while (!Thread.interrupted())
{q.take().run();}} catch (Exception e) {}System.out.println("Finished DelayedTaskConsumer");}}/** * @describe 延迟时间优先阻塞队列的应用 但是怎么使用的我还是有点蒙圈 */public class DelayQueueDemo {public static void main(String[] args) {Random rand = new Random(47);ExecutorService exec
= Executors.newCachedThreadPool();DelayQueue<DelayedTask> queue = new DelayQueue<DelayedTask>();for (int i = 0; i < 20; i++) {queue.put(new DelayedTask(rand.nextInt(5000)));}queue.add(new DelayedTask.EndSentinel(5000, exec));//可以看到我们最后把大杀器放进去了exec.execute(new
DelayedTaskConsumer(queue));}}

我们可以看到,任务创建的顺序和任务执行的顺序并没有影响。它是按照延迟时间来进行的。

4、ProrityBlockingQueue

阻塞的优先队列——看到它我只能这样形容,把它当做带锁的优先队列就可以了,它更多担当的是调度处理。我们可以给它配给一个优先级数字

package org.hope6537.thinking_in_java.twenty_one;

import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;

class PrioritizedTask implements Runnable, Comparable<PrioritizedTask> {

private Random rand = new Random(47);
private static int counter = 0;
private final int id = counter++;
private final int priority;
protected static List<PrioritizedTask> sequence = new ArrayList<PrioritizedTask>();

public PrioritizedTask(int priority) {
this.priority = priority;
sequence.add(this);
}

public int compareTo(PrioritizedTask arg) {//还是那句话,必须实现compareTo啊
return priority < arg.priority ? 1 : (priority > arg.priority ? -1 : 0);
}

public void run() {
try {
TimeUnit.MILLISECONDS.sleep(rand.nextInt(250));
} catch (InterruptedException e) {
// Acceptable way to exit
}
System.out.println(this);
}

public String toString() {
return String.format("[%1$-3d]", priority) + " Task " + id;
}

public String summary() {
return "(" + id + ":" + priority + ")";
}
<span style="white-space:pre">	</span>//用于放在最后结束任务
public static class EndSentinel extends PrioritizedTask {
private ExecutorService exec;

public EndSentinel(ExecutorService e) {
super(-1); // Lowest priority in this program
exec = e;
}

public void run() {
int count = 0;
for (PrioritizedTask pt : sequence) {
System.out.println(pt.summary());
if (++count % 5 == 0)
System.out.println();
}
System.out.println();
System.out.println(this + " Calling shutdownNow()");
exec.shutdownNow();
}
}
}
//给出任务的提供队列,和TaskComsumer相互通过队列依次连接,用于创建任务
class PrioritizedTaskProducer implements Runnable {
private Random rand = new Random(47);
private Queue<Runnable> queue;
private ExecutorService exec;

public PrioritizedTaskProducer(Queue<Runnable> q, ExecutorService e) {
queue = q;
exec = e; // Used for EndSentinel
}

public void run() {

for (int i = 0; i < 20; i++) {
queue.add(new PrioritizedTask(rand.nextInt(10)));
Thread.yield();
}

try {
for (int i = 0; i < 10; i++) {
TimeUnit.MILLISECONDS.sleep(250);
queue.add(new PrioritizedTask(10));
}

for (int i = 0; i < 10; i++)
queue.add(new PrioritizedTask(i));

queue.add(new PrioritizedTask.EndSentinel(exec));
} catch (InterruptedException e) {

}
System.out.println("Finished PrioritizedTaskProducer");
}
}

class PrioritizedTaskConsumer implements Runnable {
private PriorityBlockingQueue<Runnable> q;

public PrioritizedTaskConsumer(PriorityBlockingQueue<Runnable> q) {
this.q = q;
}

public void run() {
try {
while (!Thread.interrupted())
//取出并运行
q.take().run();
} catch (InterruptedException e) {

}
System.out.println("Finished PrioritizedTaskConsumer");
}
}

public class PriorityBlockingQueueDemo {
public static void main(String[] args) throws Exception {
Random rand = new Random(47);
ExecutorService exec = Executors.newCachedThreadPool();
PriorityBlockingQueue<Runnable> queue = new PriorityBlockingQueue<Runnable>();
exec.execute(new PrioritizedTaskProducer(queue, exec));
exec.execute(new PrioritizedTaskConsumer(queue));
}
}


5、ScheduledExecutor

这东西有点类似于Windows的任务计划一样。我们可以通过它将Runnable对象设置为在将来的某个时刻执行。

package org.hope6537.thinking_in_java.twenty_one;

import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
* @describe 预定时间,预订条件来执行任务
*/
public cl
12597
ass GreenhouseScheduler {
<span style="white-space:pre">	</span>//可视的变量域,防止冲突。
private volatile boolean light = false;
private volatile boolean water = false;
private String thermostat = "Day";

public synchronized String getThermostat() {
return thermostat;
}

public void setThermostat(String thermostat) {
this.thermostat = thermostat;
}

// 时间表线程执行器
ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(10);
<span style="white-space:pre">	</span>//通过设定延时来讲Runnable对象执行
public void schedule(Runnable event, long delay) {
scheduler.schedule(event, delay, TimeUnit.MILLISECONDS);
}
<span style="white-space:pre">	</span>//重复执行的方法
public void repeat(Runnable event, long initialDelay, long period) {
scheduler.scheduleAtFixedRate(event, initialDelay, period,
TimeUnit.MILLISECONDS);
}
<span style="white-space:pre">	</span>//下面的都是可执行的状态表示器,由Scheduled来进行执行。
class LightOn implements Runnable {
@Override
public void run() {
System.out.println("将灯打开");
light = true;
}
}

class LightOff implements Runnable {
public void run() {
System.out.println("关掉灯光");
light = false;
}
}

class WaterOn implements Runnable {
public void run() {
System.out.println("打开水流");
water = true;
}
}

class WaterOff implements Runnable {
public void run() {
System.out.println("关闭水流");
water = false;
}
}

class ThermostatNight implements Runnable {
public void run() {
System.out.println("天黑了");
setThermostat("Night");
}
}

class ThermostatDay implements Runnable {
public void run() {
System.out.println("天亮了");
setThermostat("Day");
}
}

class Bell implements Runnable {
public void run() {
System.out.println("Bing~!");
}
}

class Terminate implements Runnable {
@Override
public void run() {
System.out.println("Terminating");
scheduler.shutdownNow();
new Thread() {
public void run() {
for (DataPoint d : data) {
System.out.println(d);
}
};
}.start();
}
}
<span style="white-space:pre">	</span>//它将持有并显示每个单个的数据段。并收集室内的数据
static class DataPoint {
final Calendar time;
final float/* double */temperature;
final float humidity;

public DataPoint(Calendar time, float temperature, float humidity) {
super();
this.time = time;
this.temperature = temperature;
this.humidity = humidity;
}

@Override
public String toString() {
return time.getTime()
+ String.format(" 溫度: %1$.1f 濕度: %2$.2f", temperature,
humidity);
}
}

private Calendar lastTime = Calendar.getInstance();
{
lastTime.set(Calendar.MINUTE, 30);
lastTime.set(Calendar.SECOND, 00);
}

private float lastTemp = 65.0f;
private int tempDirection = +1;
private float lastHumidity = 50.0f;
private int humidityDirection = +1;
private Random rand = new Random(47);
List<DataPoint> data = Collections
.synchronizedList(new ArrayList<DataPoint>());//在这里我们用到了synchronizedList,这是一个带锁的List对象,为了防止任务之间的干涉。
<span style="white-space:pre">	</span>//而它则是被调度的任务,每次被运行的时候都能产生数据并将其放入到data这个表中。
class CollectData implements Runnable {
@Override
public void run() {
System.out.println("收集数据中");
synchronized (this) {
lastTime.set(Calendar.MINUTE,
lastTime.get(Calendar.MINUTE) + 30);
if (rand.nextInt(5) == 4) {
tempDirection = -tempDirection;
}
lastTemp = lastTemp + tempDirection * (1.0f + rand.nextFloat());
if (rand.nextInt(5) == 4) {
humidityDirection = -humidityDirection;
}
lastHumidity = lastHumidity + humidityDirection
* rand.nextFloat();
data.add(new DataPoint((Calendar) lastTime.clone(), lastTemp,
lastHumidity));
}
}
}

public static void main(String[] args) {
GreenhouseScheduler gh = new GreenhouseScheduler();
gh.schedule(gh.new Terminate(), 5000);
// Former "Restart" class not necessary:
gh.repeat(gh.new Bell(), 0, 1000);
gh.repeat(gh.new ThermostatNight(), 0, 2000);
gh.repeat(gh.new LightOn(), 0, 200);
gh.repeat(gh.new LightOff(), 0, 400);
gh.repeat(gh.new WaterOn(), 0, 600);
gh.repeat(gh.new WaterOff(), 0, 800);
gh.repeat(gh.new ThermostatDay(), 0, 1400);
gh.repeat(gh.new CollectData(), 500, 500);
}
}


6、Semaphore

它相当于将锁分配给n个人,担当者许可证的作用。我们可以想象下对象池的模型,里面装载着对象,用的拿,不用的放回去。

package org.hope6537.thinking_in_java.twenty_one;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;

/**
* @describe 通过许可证方式建立的对象池
*/
public class Pool<T> {

private int size;
private List<T> items = new ArrayList<T>();
// 这个布尔数组将会追踪迁出的对象
private volatile boolean[] checkedOut;
// 这是啥? 许可证,能够允许n个任务同时访问这个资源
private Semaphore available;

public Pool(Class<T> classObject, int size) {
this.size = size;
checkedOut = new boolean[size];
available = new Semaphore(size, true);
for (int i = 0; i < size; ++i) {
try {
items.add(classObject.newInstance());
} catch (Exception e) {
e.printStackTrace();
}
}
}

public T checkOut() throws InterruptedException {
available.acquire();
// 声明给出一个对象,同时返回值中的确迁出一个对象
// 如果当前对象池没有对象可用了,那么就会阻塞当前调用进程,知道有对象返回并可用
return getItem();
}

private synchronized T getItem() {
for (int i = 0; i < size; i++) {
if (!checkedOut[i]) {
checkedOut[i] = true;
return items.get(i);
}
}
return null;
}

private synchronized boolean releaseItem(T item) {
int index = items.indexOf(item);
if (index == -1) {
return false;
}
if (checkedOut[index]) {
checkedOut[index] = false;
return true;
}
return false;
}

public void checkIn(T x) {
if (releaseItem(x)) {
// 将作为参数的这个对象收回
available.release();
}
}
}
为了测试,提供一个Fat对象,它的构造相当费时。

package org.hope6537.thinking_in_java.twenty_one;

/**
* @describe 一个重对象
*/
public class Fat {

private volatile double d;
private static int counter = 0;
private final int id = counter++;

public Fat() {
for (int i = 1; i < 10000; i++) {
d += (Math.PI + Math.E) / (double) i;
}
}

public void operation() {
System.out.println(this);
}

@Override
public String toString() {
return "Fat [id=" + id + "]";
}

}
我们在池中管理这些对象,以限制构造器产生的代价。我们可以写出这个测试类:获取Fat对象,一段时间之后再将其返回。

package org.hope6537.thinking_in_java.twenty_one;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class SemaphoreDemo {

final static int SIZE = 25;

public static void main(String[] args) throws InterruptedException {
final Pool<Fat> pool = new Pool<Fat>(Fat.class, SIZE);
ExecutorService exec = Executors.newCachedThreadPool();
for (int i = 0; i < SIZE; i++) {
exec.execute(new CheckOutTask<Fat>(pool));
}
System.out.println("All CheckoutTasks Created");
// 一个装载fat对象的表
List<Fat> list = new ArrayList<Fat>();
for (int i = 0; i < SIZE; i++) {
Fat f = pool.checkOut();
System.out.println(i + ": main() thread checked out");
// 打印自己并装载到表中
f.operation();
list.add(f);
}
// 又出现了,单个线程的引用
Future<?> blocked = exec.submit(new Runnable() {
public void run() {
try {
// 在对象池没有对象的情况下,该签出将被阻塞
pool.checkOut();
} catch (Exception e) {
System.out.println("checkout() Interrupted");
}
}
});
TimeUnit.SECONDS.sleep(2);
// 线程终止
blocked.cancel(true);
for (Fat f : list) {
pool.checkIn(f);
}
// 将会忽视第二次插入
for (Fat f : list) {
pool.checkIn(f);
}
exec.shutdown();
}

}
//他将会直接操作对象池
class CheckOutTask<T> implements Runnable {
private static int counter = 0;
private final int id = counter++;
private Pool<T> pool;

public CheckOutTask(Pool<T> pool) {
super();
this.pool = pool;
}

@Override
public void run() {
try {
T item = pool.checkOut();
System.out.println(this + "checked out " + item);
TimeUnit.SECONDS.sleep(1);
System.out.println(this + "checked in " + item);
pool.checkIn(item);
} catch (Exception e) {
e.printStackTrace();
}
}

@Override
public String toString() {
return "CheckOutTask [id=" + id + "]";
}
}


7、Exchanger

Bruce说道,它是两个任务之间交换对象的栅栏,当任务进入这些栅栏时,他们各自持有一个对象,当他们离开时。他们都拥有之前由对象持有的对象

我理解成为:复制!但是并不是这样……慢慢理解吧……

八、仿真

1、银行出纳员仿真

这是个典型的最优解DP问题,面对N个顾客,如何让队不排的特别长,同时还能最大化有效利用柜台,而且使用的柜台员还更少些。

ACM时代的话就特么开推dp方程了。但是现在并发仿真的降临保护了我们……

package org.hope6537.thinking_in_java.twenty_one;

import java.util.LinkedList;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
* @describe 银行出纳员仿真
*/
public class BankTellerSimulation {

static final int MAX_LINE_SIZE = 50;
static final int ADJUSTMENT_PERIOD = 1000;

public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
CustomerLine customers = new CustomerLine(MAX_LINE_SIZE);
exec.execute(new CustomerGenerator(customers));
exec.execute(new TellerManaegr(exec, customers, ADJUSTMENT_PERIOD));
if (args.length > 0) {
TimeUnit.SECONDS.sleep(new Integer(args[0]));
} else {
System.out.println("按下回车键以退出");
System.in.read();
}
exec.shutdownNow();
}

}

/**
* @describe 该类为顾客实体类,简化版 仅仅包含一个final int域 他是一个只读对象
*           对于每个Teller任务在任何时刻都只从输入队列中移除一个Customer 并且在这个Customer对象工作直到完成
*           因此在任何时刻只有一个对象能访问他
*/
class Customer {
private final int serviceTime;

public Customer(int serviceTime) {
super();
this.serviceTime = serviceTime;
}

public int getServiceTime() {
return serviceTime;
}

@Override
public String toString() {
return "[" + serviceTime + "]";
}
}

/**
* @describe 表示顾客在等待被某个Teller服务时所排成的单一的行 这只是一个阻塞队列
*/
class CustomerLine extends ArrayBlockingQueue<Customer> {

/**
* @describe
*/
private static final long serialVersionUID = 3441359585380160156L;

public CustomerLine(int maxLineSize) {
super(maxLineSize);
}

@Override
public String toString() {
if (this.size() == 0) {
return "[空]";
}
StringBuilder result = new StringBuilder();
for (Customer customer : this) {
result.append(customer);
}
return result.toString();
}
}

/**
* @describe 它附着在CustomerLine之上,按照随机的时间向这个队列中添加Customer
*/
class CustomerGenerator implements Runnable {
private CustomerLine customers;
private static Random rand = new Random(47);

public CustomerGenerator(CustomerLine customers) {
super();
this.customers = customers;
}

@Override
public void run() {
try {
while (!Thread.interrupted()) {
TimeUnit.MILLISECONDS.sleep(300);
customers.put(new Customer(rand.nextInt(1000)));

}
} catch (Exception e) {
System.out.println("顾客生产器被打断");
}
System.out.println("顾客生产器已停止");
}
}

/**
* @describe 出纳员实体类 Teller從CustomerLine中取出Customer
*           在任何時刻他只能處理一個Customer,並且跟蹤在這個特定班次中有他服務的Customer的數量
*           ,在沒有足夠多的顧客時,他會進入doSomeThingElse狀態,当出现很多顾客的时候,他又被重新补充回来
*           而优先队列的特性会保证工作量最少的出纳员会被推向前台
*/
class Teller implements Runnable, Comparable<Teller> {
private static int counter = 0;
private final int id = counter++;
// Customers served during this shift:
private int customersServed = 0;
private CustomerLine customers;
private boolean servingCustomerLine = true;

public Teller(CustomerLine cq) {
customers = cq;
}

@Override
public void run() {
try {
while (!Thread.interrupted()) {
Customer customer = customers.take();
TimeUnit.MILLISECONDS.sleep(customer.getServiceTime());
synchronized (this) {
customersServed++;
while (!servingCustomerLine) {
wait();
}
}
}
} catch (Exception e) {
System.out.println(this + " 被打断");
}
System.out.println(this + " 业务结束");
}

@Override
public synchronized int compareTo(Teller other) {
return customersServed < other.customersServed ? -1
: (customersServed == other.customersServed ? 0 : 1);
}

public synchronized void doSomethingElse() {
customersServed = 0;
servingCustomerLine = false;
}

public synchronized void serveCustomerLine() {
assert !servingCustomerLine : "already serving " + this;
servingCustomerLine = true;
notifyAll();
}

@Override
public String toString() {
return "Teller [id=" + id + "]";
}

public String shortString() {
return "T " + id;
}
}

/**
* @describe 出纳经理 控制系统,用来控制出纳员的数量
*/
class TellerManaegr implements Runnable {
private ExecutorService exec;
private CustomerLine customers;
private PriorityBlockingQueue<Teller> workingTellers = new PriorityBlockingQueue<Teller>();
private Queue<Teller> tellersDoingOtherThings = new LinkedList<Teller>();
private int adjustmentPeriod;
private static Random rand = new Random(47);

public TellerManaegr(ExecutorService exec, CustomerLine customers,
int adjustmentPeriod) {
super();
this.exec = exec;
this.customers = customers;
this.adjustmentPeriod = adjustmentPeriod;
Teller teller = new Teller(customers);
exec.execute(teller);
workingTellers.add(teller);
}

/**
* @descirbe 闲置一个出纳员
* @author Hope6537(赵鹏)
* @signDate 2014年8月11日下午12:39:39
* @version 0.9
*/
public void reassignOneTeller() {
Teller teller = workingTellers.poll();
teller.doSomethingElse();
tellersDoingOtherThings.offer(teller);
}

/**
* @descirbe 可以用一个稳定的方式增加或移除出纳员
* @author Hope6537(赵鹏)
* @signDate 2014年8月11日下午1:48:02
* @version 0.9
*/
public void adjustTellerNumber() {
// 如果排队的人数过多,就加一个窗口
if (customers.size() / workingTellers.size() > 2) {
// 如果有闲置的出纳员
if (tellersDoingOtherThings.size() > 0) {
Teller teller = tellersDoingOtherThings.remove();
teller.serveCustomerLine();
workingTellers.add(teller);
return;
}
// 否则的话就再雇佣一个
Teller teller = new Teller(customers);
exec.execute(teller);
workingTellers.add(teller);
return;
}
if (workingTellers.size() > 1
&& customers.size() / workingTellers.size() < 2) {
reassignOneTeller();
}
if (customers.size() == 0) {
// 如果没有顾客,那么就流一个就够了
while (workingTellers.size() > 1) {
reassignOneTeller();
}
}
}

public void run() {
try {
while (!Thread.interrupted()) {
TimeUnit.MILLISECONDS.sleep(adjustmentPeriod);
adjustTellerNumber();
System.out.print(customers + " {");
for (Teller teller : workingTellers) {
System.out.print(teller.shortString() + " ");
}
System.out.println("}");
}
} catch (Exception e) {
System.out.println(this + "被中断 ");
}
System.out.println(this + "已结束");
}

@Override
public String toString() {
return "TellerManager";
}
}
TellerManager是各种活动的中心,他跟踪所有的出纳员和等待服务的顾客。他试图算出给定的顾客数中,最优的出纳员数是多少。

2、分发工作,车辆组装

我们可以在这个例子中看到我们是如何将任务分阶段,流水化处理的,更重要的是通信队列的使用

package org.hope6537.thinking_in_java.twenty_one;

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
* @describe 分发工作 建造车辆
*/
public class CarBuilder {
public static void main(String[] args) throws Exception {
// 声明流水线
CarQueue chassisQueue = new CarQueue(), finishingQueue = new CarQueue();
// 建立线程任务
ExecutorService exec = Executors.newCachedThreadPool();
// 建立对象池
RobotPool robotPool = new RobotPool();
// 启动并建立3种机器人,让其待命wait()
exec.execute(new EngineRobot(robotPool));
exec.execute(new DriveTrainRobot(robotPool));
exec.execute(new WheelRobot(robotPool));
// 启动生产流水线,等待装配工作
exec.execute(new Assembler(chassisQueue, finishingQueue, robotPool));
// 启动报告器,等待报告装好了的车辆
exec.execute(new Reporter(finishingQueue));
// 通过启动底盘装配器 开始整个流水线工作
exec.execute(new ChassisBuilder(chassisQueue));
TimeUnit.SECONDS.sleep(15);
exec.shutdownNow();
}
}

/**
* @describe 一个车的模型
*/
class ModelCar {

private final int id;
private boolean engine = false, driveTrain = false, wheels = false;

public ModelCar(int idn) {
id = idn;
}

// Empty Car object:
public ModelCar() {
id = -1;
}

/**
* @descirbe 使用锁方法,在某一时刻,只有一个线程能获取到这个对象
* @author Hope6537(赵鹏)
* @return
* @signDate 2014年8月11日下午2:20:18
* @version 0.9
*/
public synchronized int getId() {
return id;
}

/**
* @descirbe 添加引擎
* @author Hope6537(赵鹏)
* @signDate 2014年8月11日下午2:22:45
* @version 0.9
*/
public synchronized void addEngine() {
engine = true;
}

/**
* @descirbe 安装车厢
* @author Hope6537(赵鹏)
* @signDate 2014年8月11日下午2:22:50
* @version 0.9
*/
public synchronized void addDriveTrain() {
driveTrain = true;
}

/**
* @descirbe 安装轮子
* @author Hope6537(赵鹏)
* @signDate 2014年8月11日下午2:22:55
* @version 0.9
*/
public synchronized void addWheels() {
wheels = true;
}

public synchronized String toString() {
return "车辆 " + id + " [" + " 引擎: " + engine + " 车厢: " + driveTrain
+ " 轮子: " + wheels + " ]";
}
}

/**
* @describe 一个车辆队列 用于装载被配置的车辆 由于是流水线作业 所以采用链式结构
*/
class CarQueue extends LinkedBlockingQueue<ModelCar> {

private static final long serialVersionUID = 2998611961570480381L;

}

/**
* @describe 一个底盘创建器
*/
class ChassisBuilder implements Runnable {
/**
* @describe 准备往流水线上放已经装好底盘的车辆 获取到流水线对象
*/
private CarQueue carQueue;
private int counter = 0;

public ChassisBuilder(CarQueue cq) {
carQueue = cq;
}

public void run() {
try {
while (!Thread.interrupted()) {
TimeUnit.MILLISECONDS.sleep(500);
// 开始建造实体类 建造一个车辆模型,当然其他的属性还没有来得及配置
ModelCar c = new ModelCar(counter++);
System.out.println("底盘创建器: 刚刚新创建了" + c);
// 然后将其放入队列
carQueue.put(c);
}
} catch (InterruptedException e) {
System.out.println("底盘创建器: 被打斷");
}
System.out.println("底盘创建器: 任务结束");
}
}

/**
* @describe 装配工序
*/
class Assembler implements Runnable {
// 获取到两个生产线队列 ,一个是刚刚装好底盘的队列, 一个是已经完成的车辆队列 获取前者的对象做出处理然后放到后者里面去
private CarQueue chassisQueue, finishingQueue;
// 一个预设的实体车模型
private ModelCar car;
// 用于并行的执行动作,并在下一个步骤之前如果还没有全部完成的话保持挂起的触发事件
private CyclicBarrier barrier = new CyclicBarrier(4);
// 一个机器台
private RobotPool robotPool;

/**
* @describe 获取车辆队列和装配机器人
* @param cq
*            装配底盘队列
* @param fq
*            完成车辆队列
* @param rp
*            机器人装配工
*/
public Assembler(CarQueue cq, CarQueue fq, RobotPool rp) {
chassisQueue = cq;
finishingQueue = fq;
robotPool = rp;
}

public ModelCar car() {
return car;
}

/**
* @descirbe 获得并行计数器
* @return
* @signDate 2014年8月11日下午2:32:13
* @version 0.9
*/
public CyclicBarrier barrier() {
return barrier;
}

public void run() {
try {
while (!Thread.interrupted()) {
// 这个take操作将会阻塞 直到底盘队列里面出现了可用的车模型(即刚装底盘的车)
car = chassisQueue.take();
// 在机器人对象池里雇佣3种机器人
robotPool.hire(EngineRobot.class, this);
robotPool.hire(DriveTrainRobot.class, this);
robotPool.hire(WheelRobot.class, this);
barrier.await(); // 等待这三个同学完成工序,再执行下一步
// 然后将装配好了的车辆放入到完成队列中
finishingQueue.put(car);
}
} catch (InterruptedException e) {
System.out.println("装配工序:被打断");
} catch (BrokenBarrierException e) {
// This one we want to know about
throw new RuntimeException(e);
}
System.out.println("装配工序:任务结束");
}
}

/**
* @describe 报告获取源 用于将所有的已完成车辆提取出来
*/
class Reporter implements Runnable {
private CarQueue carQueue;

/**
* @describe 获取到车辆队列
* @param cq
*/
public Reporter(CarQueue cq) {
carQueue = cq;
}

public void run() {
try {
while (!Thread.interrupted()) {
System.out.println("车辆装配完成: " + carQueue.take());
}
} catch (InterruptedException e) {
System.out.println("装配完成提示:被打断");
}
System.out.println("装配完成提示:结束任务");
}
}

/**
* @describe 一个泛用的装配机器人抽象类
*/
abstract class Robot implements Runnable {

/**
* @describe 内置了一个机器人连接池
*/
private RobotPool pool;
private final String name;

public Robot(RobotPool p, String name) {
pool = p;
this.name = name;
}

/**
* @describe 所有实现了机器人的机器人派生类都可以获取到装配工序对象
*/
protected Assembler assembler;

/**
* @descirbe 给当前的机器人对象注册装配工序
* @param assembler
* @return
*/
public Robot assignAssembler(Assembler assembler) {
this.assembler = assembler;
return this;
}

/**
* @describe 是否正在工作
*/
private boolean engage = false;

/**
* @descirbe 开始工作 同时将engage设为true 并且启动当前工序
*/
public synchronized void engage() {
engage = true;
notifyAll();
}

// 这一部分是机器人的具体装配动作 也是最重要的地方
abstract protected void performService();

public void run() {
try {
// 在线程中挂起,直到被需要调用
powerDown();
while (!Thread.interrupted()) {
// 进行工作
performService();
assembler.barrier().await(); // 同步步骤,等待其他装配步骤完成
// 完成工作 接着挂起
powerDown();
}
} catch (InterruptedException e) {
System.out.println(this + " 被打断");
} catch (BrokenBarrierException e) {
// 这里会出现线程终止异常 不必理会
throw new RuntimeException(e);
}
System.out.println(this + " 任务结束");
}

/**
* @descirbe [关闭电源] 即为挂起当前的工序
*/
private synchronized void powerDown() throws InterruptedException {
// 是否工作 = false
engage = false;
// 和装配工序断线
assembler = null;
// 将其本身返回对象池
pool.release(this);
while (engage == false)
// 循环检查条件挂起 因为engage是无锁的
wait();
}

public String toString() {
return name;
}
}

class EngineRobot extends Robot {

public EngineRobot(RobotPool pool) {
super(pool, "[引擎装配机器人]");
}

protected void performService() {
System.out.println(this + " 正在安装引擎");
assembler.car().addEngine();
}
}

class DriveTrainRobot extends Robot {
public DriveTrainRobot(RobotPool pool) {
super(pool, "[车厢装配机器人]");
}

protected void performService() {
System.out.println(this + " 正在安装车厢");
assembler.car().addDriveTrain();
}
}

class WheelRobot extends Robot {
public WheelRobot(RobotPool pool) {
super(pool, "[轮子装配机器人]");
}

protected void performService() {
System.out.println(this + " 正在安装轮子");
assembler.car().addWheels();
}
}

/**
* @describe 机器人对象池
*/
class RobotPool {
// 防止出现相同的实体
private Set<Robot> pool = new HashSet<Robot>();

/**
* @descirbe 向连接池中添加一个对象,并且将所有占用当前连接池的锁的线程唤醒
* @author Hope6537(赵鹏)
* @param r
* @signDate 2014年8月11日下午2:48:04
* @version 0.9
*/
public synchronized void add(Robot r) {
pool.add(r);
notifyAll();
}

/**
* @descirbe 雇佣一个机器人
*/
public synchronized void hire(Class<? extends Robot> robotType, Assembler d)
throws InterruptedException {
for (Robot r : pool)
if (r.getClass().equals(robotType)) {
// 遍历并寻找 然后弹出,送出去
pool.remove(r);
r.assignAssembler(d);
r.engage(); // 然后激活他以准备工作
return;
}
wait(); // 如果现在没有,那就挂起等待,直到add方法被调用,说明
// 1、添加了新机器人 2、之前的机器人返回
hire(robotType, d); // 然后再遍历查找一次 依次递归,直到出现一个
}

/**
* @descirbe 机器人回家
*/
public synchronized void release(Robot r) {
add(r);
}
}
注释已经将大部分写的很明白了。

我们看到,将Car对象所有的方法都设置成synchronized的了。这是为什么呢?在工厂的内部,他是按照队列来移动的。队列是安全的啊?

但是如果我们不对Car类同步进行优化,那么当这个系统连接到另一个需要Car被同步的系统时,他就会崩溃

九、性能调优

1、比较各类互斥技术

Bruce给出了一个看起来客观的测试类

package org.hope6537.thinking_in_java.twenty_one;

import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.concurrent.locks.*;
import java.util.*;

/**
* @describe 采用模板设计方法,将公用代码全部放在抽象类里,只有不同的地方才使用抽象方法,其他的统一标准
* @author Hope6537(赵鹏)
* @signdate 2014年8月11日下午8:28:38
* @version 0.9
* @company Changchun University&SHXT
*/
abstract class Accumulator {
public static long cycles = 50000L;
private static final int N = 4;
public static ExecutorService exec = Executors.newFixedThreadPool(N * 2);
private static CyclicBarrier barrier = new CyclicBarrier(N * 2 + 1);
protected volatile int index = 0;
protected volatile long value = 0;
protected long duration = 0;
protected String id = "error";
protected final static int SIZE = 100000;
protected static int[] preLoaded = new int[SIZE];
static {
// 载入随机数数组
Random rand = new Random(47);
for (int i = 0; i < SIZE; i++)
preLoaded[i] = rand.nextInt();
}

/**
* @descirbe 将会实现互斥对象
* @author Hope6537(赵鹏)
* @signDate 2014年8月11日下午8:22:41
* @version 0.9
*/
public abstract void accumulate();

/**
* @descirbe 实现互斥对象的第二种形式
* @author Hope6537(赵鹏)
* @return
* @signDate 2014年8月11日下午8:22:54
* @version 0.9
*/
public abstract long read();

// 进行长循环,同时将任务并行进行
private class Modifier implements Runnable {
public void run() {
for (long i = 0; i < cycles; i++)
accumulate();
try {
barrier.await();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}

private class Reader implements Runnable {
private volatile long value;

public void run() {
for (long i = 0; i < cycles; i++)
// 读取value
value = read();
try {
barrier.await();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}

public void timedTest() {
// 时间测试
long start = System.nanoTime();
for (int i = 0; i < N; i++) {
exec.execute(new Modifier());
exec.execute(new Reader());
}
// 通过两个互斥操作和任务并行操作,来观察时间
try {
barrier.await();
} catch (Exception e) {
throw new RuntimeException(e);
}
duration = System.nanoTime() - start;
System.out.printf("%-13s: %13d\n", id, duration);
}

public static void report(Accumulator acc1, Accumulator acc2) {
System.out.printf("%-22s: %.2f\n", acc1.id + "/" + acc2.id,
(double) acc1.duration / (double) acc2.duration);
}
}

class BaseLine extends Accumulator {
{
id = "BaseLine";
}

public void accumulate() {
int i = index++;
if (i >= SIZE) {
index = 0;
i = 0;
}
// +随机数
value += preLoaded[i];
}

public long read() {
return value;
}
}

/**
* @describe 采用锁的方式进行测试
* @author Hope6537(赵鹏)
* @signdate 2014年8月11日下午8:26:11
* @version 0.9
* @company Changchun University&SHXT
*/
class SynchronizedTest extends Accumulator {
{
id = "synchronized";
}

public synchronized void accumulate() {
value += preLoaded[index++];
if (index >= SIZE)
index = 0;
}

public synchronized long read() {
return value;
}
}

/**
* @describe 采用显式Lock的方式进行测试
* @author Hope6537(赵鹏)
* @signdate 2014年8月11日下午8:26:19
* @version 0.9
* @company Changchun University&SHXT
*/
class LockTest extends Accumulator {
{
id = "Lock";
}
private Lock lock = new ReentrantLock();

public void accumulate() {
lock.lock();
try {
value += preLoaded[index++];
if (index >= SIZE)
index = 0;
} finally {
lock.unlock();
}
}

public long read() {
lock.lock();
try {
return value;
} finally {
lock.unlock();
}
}
}

/**
* @describe 使用原子类进行测试
* @author Hope6537(赵鹏)
* @signdate 2014年8月11日下午8:26:32
* @version 0.9
* @company Changchun University&SHXT
*/
class AtomicTest extends Accumulator {
{
id = "Atomic";
}
private AtomicInteger index = new AtomicInteger(0);
private AtomicLong value = new AtomicLong(0);

/*
* public void accumulate() { // Oops! Relying on more than one Atomic at //
* a time doesn't work. But it still gives us // a performance indicator:
* int i = index.getAndIncrement(); value.getAndAdd(preLoaded[i]); if (++i
* >= SIZE) index.set(0); }
*/

public void accumulate() {
int i = index.getAndIncrement();
if (i >= SIZE) {
i = 0;
index.set(0);
}
value.getAndAdd(preLoaded[i]);
}

public long read() {
return value.get();
}
}

public class SynchronizationComparisons {
// 获取测试类对象
static BaseLine baseLine = new BaseLine();
static SynchronizedTest synch = new SynchronizedTest();
static LockTest lock = new LockTest();
static AtomicTest atomic = new AtomicTest();

static void test() {
// 拉取界面
System.out.println("============================");
System.out.printf("%-12s : %13d\n", "Cycles", Accumulator.cycles);
// 同时时间测试
baseLine.timedTest();
synch.timedTest();
lock.timedTest();
atomic.timedTest();
// 然后最后将测试结果打印出来
Accumulator.report(synch, baseLine);
Accumulator.report(lock, baseLine);
Accumulator.report(atomic, baseLine);
Accumulator.report(synch, lock);
Accumulator.report(synch, atomic);
Accumulator.report(lock, atomic);
}

public static void main(String[] args) {
// 测试次数
int iterations = 5;
if (args.length > 0)
iterations = new Integer(args[0]);
// 第一次测试作为热身,将会使线程池填满
System.out.println("Warmup");
baseLine.timedTest();
// 现在测试的时间将不会包括创造线程的时间
// Produce multiple data points: yeah~
for (int i = 0; i < iterations; i++) {
test();
Accumulator.cycles *= 2;
}
Accumulator.exec.shutdown();
}

}

/*
* Output: (Sample) Warmup BaseLine : 34237033 ============================
* Cycles : 50000 BaseLine : 20966632 synchronized : 24326555 Lock : 53669950
* Atomic : 30552487 synchronized/BaseLine : 1.16 Lock/BaseLine : 2.56
* Atomic/BaseLine : 1.46 synchronized/Lock : 0.45 synchronized/Atomic : 0.79
* Lock/Atomic : 1.76 ============================ Cycles : 100000 BaseLine :
* 41512818 synchronized : 43843003 Lock : 87430386 Atomic : 51892350
* synchronized/BaseLine : 1.06 Lock/BaseLine : 2.11 Atomic/BaseLine : 1.25
* synchronized/Lock : 0.50 synchronized/Atomic : 0.84 Lock/Atomic : 1.68
* ============================ Cycles : 200000 BaseLine : 80176670 synchronized
* : 5455046661 Lock : 177686829 Atomic : 101789194 synchronized/BaseLine :
* 68.04 Lock/BaseLine : 2.22 Atomic/BaseLine : 1.27 synchronized/Lock : 30.70
* synchronized/Atomic : 53.59 Lock/Atomic : 1.75 ============================
* Cycles : 400000 BaseLine : 160383513 synchronized : 780052493 Lock :
* 362187652 Atomic : 202030984 synchronized/BaseLine : 4.86 Lock/BaseLine :
* 2.26 Atomic/BaseLine : 1.26 synchronized/Lock : 2.15 synchronized/Atomic :
* 3.86 Lock/Atomic : 1.79 ============================ Cycles : 800000 BaseLine
* : 322064955 synchronized : 336155014 Lock : 704615531 Atomic : 393231542
* synchronized/BaseLine : 1.04 Lock/BaseLine : 2.19 Atomic/BaseLine : 1.22
* synchronized/Lock : 0.47 synchronized/Atomic : 0.85 Lock/Atomic : 1.79
* ============================ Cycles : 1600000 BaseLine : 650004120
* synchronized : 52235762925 Lock : 1419602771 Atomic : 796950171
* synchronized/BaseLine : 80.36 Lock/BaseLine : 2.18 Atomic/BaseLine : 1.23
* synchronized/Lock : 36.80 synchronized/Atomic : 65.54 Lock/Atomic : 1.78
* ============================ Cycles : 3200000 BaseLine : 1285664519
* synchronized : 96336767661 Lock : 2846988654 Atomic : 1590545726
* synchronized/BaseLine : 74.93 Lock/BaseLine : 2.21 Atomic/BaseLine : 1.24
* synchronized/Lock : 33.84 synchronized/Atomic : 60.57 Lock/Atomic : 1.79
*/// :~
通过输出我们明显看到,使用Lock明显比synchronized高效率的多。而且synchronized的开销看起来范围很大,而Lock则一般。

但是synchronized的可读性更强,所以先从synchronized入手编写,再在性能调优的时候用Lock改进更明智些。

2、免锁容器

十、活动对象

十一、总结
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  java 并发