您的位置:首页 > 编程语言 > Java开发

Java并发(8):CountDownLatch、CyclicBarrier、Semaphore、Callable、Future

2016-08-12 01:15 876 查看

  CountDownLatch、CyclicBarrier、Semaphore、Callable、Future  都位于java.util.concurrent包下,其中CountDownLatch、CyclicBarrier和Semaphore属于该包中的tools分支,Callable和Future属于该包中的executer分支。

一.CountDownLatch

  CountDownLatch类位于java.util.concurrent包下,利用它可以实现类似计数器的功能。比如有一个任务A,它要等待其他4个任务执行完毕之后才能执行,此时就可以利用CountDownLatch来实现这种功能了。

CountDownLatch类只提供了一个构造器:

public CountDownLatch(int count) {  };  //参数count为计数值

然后下面这3个方法是CountDownLatch类中最重要的方法:

public void await() throws InterruptedException { };   //调用await()方法的线程会被挂起,它会等待直到count值为0才继续执行
public boolean await(long timeout, TimeUnit unit) throws InterruptedException { };  //和await()类似,只不过等待一定的时间后count值还没变为0的话就会继续执行
public void countDown() { };  //将count值减1

例子:假如有Thread1、Thread2、Thread3、Thread4四条线程分别统计C、D、E、F四个磁盘的大小,所有线程都统计完毕交给Thread5线程去做汇总,实现代码如下:

public class Main {
private static CountDownLatch count = new CountDownLatch(4);
private static ExecutorService service = Executors.newFixedThreadPool(6);
public static void main(String args[]) throws InterruptedException {
for (int i = 0; i < 4; i++) {
int finalI = i;
service.execute(() -> {
// 模拟任务耗时
try {
int timer = new Random().nextInt(5);
TimeUnit.SECONDS.sleep(timer);
System.out.printf("%s时完成磁盘%d的统计任务,耗费%d秒.\n", new Date().toString(), finalI, timer);
// 任务完成之后,计数器减一
count.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
// 下面语句由主线程调用,故主线程一直被阻塞,直到count的计数器被设置为0
count.await();
System.out.printf("%s时全部任务都完成,执行合并计算.\n", new Date().toString());
service.shutdown();
}
}

运行结果为:

Fri Aug 19 08:59:44 CST 2016时完成磁盘1的统计任务,耗费0秒.
Fri Aug 19 08:59:45 CST 2016时完成磁盘2的统计任务,耗费1秒.
Fri Aug 19 08:59:48 CST 2016时完成磁盘0的统计任务,耗费4秒.
Fri Aug 19 08:59:48 CST 2016时完成磁盘3的统计任务,耗费4秒.
Fri Aug 19 08:59:48 CST 2016时全部任务都完成,执行合并计算.

  当问题已经分解为许多部分,每个线程都被分配一部分计算时,CountdownLatch 非常有用。在工作线程结束时,它们将减少计数,协调线程可以在闩锁处等待当前这一批计算结束,然后继续移至下一批计算。
  相反地,具有计数 1 的 CountdownLatch 类可以用作"启动大门",来立即启动一组线程;工作线程可以在闩锁处等待,协调线程减少计数,从而立即释放所有工作线程。下例使用两个 CountdownLatche。一个作为启动大门,一个在所有工作线程结束时释放线程。

二.CyclicBarrier用法

  字面意思回环栅栏,通过它可以实现让一组线程等待至某个状态之后再全部同时执行。叫做回环是因为当所有等待线程都被释放以后,CyclicBarrier可以被重用。我们暂且把这个状态就叫做barrier,当调用await()方法之后,线程就处于barrier了。

  CyclicBarrier类位于java.util.concurrent包下,CyclicBarrier提供2个构造器:

public CyclicBarrier(int parties, Runnable barrierAction) {

}

public CyclicBarrier(int parties) {

}

  参数parties指让多少个线程或者任务等待至barrier状态;参数barrierAction为当这些线程都达到barrier状态时会执行的内容。

  然后CyclicBarrier中最重要的方法就是await方法,它有2个重载版本:

public int await() throws InterruptedException, BrokenBarrierException { };
public int await(long timeout, TimeUnit unit)throws InterruptedException,BrokenBarrierException,TimeoutException { };

  第一个版本比较常用,用来挂起当前线程,直至所有线程都到达barrier状态再同时执行后续任务;

  第二个版本是让这些线程等待至一定的时间,如果还有线程没有到达barrier状态就直接让到达barrier的线程执行后续任务。

  例子:假若有若干个线程都要进行写数据操作,并且只有所有线程都完成写数据操作之后,这些线程才能继续做后面的事情,实现代码如下:

public class Test {
public static void main(String[] args) {
int N = 4;
CyclicBarrier barrier  = new CyclicBarrier(N);
for(int i=0;i<N;i++)
new Writer(barrier).start();
}
static class Writer extends Thread{
private CyclicBarrier cyclicBarrier;
public Writer(CyclicBarrier cyclicBarrier) {
this.cyclicBarrier = cyclicBarrier;
}

@Override
public void run() {
System.out.println("线程"+Thread.currentThread().getName()+"正在写入数据...");
try {
Thread.sleep(5000);      //以睡眠来模拟写入数据操作
System.out.println("线程"+Thread.currentThread().getName()+"写入数据完毕,等待其他线程写入完毕");
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
}catch(BrokenBarrierException e){
e.printStackTrace();
}
System.out.println("所有线程写入完毕,继续处理其他任务...");
}
}
}

执行结果:

子线程在进行计算
主线程在执行任务
task运行结果4950
所有任务执行完毕
View Code

5.3 Future模式

  Future模式的核心在于:去除了主函数的等待时间,并使得原本需要等待的时间段可以用于处理其他业务逻辑。

  Future模式有点类似于商品订单。在网上购物时,提交订单后,在收货的这段时间里无需一直在家里等候,可以先干别的事情。类推到程序设计中时,当提交请求时,期望得到答复时,如果这个答复可能很慢。传统的时一直等待到这个答复收到时再去做别的事情,但如果利用Future设计模式就无需等待答复的到来,在等待答复的过程中可以干其他事情。

  例如如下的请求调用过程时序图。当call请求发出时,需要很长的时间才能返回。左边的图需要一直等待,等返回数据后才能继续其他操作;而右边的Future模式的图中客户端则无需等到可以做其他的事情。服务器段接收到请求后立即返回结果给客户端,这个结果并不是真实的结果(是虚拟的结果),也就是先获得一个假数据,然后执行其他操作。

代码实现如下:

Client的实现

Client主要完成的功能包括:1. 返回一个FutureData;2.开启一个线程用于构造RealData。

public class Client {
public Data request(final String string) {
final FutureData futureData = new FutureData();

new Thread(new Runnable() {
@Override
public void run() {
//RealData的构建很慢,所以放在单独的线程中运行
RealData realData = new RealData(string);
futureData.setRealData(realData);
}
}).start();

return futureData; //先直接返回FutureData
}
}

Data的实现
无论是FutureData还是RealData都实现该接口。

public interface Data {
String getResult() throws InterruptedException;
}

FutureData的实现
FutureData是Future模式的关键,它实际上是真实数据RealData的代理,封装了获取RealData的等待过程。

//FutureData是Future模式的关键,它实际上是真实数据RealData的代理,封装了获取RealData的等待过程
public class FutureData implements Data {
RealData realData = null; //FutureData是RealData的封装
boolean isReady = false;  //是否已经准备好

public synchronized void setRealData(RealData realData) {
if(isReady)
return;
this.realData = realData;
isReady = true;
notifyAll(); //RealData已经被注入到FutureData中了,通知getResult()方法
}

@Override
public synchronized String getResult() throws InterruptedException {
if(!isReady) {
wait(); //一直等到RealData注入到FutureData中
}
return realData.getResult();
}
}

RealData的实现
RealData是最终需要使用的数据,它的构造函数很慢。

public class RealData implements Data {
protected String data;

public RealData(String data) {
//利用sleep方法来表示RealData构造过程是非常缓慢的
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
this.data = data;
}

@Override
public String getResult() {
return data;
}
}

测试运行
主函数主要负责调用Client发起请求,并使用返回的数据。

public class Application {
public static void main(String[] args) throws InterruptedException {
Client client = new Client();
//这里会立即返回,因为获取的是FutureData,而非RealData
Data data = client.request("name");
//这里可以用一个sleep代替对其他业务逻辑的处理
//在处理这些业务逻辑过程中,RealData也正在创建,从而充分了利用等待时间
Thread.sleep(2000);
//使用真实数据
System.out.println("数据="+data.getResult());
}
}

 

参考:https://www.geek-share.com/detail/2620744100.html

https://www.geek-share.com/detail/2620882102.html

http://www.2cto.com/kf/201411/351903.html

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: