您的位置:首页 > 编程语言 > Java开发

java多线程设计模

2016-02-19 22:03 603 查看
Edit


一、基本介绍

一个线程的作用是为了能够执行一段程序。而多线程的目的是为了能够提高程序的吞吐率。
在单核CPU情况下,多线程通过线程的交替执行,能够避免单个线程情况下由于资源等待而处于“假死”状态,导致吞吐率低下。

在多核CPU情况下,多线程能够充分利用多个CPU可以同时计算的优势,提高程序执行效率。
这本书主要通过类图、流程图的方式介绍线程的设计模式。 

首先是一些基本概念。 

然后涉及到一些有代表性的设计模式。


二、几个重要的关键字

synchronize、wait、notify、join


1. synchronize关键

不同线程访问同一个方法或者同一块区域时,能够保证一次只有一个线程访问。 

用法:


1) 临界方法。

用于方法前,说明这个方法同一时间只能由一个线程访问。注意,若一个类中有多个synchronize方法,那么多个 线程同一时间只能访问一个对象里面的一个synchronize方法。即方法的synchronize是对象级锁。 

如下图,当一个线程调用了bank对象的deposit()方法而未返回时,其他任何调用该bank对象synchronize方法的线程都会等待(不仅仅是deposit方法、withdraw方法也会卡住)。 




2) 临界区。

void method (){
synchronize( obj ) { ...}
}

这种方式能够保证当不同线程进入该方法的同一个代码块时是互斥的。
若synchronized 的参数是this,当线程进入该临界区时,会将整个对象锁住,而导致任何要调用该对象synchronize方法或者进入临界区的线程都会等待。

若synchronize的参数是某个对象,
void method(){
Object lock;
synchronized(o){...}
}

则意味着任何调用进入该临界区的线程都会获取对象lock的锁,任何需要获取lock对象锁的线程都会等待。

然而这个没什么卵用,因为线程A的lock是局部变量,其他线程B若进入该临界区,会获取新创建的lock对象的锁,而并非A的lock变量的锁,所以起不到控制的作用。正确的做法应该将lock对象作为一个各个线程共享的对象,这样,若A线程获取lock的锁,则其他要获取lock对象锁的线程就只能等待了。 

例如,将使用静态变量,这样,由于所有线程访问的lock对象都是同一个,所以,能够保证临界区的互斥。
class MyThread implements Runnable {
int n;
static final Object lock = new Object();

public MyThread(int n) {
this.n = n;
}

public void method(){
synchronized (lock) {
...
}
}

@Override
public void run() {
...
}
}



2. wait /notify/notifyAll 方法

用途:当某线程想要获取的资源不满足要求时,线程自发进入等待状态。

为什么要有wait?若没有wait,那么,当线程等待的资源不满足要求时,该线程需要不断地通过while 循环来查询资源的状态,这会浪费大量CPU资源。

通过 wait /notify的方式,当线程发现等待的资源不满足时,将调用该线程(或者某对象)的wait方法,自发的放弃CPU进行等待队列,而当某个正在执行的进程发现资源状态能够满足时,便调用线程的(或者某对象的)notify/notifyAll方法来唤醒等待队列中的线程。

notify vs notifyAll 

notify只会随机唤醒waitset中的一个线程,而notifyAll会唤醒所有的线程(太多了怕出乱子?只要写好进入获取临界资源的方法将好了,不满足要求的线程仍然会乖乖进入wait set里的),由于notify只会唤醒一个线程,若这个线程没处理好导致资源一直处于不可用状态,那么,其他程序可能将会一直处于wait set中而无法执行了,所以使用notifyAll并写好边界条件的程序更靠谱些。

注意
wait、notify,必须在synchronized中调用。
调用wait、notify前,必须已经获得对象的锁。


3. notice

long 与double 不是原子的,所以是线程不安全的,所以,需要中synchronized中操作,或者声明为volatile。


三、一些主要的模式


1. Guarded Suspension Pattern

该模式中,有一个具有状态的对象,这个对象只有中自己状态合适的时候,才让线程进行目的处理。通过警戒条件来控制线程对临界资源的使用。 

其实这个就是多线程最普遍的写法

synchronized(this){
while(!obj.isready()){
wait();
}
//do something
}

通过while循环检测资源是否能够使用,若可用,则跳出while循环,若不能,则wait,直到其他线程调用了notify后,再次判断该资源是否可用,重复以上步骤。

为什么要使用while而不是if? 

若其他线程调用的时notifyAll,则会唤醒所有的线程,若使用if,则所有的线程都会被允许使用该资源,然而这个资源可能已经被第一个唤醒的资源使用过而变成不可用了(比如,从只有一个元素的队列中poll一个元素,然后队列就空了),这样就导致后续线程对资源的使用失去了控制。

理解这个模式,关键要清楚实例的警戒状态 ,线程之所以需要wait,是因为资源的状态不符合,即警戒状态不成立,所以,程序要能够往下走,一定是因为警戒条件成立(而不仅仅是因为notify的调用),这也更好的说明了为什么使用while而不是if。


2.Producer-Consumer Pattern

现在是一个经典模式,生产者-消费者模式。 

需要的对象:
一个用于共享的对象,例如可放若干水果的餐盘、一个放消息的队列等(称为共享区),可用list、数组或map等实现。
生产商品的线程,该线程持续不断的将物品放入共享区,例如不断地往餐盘中放水果、往消息队列中放消息,直到共享区满了。
消费商品的线程,该线程不断从共享区中获取物品,例如不断地从餐盘中拿水果、从消息队列中取消息,直接共享区没有物品。 

为了防止当共享区没有物品时,消费者仍从共享区中获取物品、或者共享区满了生产者还往里放物品,共享区需要进行“自我保护”,防止在不合适的状态下消费者或者生产者对他的操作。


共享区

共享区会维护一个固定大小的空间,并提供add 和take的方法,简单实现如下:

package thread;

public class ShareAre {
final int CAPITAL = 10;
Object[] arr = new Object[CAPITAL];
int head = 0, count = 0;

public synchronized void put(Object obj) {
while (count >= CAPITAL) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
arr[(head + count) % CAPITAL] = obj;
count++;
notifyAll();
}

public synchronized Object take() {
Object obj = null;
while (count <= 0) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
obj = arr[head];
head = (head + 1) % CAPITAL;
count--;
notifyAll();
return obj;
}
}

生产者和消费者的示例代码如下:

package thread;

public class Producer_Custom1 {
final int MAX_SIZE = 10;
volatile int count = 0;

public static void main(String[] args) {
new Producer_Custom1().start();
}

private void start() {
ShareAre box = new ShareAre();
new Thread(new Producer(box)).start();
new Thread(new Producer(box)).start();
new Thread(new Customer(box)).start();
new Thread(new Customer(box)).start();
}

class Producer implements Runnable {
ShareAre box;

public Producer(ShareAre box) {
this.box = box;
}

@Override
public void run() {
// 此处未对count进行保护,所以不能保证count递增,即便count为volatile
while (count < 100) {
count++;
System.out.println("produce:" + count);
box.put(new Integer(count));
}

}
}

class Customer implements Runnable {
ShareAre box;

public Customer(ShareAre box) {
this.box = box;
}

@Override
public void run() {
while (true) {
System.out.println("consume: " + box.take());
}
}
}
}


volatile关键字特性
可见性:对一个volatile变量的读,总是能看到(任意线程)对这个volatile变量最后的写入。
原子性:对任意单个volatile变量的读/写具有原子性,但类似于volatile++这种复合操作不具有原子性。
有序性:禁止指令重排序。

以上示例由于对ShareArea的put 和take进行了synchronize的保护,所以,这是一个线程安全的类。若将该类的synchronize关键字去除,并删除wait、notifyAll的相关代码,使得ShareArea成为一个线程不安全的类,则需要在生产者和消费者的线程中针对具体情况进行同步。此处将不多言了。 

可见,生产者消费者模式,主要通过提供线程安全的共享区域,来简化多线程中对于处理数据的添加和删除的需求。java中也提供了对应的线程安全的数据处理的类,如:Vector、Stack、HashTable、StringBuffer。同时,针对线程不安全的类(LinkedList、HashMap…),可以通过

List list = Collections.synchronizedList(new LinkedList(...));
Map map = Collections.synchronizedMap(new HashMap(...));

使其变成线程安全的类。原理嘛,将是使用了个委托模式,在方法前加了个synchronize。


3.Worker Thread——工作!

消息委托的模式,将上一节的生产者、消费者模式进行了一定的转化: 

生产者 –> 命令生产者(委托人) 

共享区域中的对象 –> 可执行的命令(Request) 

消费者 –> 命令执行者(工人) 

那么,就能够按照如下方式进行:生产者生成一系列需要执行的命令,将命令放入共享区域中,命令执行者则从共享区中获取命令执行。

优点:1)这种模式将委托者从创建命令和执行命令中分离,让命令的执行无需占用委托者的CPU资源。2)通过增加工人的数量,则可以提高并发处理的效率。3)命令的产生和执行相分离。

命令的产生invocation 和执行execution分离的意义?
提高任务的相应,委托者中创建完任务后可以立即执行下一个任务,而无需等待命令执行完毕。
分离后,可以根据自身需要,控制任务的执行顺序、设立优先级,比如,将小任务先执行。
可取消和可重复执行,就是字面意思了。
分离后,可分发到多台服务器上执行,提高效率。

缺点:只能在不需要返回值的情况下使用,并且,执行顺序难以保证。

适用: 
最经典的用法是UI页面中,点击一个后台需要长时间才能相应的按钮时,不会导致UI页面卡死(java中使用ActionListener 方式实现)。
在需要进行IO时,将耗时的IO任务从主线程中剥离开。


4.Read Write Lock——读写锁

针对某些特定情况(读多写少),可使用读写锁进行性能的优化。考虑到互斥的操作是比较耗时的,所以在程序中,应该做到只对必要之处进行互斥,而读写的情况就是可优化的方案。 

读操作并不会破坏数据,可以多个线程同时进行,而写操作必须保证线程间的互斥。所以,可以设计一种锁,当某线程获取读锁时,其他线程能够读但不能写;获取写锁时,其他线程则不能读也不能写。这样,在大量读的情况下,能够有效提高效率。 

一个读写锁,若无其他线程进行写操作,则可获得读锁;只有无其他线程在读且无其他线程中写,才可以获得写锁。 

可以用如下方式自己实现一个读写锁

package thread.readwritelock;
public class MyReadWriteLocker {
int readThreads = 0, writeThreads = 0;
public synchronized void readLock() throws InterruptedException {
while (writeThreads > 0) {
wait();
}
readThreads++;
}
public synchronized void readUnlock() {
readThreads--;
notifyAll();
}
public synchronized void writeLock() throws InterruptedException {
while (writeThreads > 0 || readThreads > 0) {
wait();
}
writeThreads++;
}
public synchronized void writeUnlock() {
writeThreads--;
notifyAll();
}
}

一个使用读写锁进行并发控制的对象

package thread.readwritelock;

public class RWData {
MyReadWriteLocker myLock = new MyReadWriteLocker();
String str = "11";

public void read(String s) {
try {
myLock.readLock();
System.out.println("read:" + s + str);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
myLock.readUnlock();
}
}

public void write(String s) {
try {
myLock.writeLock();
str += "1";
System.out.println("write:" + s + str);
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
myLock.writeUnlock();
}
}

}

多个读写线程

package thread.readwritelock;

public class Main {

private void start() {
RWData data = new RWData();
new ReadThread(data, "A").start();
new ReadThread(data, "B").start();
new ReadThread(data, "C").start();
new WriteThread(data, "D").start();
}

public static void main(String[] args) {
new Main().start();
}

class ReadThread extends Thread {
int count = 10;
RWData data;
String name;

public ReadThread(RWData data, String name) {
this.data = data;
this.name = name;
}

@Override
public void run() {
while (count > 0) {
data.read(name);
count--;
}
}
}

class WriteThread extends Thread {
int count = 10;
RWData data;
String name;

public WriteThread(RWData data, String name) {
this.data = data;
this.name = name;
}

@Override
public void run() {
while (count > 0) {
data.write(name);
count--;
}
}
}
}

使用读写锁时,多个读线程并不会互斥,所以可以任意个线程同时读,所以,读的效率很高。 

但以上这种简单的方式,如果不断有读线程进入,那么,可能会导致写线程一直等待读线程释放锁而处于“饥饿”状态。 

java中提供了ReentrantReadWriteLock,可以对读和写操作分别进行加锁,同时避免了由于大量读而导致写线程的饥饿状态。


ReentrantReadWriteLock——可重入读写锁。

可重入: 即同一个线程可对进行多次加锁(同一线程外层函数获得锁之后 ,内层递归函数仍然有获取该锁的代码,但不受影响),不支持可重入的锁在重复加锁的情况下看能会产生死锁。重入锁的方案一般是在线程加锁的时候进行计数+1,在线程释放时将计数-1,当计数为0时,才真正释放锁对象。
公平: 
非公平锁(默认) 这个和独占锁的非公平性一样,由于读线程之间没有锁竞争,所以读操作没有公平性和非公平性,写操作时,由于写操作可能立即获取到锁,所以会推迟一个或多个读操作或者写操作。因此非公平锁的吞吐量要高于公平锁。
公平锁 利用AQS的CLH队列,释放当前保持的锁(读锁或者写锁)时,优先为等待时间最长的那个写线程分配写入锁,当前前提是写线程的等待时间要比所有读线程的等待时间要长。同样一个线程持有写入锁或者有一个写线程已经在等待了,那么试图获取公平锁的(非重入)所有线程(包括读写线程)都将被阻塞,直到最先的写线程释放锁。如果读线程的等待时间比写线程的等待时间还有长,那么一旦上一个写线程释放锁,这一组读线程将获取锁。 

有兴趣的朋友可以自行研究,不多说了。


5.Future

Future有点像蛋糕店给你的提货单,制作完蛋糕后,你可以通过这个提货单来取蛋糕,这样,你可以去做其他事情而不必中蛋糕房里等待。 

写多线程任务时,你也可以参考以上方式,当你的线程A中需要获取一个比较耗时的数据,而你又不愿意让线程A循环等待这个数据,那么,可以在request这个数据时,先返回一个只包含了空变量的Future对象,同时建立一个新线程去获取数据并装入Future对象中(setData),当线程A需要Future对象中的数据时,可以通过getData获取数据(若未准备好,则等待)


1) Future的实现如下:

主函数主要负责调用Client发起请求,并使用返回的数据。

public class Application {
public static void main(String[] args) throws InterruptedException {
Client client = new Client();
//这里会立即返回,因为获取的是FutureData,而非RealData
Data data = client.request("name");
//这里可以用一个sleep代替对其他业务逻辑的处理
//在处理这些业务逻辑过程中,RealData也正在创建,从而充分了利用等待时间
Thread.sleep(2000);
//使用真实数据
System.out.println("数据="+data.getResult());
}
}

无论是FutureData还是RealData都实现该接口。

public interface Data {
String getResult() throws InterruptedException;
}

Client主要完成的功能包括:1. 返回一个FutureData;2.开启一个线程用于构造RealData。

public class Client {
public Data request(final String string) {
final FutureData futureData = new FutureData();

new Thread(new Runnable() {
@Override
public void run() {
//RealData的构建很慢,所以放在单独的线程中运行
RealData realData = new RealData(string);
futureData.setRealData(realData);
}
}).start();

return futureData; //先直接返回FutureData
}
}

RealData是最终需要使用的数据,它的构造函数很慢。

public class RealData implements Data {
protected String data;

public RealData(String data) {
//利用sleep方法来表示RealData构造过程是非常缓慢的
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
this.data = data;
}

@Override
public String getResult() {
return data;
}
}

FutureData是Future模式的关键,它实际上是真实数据RealData的代理,封装了获取RealData的等待过程。

//FutureData是Future模式的关键,它实际上是真实数据RealData的代理,封装了获取RealData的等待过程
public class FutureData implements Data {
RealData realData = null; //FutureData是RealData的封装
boolean isReady = false;  //是否已经准备好

public synchronized void setRealData(RealData realData) {
if(isReady)
return;
this.realData = realData;
isReady = true;
notifyAll(); //RealData已经被注入到FutureData中了,通知getResult()方法
}

@Override
public synchronized String getResult() throws InterruptedException {
if(!isReady) {
wait(); //一直等到RealData注入到FutureData中
}
return realData.getResult();
}
}



2)java中对Future的支持

java线程的实现,一般是通过集成Thread类或者实现runnable接口来实现,但这类线程无法获取线程执行完毕返回的对象(当然,你可以通过全局变量或者传入一个对象来获取,但为了确保获取到的数据是完整的,你又需要写对应的控制语句),而java提供了现有的实现方式
建立一个线程池 ExecutorService executor = Executors.newCachedThreadPool();
建立一个实现了Callable接口的对象 Task task = new Task();
通过executor执行线程 Future result = executor.submit(task);
获取对象 result.get(),若对象未准备完毕,线程会等待。 

很简单,代码如下

package thread.future;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class Test {
public static void main(String[] args) {
ExecutorService executor = Executors.newCachedThreadPool();
Task task = new Task();
Future<Integer> result = executor.submit(task);
executor.shutdown();

try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
e1.printStackTrace();
}

System.out.println("主线程在执行任务");

try {
System.out.println("task运行结果" + result.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}

System.out.println("所有任务执行完毕");
}
}

class Task implements Callable<Integer> {
@Override
public Integer call() throws Exception {
System.out.println("子线程在进行计算");
Thread.sleep(3000);
int sum = 0;
for (int i = 0; i < 100; i++)
sum += i;
System.out.println("子线程计算完毕");
return sum;
}
}

Future模式应用
异步计算:将分解后的多个计算过程分别交给线程池去执行,然后收集各个计算结果进行整理。
处理一些耗时的操作,如上述示例。


6.Termination——优雅的去结束一个线程

当调用一个线程的interrupt( )方法时,若该线程处于sleep 、wait或者join时,会抛出InterruptedException异常,来终止该线程。但要确保一个线程能够被终止(尤其是中循环中),还需要设置一个终止条件(每次循环时判断一下),来避免当线程并未处于上述三种状态时,interrupt不会抛出异常而导致终止失效的问题。

stop方法来终止线程? 

之所以不提倡stop方法,因为虽然它确实停止了一个正在运行的线程,然而,这种方法是不安全也是不受提倡的,因为他强迫线程停止,而可能会导致线程任务未执行完毕将意外退出,而导致数据丢失、对象处于不一致的状态下… 而使用interrupt由于有异常捕获机制,我们能够通过异常处理来处理终止的线程,这种方式是可控的。

join 

由于线程之间是独立的,当线程A中start了线程B(B较耗时),之后,A线程会在B线程之前就结束。于是,当你希望A能够等待B结束之后再结束,将可以中start线程B之后,再调用B线程的join方法,这样,将OK了。并且,线程A会在调用join之后就停住,直到线程B结束之后线程A才会往下走。

package thread.join;

public class Test {

public static void main(String[] args) {
Thread t = new Thread(new Runnable() {

@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("sub thread end.");
}
});
t.start();
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("main thread end.");
}

}

原理:当调用线程B的join时,实际上调用的是B线程的wait,然后线程A进入等待队列,直到B执行完毕后,A才继续往下走。 

所以,你也可以如下实现线程的join。

package thread.join;

public class Test {

public static void main(String[] args) {
Thread t = new Thread(new Runnable() {

@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("sub thread end.");
}
});
t.start();
synchronized (t) {
try {
t.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("main thread end.");
}

}



小知识:神奇的ThreadLocal——线程的保险

一个能够管理多个对象的线程。通过set(obj )方法可以添加一个属于该线程的对象,通过get( )方法可以获取调用该方法的线程对应的对象。就像是一个神奇的储物柜入口(get方法),每个人(调用get的线程)在用自己钥匙打开时,都能够进入自己专属的储物柜,请注意,而入口是一样的(都是ThreadLocal对象)。

其实,使用任何一个Collection都可以管理多个对象,但为什么不用(比如map)?map也能够帮你实现委托,为每个对象开辟一个单独的空间,在需要时获取相应的操作。但ThreadLocal是一个线程,他有自己的优势
通过get方法,可以自动找到调用线程对应的单独的对象(空间),想象下如果通过map,必须为每个获取每个线程的ID然后map.get( id )…(其实,ThreadLocal的内部就是维护了一个map,所做的将是把map包装中一个线程中 )
由于它是单独的一个线程,在主线程委托某个任务时,不会占用调用线程的资源(单独的一个线程执行任务)。我觉得这是与直接使用map的最根本区别。

多线程的同步机制通过精密的控制来保证多个线程在访问同一个对象时的正确性,而ThreadLocal从另一个角度来解决多线程的并发问题。前者是用时间换空间(资源竞争会耗时),而后者则使用空间换时间(通过为每个线程提供一个单独的副本避免冲突)。 

所以,用“线程的保险箱”这个词来描述他,无比合适。 

下面来看一个hibernate中典型的ThreadLocal的应用:

private static final ThreadLocal threadSession = new ThreadLocal();

public static Session getSession() throws InfrastructureException {
Session s = (Session) threadSession.get();
try {
if (s == null) {
s = getSessionFactory().openSession();
threadSession.set(s);
}
} catch (HibernateException ex) {
throw new InfrastructureException(ex);
}
return s;
}

通过使用ThreadLocal来维护线程每个线程的Session,这样,线程能够方便的获取属于自己的session,连参数都不用传,是不是很方便。
参考: 

《java多线程设计模式》 

Java并发编程:volatile关键字解析
http://www.cnblogs.com/dolphin0520/p/3920373.html
转 Java多线程(十)之ReentrantReadWriteLock深入分析 
http://my.oschina.net/adan1/blog/158107
Java多线程中join方法的理解 
http://uule.iteye.com/blog/1101994
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: