您的位置:首页 > 其它

控制并发线程数的Semaphore和线程之间的数据交换Exchanger

2017-05-06 11:28 671 查看
控制并发线程数的Semaphore

Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。

可以这样来形容,比如:xx马路要限制车流量,只允许同时又一百辆车在这条路上行驶,其他的都必须等在路口,所以前一百辆会看到绿灯,可以开进马路,后面的车会看到红灯,不能开进该马路。但是如果如果前一百辆中有5辆驶离了马路,那么后面就允许有5辆驶入马路。这个例子中的车就是线程,驶入马路就表示线程在执行,离开马路就表示线程执行完成,看见红灯就表示线程被阻塞,不能执行。

1、应用场景

Semaphore可以用于做流量控制,特别是公共资源有限的应用场景。比如数据库连接。假如有一个需求,要读取几万个文件的数据,因为都是IO密集型任务,我们可以启动几十个线程并发地读取,但是如果读到内存后,还需要存储到数据库中,而数据库的连接只有10个,这时我们必须控制只有10个线程同时获取数据库连接保存数据,否则会报错无法获取数据库连接。这个时候,就可以使用Semaphore来做流量控制。

public class SemaphoreTest {

private static final int THREAD_COUNT = 30;

private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);

private static Semaphore s = new Semaphore(10);

public static void main(String[] args) {
for(int i=0;i<THREAD_COUNT;i++){
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
s.acquire();//获取一个许可证
System.out.println("save data...");
System.out.println("..."+s.availablePermits()+"..."+s.getQueueLength()+"..."+s.hasQueuedThreads());
s.release();//归还一个许可证
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
threadPool.shutdown();
}
}


代码中,虽然有30个线程在执行,但只允许10个并发执行。Semaphore的构造方法Semaphore(int permits)接受一个整型的数字,表示可用的许可证数量。Semaphore(10)表示允许10个线程获取许可证,也就是最大并发数是10。Semaphore的用法也很简单,首先线程使用Semaphore的acquire()方法获取一个许可证,使用完之后调用release()方法归还许可证。还可以用tryAcquire()方法尝试获取许可证。

除此之外,还提供一些其他的方法,具体如下。

int availablePermits():返回此信号量中当前可用的许可证数。

int getQueueLength():返回正在等待获取许可证的线程数。

boolean hasQueuedThreads():是否有线程正在等待获取许可证。

void reducePermits(int reduction):减少reduction个许可证,是个protected方法。

Collection getQueuedThreads():返回所有等待获取许可证的线程集合,是个protected方法。

线程之间的数据交换Exchanger

Exchanger(交换者)是一个用于线程间协作的工具类。它用于线程间的数据交换。它提供一个同步点,在这个同步点,两个线程可以交换彼此的数据。这2个线程通过exchange方法交换数据,如果第一个线程先执行exchange()方法,它会一直等待第二个线程也执行exchange方法,当2个线程都到达同步点时,这2个线程就可以交换数据,将本线程生产出来的数据传递给对方。

Exchanger的应用场景

Exchanger可以用于遗传算法,遗传算法里需要选出两个人作为交配对象,这时候会交换两个人的数据,并使用交叉规则得出2个交配结果。Exchanger也可用于校对工作,比如,我们需要将纸制银行流水通过人工的方式录入成电子银行流水,为了避免错误,采用AB岗两人进行录入,录入到Excel后,系统需要加载这两个Excel,并对两个Excel数据进行校对,看是否录入一致。如:

public class ExchangerTest {

private static Exchanger<String> exchanger = new Exchanger<String>();

private static ExecutorService threadPool = Executors.newFixedThreadPool(2);

public static void main(String[] args) {
threadPool.execute(new Runnable() {
@Override
public void run() {
String A = "数据A";
try {
exchanger.exchange(A);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});

threadPool.execute(new Runnable() {
@Override
public void run() {
String B = "数据B";
try {
String A = exchanger.exchange("B");
System.out.println("A和B的数据是否一致:"+A.equals(B)+";A录入的是:"+A);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
threadPool.shutdown();
}

}


如果两个线程有一个没有执行exchange()方法,则会一直等待,如果担心有特殊情况发生,避免一直等待,可以使用exchange(V x, long timeout, TimeUnit unit)设置最大等待时长。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息