您的位置:首页 > 编程语言 > Java开发

Java并发编程--深入理解Semaphore

2017-06-06 16:03 239 查看

Semaphore简介

Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。

Semaphore的用途

Semaphore可以用于做流量控制,特别公用资源有限的应用场景,比如数据库连接。假如有一个需求,要读取几万个文件的数据,因为都是IO密集型任务,我们可以启动几十个线程并发的读取,但是如果读到内存后,还需要存储到数据库中,而数据库的连接数只有10个,这时我们必须控制只有十个线程同时获取数据库连接保存数据,否则会报错无法获取数据库连接。这个时候,我们就可以使用Semaphore来做流控。

自定义简单Semaphore类

下面是一个自定义的简单semaphore类。

public class Semaphore {
private boolean signal = false;

public synchronized void take() {
this.signal = true;
this.notify();
}

public synchronized void release() throws InterruptedException{
while(!this.signal) wait();
this.signal = false;
}

}


上面的take()方法相当于Object中的notify()方法,用于发送一个信号量,而release()方法相当于Object中的wait()方法,用于接收一个信号量,因此可以用Semaphore类来代替Object的notify()和wait()。

下面是两个进程通过semaphore类通信的实例。

public class SemaphoreTest{

public static void main(String args[])
{
Semaphore semaphore = new Semaphore();

SendingThread sender = new SendingThread(semaphore);

ReceivingThread receiver = new ReceivingThread(semaphore);

receiver.start();

sender.start();
}
}

//@1 SendingThread

public class SendingThread extends Thread{
Semaphore semaphore = null;

public SendingThread(Semaphore semaphore){
this.semaphore = semaphore;
}

public void run(){
while(true){
//do something, then signal
this.semaphore.take();

}
}
}

//@2 RecevingThread
public class RecevingThread extends Thread{
Semaphore semaphore = null;

public ReceivingThread(Semaphore semaphore){
this.semaphore = semaphore;
}

public void run(){
while(true){
this.semaphore.release();
//receive signal, then do something...
}
}
}


Counting Semaphore

前面实现的简单Semaphore类没有计算take()方法中signals的数量, 下面实现计算signal数量的Semaphore的类,叫做 Counting Semaphore。

public class CountingSemaphore {
private int signals = 0;

public synchronized void take() {
this.signals++;
this.notify();
}

public synchronized void release() throws InterruptedException{
while(this.signals == 0) wait();
this.signals--;
}

}


Bounded Semaphore

Bounded Semaphore用来限制signal的个数,类似于队列的数量。

public class BoundedSemaphore {
private int signals = 0;
private int bound   = 0;

public BoundedSemaphore(int upperBound){
this.bound = upperBound;
}

public synchronized void take() throws InterruptedException{
while(this.signals == bound) wait();
this.signals++;
this.notify();
}

public synchronized void release() throws InterruptedException{
while(this.signals == 0) wait();
this.signals--;
this.notify();
}
}


用Semaphores 实现 Locks

BoundedSemaphore semaphore = new BoundedSemaphore(1);

...

semaphore.take();

try{
//critical section
} finally {
semaphore.release();
}


参考文献

http://tutorials.jenkov.com/java-concurrency/semaphores.html
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  java 并发 semaphore