您的位置:首页 > 编程语言 > Java开发

[java多线程]-状态依赖

2018-02-27 19:26 337 查看

概述

在单线程中调用某个方法,而方法中依赖对象的状态前置条件没有满足,那么这个条件将无法满足。然而在多线程中,一个线程依赖的对象状态的前置条件没有满足,另一个线程可能修改对象状态,从而使得前置条件满足。

多线程中对于依赖状态的操作一般是前置条件不满足,阻塞线程,当前置条件满足,唤醒线程继续执行。

在生产者-消费者的设计中经常有像BlockedQueue的有界的阻塞队列。我们下面就以有界阻塞队列来讲解状态依赖。

有界阻塞队列中需要有两个接口
put
用于添加元素,
take
接口获取元素处理。而
put
的前置条件为队列未满,
take
的前置条件为队列不为空。

内置条件队列

条件队列是它使得一组线程能够通过某种方式等待特定的条件为真,条件队列中的元素是一个个正在等待前置条件的队列。

每个对象可以作为一个条件队列,而
wait
notify
notifyAll
接口构成了API。

那我们使用内置的条件队列来实现有界阻塞队列。

public class BoundedBlockedQueue<E> {
private final Object[] items;
private int takeIndex;
private int putIndex;
private int count;

public BoundedBlockedQueue(Object[] items) {
this.items = items;
}

public synchronized E take() throws InterruptedException {
while (count == 0)
wait();
return doTake();
}

private E doTake() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length) takeIndex = 0;
count--;
notifyAll();
return x;
}

public synchronized void put(E e) throws InterruptedException {
while (count == items.length) {
wait();
}
doPut(e);
}
private void doPut(E e) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = e;
if (++putIndex == items.length) putIndex = 0;
count++;
notifyAll();
}
}


在条件等待中存在一种重要的三元关系,包括加锁,wait方法和一个条件谓语(条件谓语是某个操作的状态依赖的前提条件,比如有界阻塞队列中,
take
方法的条件谓语就是“缓存不为空”)。在条件谓语中包含多个状态变量,而状态变量由一个锁来保护,因此在测试条件谓语之前必须持有这个锁。锁对象与条件队列对象必须是同一对象。

摘录自《Java并发编程实战》

public synchronized E take() throws InterruptedException {
while (count == 0)
wait();
return doTake();
}


take
方法中,条件谓语是“集合不为空”,进入
synchronized
take
方法获取锁,while检测集合为空,为空就进入等待。这里使用while来检测是有两个原因:

由于添加元素和移除元素都会通知,因此从
wait
中唤醒后,不一定是
take
的前置条件满足了,因此需要再次检测。

由于
wait
会释放锁并且唤醒后会尝试着重新获取锁再进一步操作,当使用
notifyAll
会唤醒同一对象上的
wait
的线程,多个线程时存在竞争,一个线程已经完成了
take
之后,其他线程获取获取锁后需要再次检测,因为此时前置条件已经不满足了。

为什么使用
notifyAll
而不是
notify
呢?

首先解释一下两个API的不同,
notifyAll
是唤醒同一个对象的上的所有线程,而
notify
只是从同一个对象的线程中挑选一个唤醒。再来看一下我们的有界阻塞队列,我们的线程可能在等待
take
的条件谓语集合不为空,也可能在等待
put
的条件谓语集合未满,假如在
take
操作中完成了添加元素之后调用了
notify
,那我们唤醒的可能只是等待
take
条件谓语的线程,导致了
put
一直在等待。因此需要使用
notifyAll


那什么时候使用
notify
呢?

只用满足以下两个条件时,才能使用单一的
notify
而不是
notifyAll


所有的等待线程的类型相同,只有一个条件谓语与条件队列相关,并且每个线程在从
wait
返回后将执行相同的操作。

单进单出,在条件变量上的每次通知,最多只能唤醒一个线程来执行。

内置条件队列有它的缺陷,每个内置锁只能有一个相关联的条件队列,像有界阻塞队列中
take
put
两个不同条件谓语只使用同一个条件队列,导致了notifyAll方法通知会唤醒所有的线程,效率很低。

后面我们将会使用显示条件队列来解决内置条件的问题。

显示条件队列

Condition
是广义的显示条件队列。

public interface Condition {
void await() throws InterruptedException;
void awaitUninterruptibly();
long awaitNanos(long nanosTimeout) throws InterruptedException;
boolean await(long time, TimeUnit unit) throws InterruptedException;
boolean awaitUntil(Date deadline) throws InterruptedException;
void signal();
void signalAll();
}


与内置条件队列相同,一个条件队列需要并且只能与一个内置锁关联在一起,一个
Condition
也需要和一个
Lock
关联。条件谓语,
Lock
Condition
也存在三元关系,条件谓语中包含的状态变量必须
Lock
来保护,并且在检测条件谓语以及调用
await
signal
时,必须持有
Lock


与内置条件队列不同的是,对于一个
Lock
,可以有任意数量的
Condition
对象,通过
Lock
newCondition()
来创建
Condition
对象。

Condition
await
signal
signalAll
分别与
Object
对象的
wait
notify
notifyAll
对应。

有些Lock并不支持Condition,需要留意。

public class ConditionBoundedBlockedQueue<E> {
final Object[] items;
int takeIndex;
int putIndex;
int count;
final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;

public ConditionBoundedBlockedQueue(int capacity) {
this.items = new Object[capacity];
//创建锁,并从锁中创建条件队列。
lock = new ReentrantLock();
notEmpty = lock.newCondition();
notFull =  lock.newCondition();
}

public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return doTake();
} finally {
lock.unlock();
}
}

private E doTake() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length) takeIndex = 0;
count--;
notFull.signal();
return x;
}

public void put(E e) throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
doPut(e);
} finally {
lock.unlock();
}
}
private void doPut(E e) {
final Object[] items = this.items;
items[putIndex] = e;
if (++putIndex == items.length) putIndex = 0;
count++;
notEmpty.signal();
}
}


take
put
的两个条件谓语分开并且放到两个等待线程集中,
Condition
使其更满足单次通知的需求。
signal
signalAll
更高效,它能极大地减少在每次缓存操作中发生的上下文切换与锁的请求次数。

Notice: java并发库里面的阻塞队列基本是使用显示阻塞队列来设计的。通常我们不需要使用自定义的阻塞队列,java的并发库已经有了。



何时使用Condition

内置锁与显示锁选择一样,只有用到
Condition
的高级特性的时候才会使用它,包括公平性,同一个锁上多个等待线程集(也就是有多个条件谓语的时候),没有用到高级特性时候,优先选择内置条件队列。

参考资料

《Java并发编程实战》

《Java并发编程-设计原则与模式》
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息