您的位置:首页 > 其它

-信号量(Semaphore)在生产者和消费者模式的使用

2014-03-10 14:03 274 查看
转自: /content/501988.html

Semaphore 信号量,就是一个允许实现设置好的令牌。也许有1个,也许有10个或更多。

谁拿到令牌(acquire)就可以去执行了,如果没有令牌则需要等待。

执行完毕,一定要归还(release)令牌,否则令牌会被很快用光,别的线程就无法获得令牌而执行下去了。

请仔细体会里面关于仓库的处理,

1 是如何保证入库时,如果仓库满就等待,

2 出库时,如果仓库无货就等待的。

3 以及对仓库只有10个库位的处理。

4 对同步问题的处理。

[java] view
plaincopy

import java.util.concurrent.Semaphore;

/**

* 老紫竹JAVA提高教程-信号量(Semaphore)的使用。<br>

* 生产者和消费者的例子,库存的管理。

*

* @author 老紫竹(java2000.net,laozizhu.com)

*/

public class TestSemaphore {

public static void main(String[] args) {

// 启动线程

for (int i = 0; i <= 3; i++) {

// 生产者

new Thread(new Producer()).start();

// 消费者

new Thread(new Consumer()).start();

}

}

// 仓库

static Warehouse buffer = new Warehouse();

// 生产者,负责增加

static class Producer implements Runnable {

static int num = 1;

@Override

public void run() {

int n = num++;

while (true) {

try {

buffer.put(n);

System.out.println(">" + n);

// 速度较快。休息10毫秒

Thread.sleep(10);

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}

}

// 消费者,负责减少

static class Consumer implements Runnable {

@Override

public void run() {

while (true) {

try {

System.out.println("<" + buffer.take());

// 速度较慢,休息1000毫秒

Thread.sleep(1000);

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}

}

/**

* 仓库

*

* @author 老紫竹(laozizhu.com)

*/

static class Warehouse {

// 非满锁

final Semaphore notFull = new Semaphore(10);

// 非空锁

final Semaphore notEmpty = new Semaphore(0);

// 核心锁

final Semaphore mutex = new Semaphore(1);

// 库存容量

final Object[] items = new Object[10];

int putptr, takeptr, count;

/**

* 把商品放入仓库.<br>

*

* @param x

* @throws InterruptedException

*/

public void put(Object x) throws InterruptedException {

// 保证非满

notFull.acquire();

// 保证不冲突

mutex.acquire();

try {

// 增加库存

items[putptr] = x;

if (++putptr == items.length)

putptr = 0;

++count;

} finally {

// 退出核心区

mutex.release();

// 增加非空信号量,允许获取商品

notEmpty.release();

}

}

/**

* 从仓库获取商品

*

* @return

* @throws InterruptedException

*/

public Object take() throws InterruptedException {

// 保证非空

notEmpty.acquire();

// 核心区

mutex.acquire();

try {

// 减少库存

Object x = items[takeptr];

if (++takeptr == items.length)

takeptr = 0;

--count;

return x;

} finally {

// 退出核心区

mutex.release();

// 增加非满的信号量,允许加入商品

notFull.release();

}

}

}

}

更多0
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: