Java并发(8):CountDownLatch、CyclicBarrier、Semaphore、Callable、Future
CountDownLatch、CyclicBarrier、Semaphore、Callable、Future 都位于java.util.concurrent包下,其中CountDownLatch、CyclicBarrier和Semaphore属于该包中的tools分支,Callable和Future属于该包中的executer分支。
一.CountDownLatch
CountDownLatch类位于java.util.concurrent包下,利用它可以实现类似计数器的功能。比如有一个任务A,它要等待其他4个任务执行完毕之后才能执行,此时就可以利用CountDownLatch来实现这种功能了。
CountDownLatch类只提供了一个构造器:
public CountDownLatch(int count) { }; //参数count为计数值
然后下面这3个方法是CountDownLatch类中最重要的方法:
public void await() throws InterruptedException { }; //调用await()方法的线程会被挂起,它会等待直到count值为0才继续执行 public boolean await(long timeout, TimeUnit unit) throws InterruptedException { }; //和await()类似,只不过等待一定的时间后count值还没变为0的话就会继续执行 public void countDown() { }; //将count值减1
例子:假如有Thread1、Thread2、Thread3、Thread4四条线程分别统计C、D、E、F四个磁盘的大小,所有线程都统计完毕交给Thread5线程去做汇总,实现代码如下:
public class Main { private static CountDownLatch count = new CountDownLatch(4); private static ExecutorService service = Executors.newFixedThreadPool(6); public static void main(String args[]) throws InterruptedException { for (int i = 0; i < 4; i++) { int finalI = i; service.execute(() -> { // 模拟任务耗时 try { int timer = new Random().nextInt(5); TimeUnit.SECONDS.sleep(timer); System.out.printf("%s时完成磁盘%d的统计任务,耗费%d秒.\n", new Date().toString(), finalI, timer); // 任务完成之后,计数器减一 count.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } }); } // 下面语句由主线程调用,故主线程一直被阻塞,直到count的计数器被设置为0 count.await(); System.out.printf("%s时全部任务都完成,执行合并计算.\n", new Date().toString()); service.shutdown(); } }
运行结果为:
Fri Aug 19 08:59:44 CST 2016时完成磁盘1的统计任务,耗费0秒. Fri Aug 19 08:59:45 CST 2016时完成磁盘2的统计任务,耗费1秒. Fri Aug 19 08:59:48 CST 2016时完成磁盘0的统计任务,耗费4秒. Fri Aug 19 08:59:48 CST 2016时完成磁盘3的统计任务,耗费4秒. Fri Aug 19 08:59:48 CST 2016时全部任务都完成,执行合并计算.
当问题已经分解为许多部分,每个线程都被分配一部分计算时,CountdownLatch 非常有用。在工作线程结束时,它们将减少计数,协调线程可以在闩锁处等待当前这一批计算结束,然后继续移至下一批计算。
相反地,具有计数 1 的 CountdownLatch 类可以用作"启动大门",来立即启动一组线程;工作线程可以在闩锁处等待,协调线程减少计数,从而立即释放所有工作线程。下例使用两个 CountdownLatche。一个作为启动大门,一个在所有工作线程结束时释放线程。
二.CyclicBarrier用法
字面意思回环栅栏,通过它可以实现让一组线程等待至某个状态之后再全部同时执行。叫做回环是因为当所有等待线程都被释放以后,CyclicBarrier可以被重用。我们暂且把这个状态就叫做barrier,当调用await()方法之后,线程就处于barrier了。
CyclicBarrier类位于java.util.concurrent包下,CyclicBarrier提供2个构造器:
public CyclicBarrier(int parties, Runnable barrierAction) { } public CyclicBarrier(int parties) { }
参数parties指让多少个线程或者任务等待至barrier状态;参数barrierAction为当这些线程都达到barrier状态时会执行的内容。
然后CyclicBarrier中最重要的方法就是await方法,它有2个重载版本:
public int await() throws InterruptedException, BrokenBarrierException { }; public int await(long timeout, TimeUnit unit)throws InterruptedException,BrokenBarrierException,TimeoutException { };
第一个版本比较常用,用来挂起当前线程,直至所有线程都到达barrier状态再同时执行后续任务;
第二个版本是让这些线程等待至一定的时间,如果还有线程没有到达barrier状态就直接让到达barrier的线程执行后续任务。
例子:假若有若干个线程都要进行写数据操作,并且只有所有线程都完成写数据操作之后,这些线程才能继续做后面的事情,实现代码如下:
public class Test { public static void main(String[] args) { int N = 4; CyclicBarrier barrier = new CyclicBarrier(N); for(int i=0;i<N;i++) new Writer(barrier).start(); } static class Writer extends Thread{ private CyclicBarrier cyclicBarrier; public Writer(CyclicBarrier cyclicBarrier) { this.cyclicBarrier = cyclicBarrier; } @Override public void run() { System.out.println("线程"+Thread.currentThread().getName()+"正在写入数据..."); try { Thread.sleep(5000); //以睡眠来模拟写入数据操作 System.out.println("线程"+Thread.currentThread().getName()+"写入数据完毕,等待其他线程写入完毕"); cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); }catch(BrokenBarrierException e){ e.printStackTrace(); } System.out.println("所有线程写入完毕,继续处理其他任务..."); } } }
执行结果:
子线程在进行计算 主线程在执行任务 task运行结果4950 所有任务执行完毕View Code
5.3 Future模式
Future模式的核心在于:去除了主函数的等待时间,并使得原本需要等待的时间段可以用于处理其他业务逻辑。
Future模式有点类似于商品订单。在网上购物时,提交订单后,在收货的这段时间里无需一直在家里等候,可以先干别的事情。类推到程序设计中时,当提交请求时,期望得到答复时,如果这个答复可能很慢。传统的时一直等待到这个答复收到时再去做别的事情,但如果利用Future设计模式就无需等待答复的到来,在等待答复的过程中可以干其他事情。
例如如下的请求调用过程时序图。当call请求发出时,需要很长的时间才能返回。左边的图需要一直等待,等返回数据后才能继续其他操作;而右边的Future模式的图中客户端则无需等到可以做其他的事情。服务器段接收到请求后立即返回结果给客户端,这个结果并不是真实的结果(是虚拟的结果),也就是先获得一个假数据,然后执行其他操作。
代码实现如下:
Client的实现
Client主要完成的功能包括:1. 返回一个FutureData;2.开启一个线程用于构造RealData。
public class Client { public Data request(final String string) { final FutureData futureData = new FutureData(); new Thread(new Runnable() { @Override public void run() { //RealData的构建很慢,所以放在单独的线程中运行 RealData realData = new RealData(string); futureData.setRealData(realData); } }).start(); return futureData; //先直接返回FutureData } }
Data的实现
无论是FutureData还是RealData都实现该接口。
public interface Data { String getResult() throws InterruptedException; }
FutureData的实现
FutureData是Future模式的关键,它实际上是真实数据RealData的代理,封装了获取RealData的等待过程。
//FutureData是Future模式的关键,它实际上是真实数据RealData的代理,封装了获取RealData的等待过程 public class FutureData implements Data { RealData realData = null; //FutureData是RealData的封装 boolean isReady = false; //是否已经准备好 public synchronized void setRealData(RealData realData) { if(isReady) return; this.realData = realData; isReady = true; notifyAll(); //RealData已经被注入到FutureData中了,通知getResult()方法 } @Override public synchronized String getResult() throws InterruptedException { if(!isReady) { wait(); //一直等到RealData注入到FutureData中 } return realData.getResult(); } }
RealData的实现
RealData是最终需要使用的数据,它的构造函数很慢。
public class RealData implements Data { protected String data; public RealData(String data) { //利用sleep方法来表示RealData构造过程是非常缓慢的 try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } this.data = data; } @Override public String getResult() { return data; } }
测试运行
主函数主要负责调用Client发起请求,并使用返回的数据。
public class Application { public static void main(String[] args) throws InterruptedException { Client client = new Client(); //这里会立即返回,因为获取的是FutureData,而非RealData Data data = client.request("name"); //这里可以用一个sleep代替对其他业务逻辑的处理 //在处理这些业务逻辑过程中,RealData也正在创建,从而充分了利用等待时间 Thread.sleep(2000); //使用真实数据 System.out.println("数据="+data.getResult()); } }
参考:https://www.geek-share.com/detail/2620744100.html
- Java并发之CountDownLatch、CyclicBarrier和Semaphore
- Java并发编程之2——同步工具类的使用(CountDownLatch,CyclicBarrier,BlockungQueue,Semaphore)
- java并发之CountDownLatch、Semaphore和CyclicBarrier
- Java并发编程:CountDownLatch、CyclicBarrier和Semaphore
- Java并发编程:CountDownLatch、CyclicBarrier和Semaphore
- Java并发编程:CountDownLatch、CyclicBarrier和Semaphore
- (转)java并发之CountDownLatch、Semaphore和CyclicBarrier
- Java并发之CountDownLatch、CyclicBarrier和Semaphore
- 14、Java并发编程:CountDownLatch、CyclicBarrier和Semaphore
- Java并发编程:CountDownLatch、CyclicBarrier和Semaphore
- Java 并发之 CountDownLatch、CyclicBarrier 和 Semaphore
- Java并发编程深入学习——CountDownLatch、CyclicBarrier和Semaphore
- JAVA并发编程-障碍器CyclicBarrier,计数器CountDownLatch,信号量Semaphore
- Java笔记:CountDownLatch - 计数锁存器、Future、CyclicBarrier - 循环屏障 和 Semaphore - 信号量
- Java并发编程:CountDownLatch、CyclicBarrier和Semaphore
- Java并发编程:CountDownLatch、CyclicBarrier和Semaphore
- Java并发: CountDownLatch、CyclicBarrier和Semaphore
- Java并发编程:CountDownLatch、CyclicBarrier和Semaphore
- Java并发之CountDownLatch、CyclicBarrier和Semaphore
- Java_并发线程_Semaphore、CountDownLatch、CyclicBarrier、Exchanger