您的位置:首页 > 编程语言 > Java开发

通过生产者消费者问题比较信号量和信号【Java实现】

2014-08-25 13:40 501 查看
最近在看操作系统相关的内容,看到了进程通信这一块时,发现自己被信号量和信号给弄糊涂了。于是我仔细查了查资料,发现自己以前完全没弄清楚这两个概念……现在我把自己理解的东西总结一下,并用Java代码实现,给大家分享一下,如有问题,欢迎指出。

首先看信号量(Semaphore)。信号量是多线程环境下使用的一种方式,可以用来保证两个或多个程序不能同时进入临界区,从而不能同时放一个共享资源,达到进程互斥的作用。此外,通过使用信号量,我们也可以实现进程同步:在生产者消费者问题中,信号量full和empty用来保证某种事件顺序发生或者不发生,即对于一个物品,生产的过程始终要在消费的过程之前。Java的concurrent包已经实现了信号量,我们现在用它来实现生产者消费者问题,代码如下:

import java.util.concurrent.Semaphore;

///article/7757404.html
public class TestSemaphore {

static WareHouse wareHouse = new WareHouse();

//生产者
static class Producer implements Runnable {
static int num = 1;
@Override
public void run() {
// TODO Auto-generated method stub
while (true) {
try {
wareHouse.insert(num);
System.out.println("生产物品" + num);
num++;
Thread.sleep(100);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

}
}

}

//消费者
static class Consumer implements Runnable {
@Override
public void run() {
// TODO Auto-generated method stub
while (true) {
try {
System.out.println("消费物品" + wareHouse.remove());
Thread.sleep(500);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

}
}

}

//仓库,可以放置和拿走物品
static class WareHouse {
private final int capacity = 10;
private final Semaphore full = new Semaphore(0); //仓库中被占用的槽的信号量
private final Semaphore empty = new Semaphore(capacity); //仓库中空的槽的信号量
private final Semaphore mutex = new Semaphore(1); //互斥信号量
private int insertIndex = 0; //仓库中当前可以放置物品的位置
private int removeIndex = 0; //仓库中当前可以拿走物品的位置
private final Object[] items = new Object[capacity]; //仓库中的所有物品
int count = 0; //仓库中的现有物品数

//向仓库中放置物品
public void insert(Object item) throws InterruptedException {
empty.acquire();
mutex.acquire();
items[insertIndex++] = item;
if (insertIndex == capacity) {
insertIndex = 0;
}
count++;
mutex.release();
full.release();
}

//从仓库中拿走物品
public Object remove() throws InterruptedException {
full.acquire();
mutex.acquire();
Object item = items[removeIndex++];
if (removeIndex == capacity) {
removeIndex = 0;
}
count--;
mutex.release();
empty.release();
return item;
}
}

public static void main(String[] args) {
new Thread(new Producer()).start();
new Thread(new Consumer()).start();
}
}

可以看到,该例子启动了一个生产者线程和一个消费者线程,其中生产者线程每隔0.1s生产一个物品,消费者线程每隔0.5s消费一个物品。此外,最重要的是存放物品的这个类WareHouse,里面有三个信号量full,empty和mutex,其中full和empty用来做进程同步,mutex用来做进程互斥。每当要生产一个物品时,我们首先检查能否获取信号量full的一个许可,如果不可以则阻塞线程,如果可以则继续获取items数组的访问权,该访问权由互斥信号量mutex来控制,当放置完物品后,会释放信号量empty的一个许可。同理,每当要消费一个物品时,我们首先检查能否获取信号量empty的一个许可,如果不可以则阻塞线程,如果可以择继续获取items数组的访问权,当拿走物品后,会释放信号量full的一个许可。

这样,我们就用信号量实现了生产者消费者问题,结果如下:

生产物品1
消费物品1
生产物品2
生产物品3
生产物品4
生产物品5
生产物品6
消费物品2
生产物品7
生产物品8
生产物品9
生产物品10
生产物品11
消费物品3
生产物品12
生产物品13
消费物品4
生产物品14
生产物品15
消费物品5
生产物品16
消费物品6
看过了信号量,我们来看看信号。信号(Signal)是一种处理异步事件的通讯方式,用于通知其他进程或者自己本身,来告知将有某种事件发生。在Java中,信号机制通过wait(),notify()和notifyAll()来实现。其中wait()使得当前调用wait()的线程挂起,并释放已经获得的wait()所在代码块的锁;notify()用于随即唤醒一个被wait()挂起的线程进入线程调度队列;notifyAll()用于唤醒所有被wait()挂起的线程进入线程调度队列。
用Java信号实现的生产者和消费问题, 代码如下:

public class TestSignal {
///article/7757404.html
static Monitor monitor = new Monitor();

//生产者
static class Producer implements Runnable {
static int num = 1;
@Override
public void run() {
// TODO Auto-generated method stub
while (true) {
try {
monitor.insert(num);
System.out.println("生产物品" + num);
num++;
Thread.sleep(100);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

}
}

}

//消费者
static class Consumer implements Runnable {
@Override
public void run() {
// TODO Auto-generated method stub
while (true) {
try {
System.out.println("消费物品" + monitor.remove());
Thread.sleep(500);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

}
}

}

//管程,只能有一个线程占用
static class Monitor {
private final int capacity = 10;
private int insertIndex = 0; //仓库中当前可以放置物品的位置
private int removeIndex = 0; //仓库中当前可以拿走物品的位置
private final Object[] items = new Object[capacity]; //仓库中的所有物品
int count = 0; //仓库中的现有物品数

//向仓库中放置物品
public synchronized void insert(Object item) throws InterruptedException {
//当仓库已满时,挂起生产线程
if (count == capacity) {
wait();
}
items[insertIndex++] = item;
if (insertIndex == capacity) {
insertIndex = 0;
}
count++;
//当仓库由空变为不空时,唤起消费线程
if (count == 1) {
notify();
}
}

//从仓库中拿走物品
public synchronized Object remove() throws InterruptedException {
//当仓库没有物品时,挂起消费线程
if (count == 0) {
wait();
}
Object item = items[removeIndex++];
if (removeIndex == capacity) {
removeIndex = 0;
}
count--;
//当仓库由满变为不满时,唤起生产线程
if (count == capacity - 1) {
notify();
}
return item;
}

}

public static void main(String[] args) {
new Thread(new Producer()).start();
new Thread(new Consumer()).start();
}
}

可以看到,该例子使用一个Monitor类来实现仓库中放置和拿走物品的方法,该类相当于一个管程,只能保证同一时刻只能有一个线程使用该类。具体实现是采用静态类和synchronized方法,保证当insert()调用时拥有Monitor类的类锁,使得remove()无法获得Monitor类的类锁,同理,保证当remove()调用时拥有Monitor类的类锁,使得remove()无法被调用。里面实现的巧妙之处在于,当count为capacity时,我们会挂起生产进程,当count从capacity变为capacity
- 1时,就会唤醒生产进程加入线程调度队列,同理,当count为0时,我们会挂起消费进程,当count从0变为1时,就会唤醒消费进程加入线程调度队列。

这种方式的执行结果,与信号量的结果类似,不再列出来。

看完了这两个例子,我们对信号量和信号有了一定的了解。但我们依然不清楚的是:它们的区别到底在哪里呢?

如果我们单单使用信号,即使用wait和notify方法,很有可能会错过丢失某些信号通知。比如,如果我们不对count的访问添加限制,当count为0时,调度程序知道仓库里没有物品了,便准备挂起消费者线程并启动生产者线程,如果生产者线程的启动时间比消费者线程的挂起时间快得多,很可能会有这种情况:当生产者线程生产了一个物品,使count由0变为1,此时会向消费者线程发送一个唤醒信号,但此时消费者线程还没有完全挂起,因此它会忽略这个信号,这样一来,生产者线程会一直生产物品,直到count为capacity时挂起,而消费者线程在完全挂起之后不会再收到唤醒信号,因此也会一直挂起,这样整个系统就全部挂起,永远不会执行下去。当然,我们的实现方式是同一时刻只能有一个线程操作count,因此消费者线程的wait方法一定在生产者线程的notify方法之前执行,即当消费者线程完全挂起之后,生产者线程才能启动,于是不会出现错过丢失信号的问题。

而Java中Semaphore的实现也是建立在信号的基础上的,但不同的是它会保存所有的信号,不会丢弃任何信号,这样就很好地避免了前面的问题,这也是信号量和信号的最大区别所在。

转载请注明出处:/article/7757404.html
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: