您的位置:首页 > 编程语言 > Java开发

控制并发访问资源 -- Semaphore

2017-08-18 00:00 507 查看

一、概述

Semaphore(信号量)是一个控制访问多个共享资源的计数器。

当一个线程想要访问某个共享资源,首先,它必须获得 semaphore。如果 semaphore的内部计数器的值大于0,那么 semaphore减少计数器的值并允许访问共享的资源。计数器的值大于0表示,有可以自由使用的资源,所以线程可以访问并使用它们。

另一种情况,如果 semaphore的计数器的值等于0,那么 semaphore让线程进入休眠状态一直到计数器大于0。计数器的值等于0表示全部的共享资源都正被线程们使用,所以此线程想要访问就必须等到某个资源成为自由的。

当线程使用完共享资源时,他必须放出 semaphore为了让其他线程可以访问共享资源。这个操作会增加 semaphore 的内部计数器的值。

示例说明:

Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。很多年以来,单纯从字面上很难理解 Semaphore所表达的含义,只能把它比作是控制流量的红绿灯,比如XX马路要限制流量,只允许同时有一百辆车在这条路上行使,其他的都必须在路口等待,所以前一百辆车会看到绿灯,可以开进这条马路,后面的车会看到红灯,不能驶入XX马路,但是如果前一百辆中有五辆车已经离开了XX马路,那么后面就允许有5辆车驶入马路,这个例子里说的车就是线程,驶入马路就表示线程在执行,离开马路就表示线程执行完成,看见红灯就表示线程被阻塞,不能执行。

二、主要方法

void acquire():从信号量获取一个许可,获取一个许可(如果提供了一个)并立即返回,将可用的许可数减 1。在提供一个许可前一直将线程阻塞,否则线程被中断。

  void release():释放一个许可,将其返回给信号量,将可用的许可数增加 1。如果任意线程试图获取许可,则选中一个线程并将刚刚释放的许可给予它。然后针对线程安排目的启用(或再启用)该线程。

  int availablePermits():返回此信号量中当前可用的许可数。

  boolean hasQueuedThreads():查询是否有线程正在等待获取。

三、实现

eg1: semaphores是用来保护访问一个共享资源的,或者说一个代码片段每次只能被一个线程执行。

import java.util.concurrent.Semaphore;

public class PrintQueue {

private Semaphore semaphore;

public PrintQueue(){
semaphore = new Semaphore(1);
}

//模拟打印文档
public void printJob (Object document){
try {
semaphore.acquire();
//实现能随机等待一段时间的模拟打印文档的行。
long duration = 2000;
System.out.println("PrintQueue: Printing a Job:" + Thread.currentThread().getName());
Thread.sleep(duration);
} catch (InterruptedException e) {
e.printStackTrace();
} finally{
semaphore.release();
}
}
}

public class Task implements Runnable{

private PrintQueue printQueue;

public Task(PrintQueue printQueue){
this.printQueue = printQueue;
}

public void run() {
System.out.println("Going to print a job:" + Thread. currentThread().getName());
printQueue.printJob(new Object());
System.out.println("The document has been printed:" + Thread.currentThread().getName());
}

public static void main(String[] args) {
PrintQueue pq = new PrintQueue();
Thread[] threads = new Thread[10];
for(int i=0;i<threads.length;i++){
threads[i] = new Thread(new Task(pq));
}
for(Thread thread : threads){
thread.start();
}
}
}

说明:示例的重点是PrintQueue类的构造方法和初始化Semaphore对象。你传递值1作为此构造方法的参数,那么你就创建了一个binary semaphore。初始值为1,就保护了访问一个共享资源,在例子中是print queue。

当你开始10个threads,当你开始10个threads时,那么第一个获得semaphore的得到critical section的访问权。剩下的线程都会被semaphore阻塞直到那个获得semaphore的线程释放它。当这情况发生,semaphore在等待的线程中选择一个并给予它访问critical section的访问权。全部的任务都会打印文档,只是一个接一个的执行。

eg2:semaphores也可以用来保护多个资源的副本,也就是说当你有一个代码片段每次可以被多个线程执行。

模拟6辆车去泊车,而车位有2个的场景. 当车位满时,出来一辆车,才能有一辆车进入停车.

import java.util.concurrent.Semaphore;
/**
* 示例模拟6辆车去泊车,而车位有2个的场景. 当车位满时,出来一辆车,才能有一辆车进入停车.
*/
public class ParkTask implements Runnable{

private int carNo;
private Semaphore semaphore;

public ParkTask(int carNo, Semaphore semaphore){
this.carNo = carNo;
this.semaphore = semaphore;
}

public void run() {
try {
semaphore.acquire();
//停车操作
parking();
Thread.sleep(2000);
semaphore.release();
//离开
leaving();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

// 停车
public void parking(){
System.out.println(String.format("%d号车泊车", carNo));
}
// 离开
public void leaving(){
System.out.println(String.format("%d号车离开", carNo));
}
}

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

public class ParkingCars {

private static final int CarCount = 6;
private static final int SemaphoreCount = 2;

public static void main(String[] args) {
Semaphore semaphore = new Semaphore(SemaphoreCount, true);

ExecutorService service = Executors.newCachedThreadPool();
for(int carNo=1; carNo<=CarCount; carNo++){
service.execute(new ParkTask(carNo, semaphore));
}

try {
Thread.sleep(3*1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
service.shutdown();
System.out.println(semaphore.availablePermits() + " 个停车位可以用!");
}

}

//console结果:
2号车泊车
1号车泊车
2号车离开
4号车泊车
3号车泊车
1号车离开
0 个停车位可以用!
3号车离开
6号车泊车
5号车泊车
4号车离开
6号车离开
5号车离开

说明:ParkTask 就是线程,停车就是 在执行,离开表示 线程完成,停车中的休眠就表示 堵塞。当信号量都被占用时,其他的线程 只能处于 等待状态,等待被占用的 线程释放 被占用的 信号量。

四、应用场景

Semaphore可以用于做流量控制,特别公用资源有限的应用场景,比如数据库连接。假如有一个需求,要读取几万个文件的数据,因为都是IO密集型任务,我们可以启动几十个线程并发的读取,但是如果读到内存后,还需要存储到数据库中,而数据库的连接数只有10个,这时我们必须控制只有十个线程同时获取数据库连接保存数据,否则会报错无法获取数据库连接。这个时候,我们就可以使用Semaphore来做流控。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息