Java笔记:CountDownLatch - 计数锁存器、Future、CyclicBarrier - 循环屏障 和 Semaphore - 信号量
2015-09-15 16:54
891 查看
1.CountDownLatch -- 锁存器 有时在线程开发中遇到一些问题,如主线程启动了多个子线程,主线程需要在子线程都结束后再做一些处理,也就是说,主线程必须知道所有子线程都结束的时候。刚开始的时候自己写一个子线程列表,启动一个子线程,加1,结束一个子线程,减1,主线程不断循环等待,当子线程列表归零时就说明所有子线程都结束了。简单的任务还可以勉强使用,但大量是用线程池的时候,发现不靠谱了,研究发现,原来jdk中已经有了该工具类--CountDownLatch。jdk文档:一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。用给定的计数初始化
实例:
实例:
实例:
CountDownLatch。由于调用了
countDown()方法,所以在当前计数到达零之前,
await方法会一直受阻塞。之后,会释放所有等待的线程,
await的所有后续调用都将立即返回。这种现象只出现一次——计数无法被重置。如果需要重置计数,请考虑使用
CyclicBarrier。
CountDownLatch是一个通用同步工具,它有很多用途。将计数 1 初始化的
CountDownLatch用作一个简单的开/关锁存器,或入口:在通过调用
countDown()的线程打开入口前,所有调用
await的线程都一直在入口处等待。用N 初始化的
CountDownLatch可以使一个线程在 N 个线程完成某项操作之前一直等待,或者使其在某项操作完成 N 次之前一直等待。
CountDownLatch的一个有用特性是,它不要求调用
countDown方法的线程等到计数到达零时才继续,而在所有线程都能通过之前,它只是阻止任何线程继续通过一个
await。
构造方法摘要 |
---|
CountDownLatch(int count)构造一个用给定计数初始化的 CountDownLatch。 |
方法摘要 | |
---|---|
await()使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断。 | |
await(long timeout,TimeUnit unit)使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断或超出了指定的等待时间。 | |
countDown()递减锁存器的计数,如果计数到达零,则释放所有等待的线程。 | |
getCount()返回当前计数。 | |
toString()返回标识此锁存器及其状态的字符串。 |
public class Test {public static void main(String[] args) throws InterruptedException {CountDownLatch latch = new CountDownLatch(3); //三个个工人的协作Worker worker1 = new Worker("张三", 4000, latch);Worker worker2 = new Worker("李四", 2000, latch);Worker worker3 = new Worker("王五", 5000, latch);worker1.start();worker2.start();worker3.start();// 主线程阻塞,等待所有子线程完成(调用latch.countDown())latch.await();System.out.println("主线程:工作完成");}}/*** 工人类-子线程*/class Worker extends Thread{private String name; // 工人姓名private long time; // 工作时间(单位:毫秒)private CountDownLatch latch; // 计数锁存器public Worker(String name, long time, CountDownLatch latch) {this.name = name;this.time = time;this.latch = latch;}private void doWork() throws InterruptedException {Thread.sleep(time);}public void run() {try {doWork(); // 工作中。。。System.out.println("工人: " + name + " 完成工作");} catch (InterruptedException e) {System.out.println("工人: " + name + " 工作出现意外");} finally {latch.countDown(); //工人完成工作,计数器减一}}}运行结果:2.Future但有时发现CountDownLatch只知道子线程的完成情况是不够的,如果在子线程完成后获取其计算的结果,那CountDownLatch就有些捉襟见衬了,所以jdk提供的Future类,不仅可以在子线程完成后收集其结果,还可以设定子线程的超时时间,避免主任务一直等待。
方法摘要 | |
---|---|
cancel(boolean mayInterruptIfRunning)试图取消对此任务的执行。 | |
V | get()如有必要,等待计算完成,然后获取其结果。 |
V | get(long timeout,TimeUnit unit)如有必要,最多等待为使计算完成所给定的时间之后,获取其结果(如果结果可用)。 |
isCancelled()如果在任务正常完成前将其取消,则返回true。 | |
isDone()如果任务已完成,则返回 true。 |
import java.util.ArrayList;import java.util.List;import java.util.Random;import java.util.concurrent.Callable;import java.util.concurrent.ExecutionException;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Future;import java.util.concurrent.TimeUnit;import java.util.concurrent.TimeoutException;public class Test {private static final ExecutorService executor = Executors.newCachedThreadPool();private static Random random = new Random();private static long timeout = 4L;/*** 启动多个子任务*/public static void startMoreTask() {List<Callable<Integer>> subTasks = new ArrayList<Callable<Integer>>(); // 子任务集合List<Integer> subTaskResult = new ArrayList<Integer>(); // 子任务的返回集合// 1.初始化10个子任务for (int i = 1; i <= 10; i ++) {SubTask subTask = new SubTask("子线程-" + i, random.nextInt(10)); // 子线程随机在10秒内完成subTasks.add(subTask);}// 2.执行所有的子任务try {List<Future<Integer>> futures = executor.invokeAll(subTasks);for (Future<Integer> future : futures) {try {Integer result = future.get(timeout, TimeUnit.SECONDS); // 设置每个子任务的执行时间不得超过4秒subTaskResult.add(result);} catch (ExecutionException | TimeoutException e) {future.cancel(true); // 当出现执行异常和超时异常时,终止该子任务}}} catch (InterruptedException e) {System.out.println("任务执行异常:" + e.getMessage());}}public static void main(String[] args) {// subTask1测试超时的情况SubTask subTask1 = new SubTask("子线程 - 1", 10);Future<Integer> future1 = executor.submit(subTask1);Integer result1 = null;try {result1 = future1.get(5, TimeUnit.SECONDS); // 设置子任务的执行时间不得超过5秒} catch (InterruptedException e) {System.out.println("线程中断出错");future1.cancel(true);// 中断执行此任务的线程} catch (ExecutionException e) {System.out.println("线程服务出错");future1.cancel(true);// 中断执行此任务的线程} catch (TimeoutException e) {// 超时异常System.out.println("线程执行超时");future1.cancel(true);// 中断执行此任务的线程}System.out.println("subTask1运行结果:" + (result1 == null ? "null" : result1));// subTask2测试拿到子线程返回结果的情况SubTask subTask2 = new SubTask("子线程 - 2", 5);Future<Integer> future2 = executor.submit(subTask2);Integer result2 = null;try {result2 = future2.get(10, TimeUnit.SECONDS); // 设置子任务的执行时间不得超过10秒} catch (InterruptedException e) {System.out.println("线程中断出错");future2.cancel(true);// 中断执行此任务的线程} catch (ExecutionException e) {System.out.println("线程服务出错");future2.cancel(true);// 中断执行此任务的线程} catch (TimeoutException e) {// 超时异常System.out.println("线程执行超时");future2.cancel(true);// 中断执行此任务的线程}System.out.println("subTask2运行结果:" + (result2 == null ? "null" : result2));}}class SubTask implements Callable<Integer> {private String name; // 子线程名private int second; // 子线程完成需要的时间(秒)public SubTask (String name, int second) {this.name = name;this.second = second;}@Overridepublic Integer call() throws Exception {System.out.println("#子线程-" + name + " 开始");Thread.sleep(second * 1000L);System.out.println("#子线程-" + name + " 结束,耗时秒数: " + second);return second;}}运行结果:3.CyclicBarrier -- 循环屏障 之后又发现一个非常好用的多线程辅助类--CyclicBarrier,和CountDownLatch类似,不过适用场景不同。jdk文档: 一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时 CyclicBarrier 很有用。因为该 barrier 在释放等待线程后可以重用,所以称它为循环 的 barrier。
构造方法摘要 |
---|
CyclicBarrier(int parties)创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,但它不会在启动 barrier 时执行预定义的操作。 |
CyclicBarrier(int parties,Runnable barrierAction)创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,并在启动 barrier 时执行给定的屏障操作,该操作由最后一个进入 barrier 的线程执行。 |
方法摘要 | |
---|---|
await()在所有参与者都已经在此 barrier 上调用await 方法之前,将一直等待。 | |
await(long timeout,TimeUnit unit)在所有参与者都已经在此屏障上调用await方法之前将一直等待,或者超出了指定的等待时间。 | |
getNumberWaiting()返回当前在屏障处等待的参与者数目。 | |
getParties()返回要求启动此 barrier 的参与者数目。 | |
isBroken()查询此屏障是否处于损坏状态。 | |
reset()将屏障重置为其初始状态。 |
import java.util.Random;import java.util.concurrent.CyclicBarrier;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.ThreadFactory;public class CyclicBarrierTest {private static ExecutorService executor = Executors.newFixedThreadPool(10, new ThreadFactory() { // 创建固定大小的守护线程池@Overridepublic Thread newThread(Runnable r) {Thread t = new Thread(r);t.setDaemon(true);return t;}});public static void main(String[] args) throws Exception {// 初始化CyclicBarrier//CyclicBarrier barrier = new CyclicBarrier(10); // 1.可以只传入一个参数:给定数量的参与者(线程)CyclicBarrier barrier = new CyclicBarrier(10, new Runnable() { // 2.也可以传入第二个参数,当所有的子线程到达某个公共屏障点,进入barrier线程@Overridepublic void run() {System.out.println("#所有任务初始化完成");}});// 使用线程池for (int i = 1; i <= barrier.getParties(); i ++) { // getParties() 返回要求启动此 barrier 的参与者数目String name = "子任务-" + i;executor.submit(new Task(name, barrier));}// 启动所有子线程executor.shutdown();Thread.sleep(Integer.MAX_VALUE);}}/*** 子任务:初始化、执行*/class Task implements Runnable {private String name;private CyclicBarrier barrier;public Task (String name, CyclicBarrier barrier) {this.name = name;this.barrier = barrier;}@Overridepublic void run() {try {// 子任务初始化int sleepTime = new Random().nextInt(10);Thread.sleep(sleepTime * 1000L);System.out.println("子任务:" + this.name + " 初始化完成,使用" + sleepTime + "秒,此时已经初始化完成的子任务数量:" + barrier.getNumberWaiting()); // getNumberWaiting() 返回当前在屏障处等待的参与者数目// 在所有参与者都已经在此屏障上调用 await 方法之前将一直等待,或者超出了指定的等待时间barrier.await(); // 也可以换为await(long timeout, TimeUnit unit) 方法设定超时时间// 子任务执行System.out.println("子任务:" + this.name + " 执行完毕");} catch (Exception e) {}}}运行结果:这里需要解释一下屏障点,屏障点就是某种状态,上面程序的屏障点就是所有子线程都调用了await()方法。JDK文档中await()方法的屏障点还有其它情况:在所有参与者都已经在此 barrier 上调用await 方法之前,将一直等待。如果当前线程不是将到达的最后一个线程,出于调度目的,将禁用它,且在发生以下情况之一前,该线程将一直处于休眠状态:最后一个线程到达;或者其他某个线程中断当前线程;或者其他某个线程中断另一个等待线程;或者其他某个线程在等待 barrier 时超时;或者其他某个线程在此 barrier 上调用
reset()。如果当前线程:在进入此方法时已经设置了该线程的中断状态;或者在等待时被中断则抛出
InterruptedException,并且清除当前线程的已中断状态。如果在线程处于等待状态时 barrier 被
reset(),或者在调用 await 时 barrier被损坏,抑或任意一个线程正处于等待状态,则抛出
BrokenBarrierException异常。如果任何线程在等待时被中断,则其他所有等待线程都将抛出
BrokenBarrierException异常,并将 barrier 置于损坏状态。如果当前线程是最后一个将要到达的线程,并且构造方法中提供了一个非空的屏障操作,则在允许其他线程继续运行之前,当前线程将运行该操作。如果在执行屏障操作过程中发生异常,则该异常将传播到当前线程中,并将 barrier 置于损坏状态。返回:到达的当前线程的索引,其中,索引
getParties()- 1 指示将到达的第一个线程,零指示最后一个到达的线程抛出:
InterruptedException- 如果当前线程在等待时被中断
BrokenBarrierException- 如果另一个 线程在当前线程等待时被中断或超时,或者重置了 barrier,或者在调用
await时 barrier 被损坏,抑或由于异常而导致屏障操作(如果存在)失败。4.Semaphore -- 信号量 Semaphore 通常用于限制可以访问某些资源(物理或逻辑的)的线程数目。它可以控同时访问的线程个数,通过acquire() 获取一个许可,如果没有就等待,而release()释放一个许可。 Semaphore类提供了2个构造器:
public Semaphore(int permits) { //参数permits表示许可数目,即同时可以允许多少线程进行访问sync = new NonfairSync(permits);}public Semaphore(int permits, boolean fair) { //这个多了一个参数fair表示是否是公平的,即等待时间越久的越先获取许可sync = (fair)? new FairSync(permits) : new NonfairSync(permits);Semaphore类中比较重要的几个方法,首先是acquire()、release()方法:
public void acquire() throws InterruptedException { } //获取一个许可public void acquire(int permits) throws InterruptedException { } //获取permits个许可public void release() { } //释放一个许可public void release(int permits) { } //释放permits个许可acquire()用来获取一个许可,若无许可能够获得,则会一直等待,直到获得许可。 release()用来释放许可。注意,在释放许可之前,必须先获获得许可。 这4个方法都会被阻塞,如果想立即得到执行结果,可以使用下面几个方法:
public boolean tryAcquire() { }; //尝试获取一个许可,若获取成功,则立即返回true,若获取失败,则立即返回falsepublic boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException { }; //尝试获取一个许可,若在指定的时间内获取成功,则立即返回true,否则则立即返回falsepublic boolean tryAcquire(int permits) { }; //尝试获取permits个许可,若获取成功,则立即返回true,若获取失败,则立即返回falsepublic boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException { }; //尝试获取permits个许可,若在指定的时间内获取成功,则立即返回true,否则则立即返回false实例:
import java.util.Random;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Semaphore;import java.util.concurrent.ThreadFactory;public class SemaphoreTest {private static ExecutorService executor = Executors.newFixedThreadPool(10, new ThreadFactory() { // 创建固定大小的守护线程池@Overridepublic Thread newThread(Runnable r) {Thread t = new Thread(r);t.setDaemon(true);return t;}});public static void main(String[] args) throws Exception {// 假定有10个工人,5个机器,1个功能需要1个机器才能工作int workerNum = 10;int machineNum = 5;// 工人占用机器期间其它工人无法使用该机器,直到该机器被释放Semaphore semaphore = new Semaphore(machineNum);// 开始工作for (int i = 1; i <= workerNum; i ++) {String name = "工人-" + i;executor.submit(new Worker(name, semaphore));}// 启动所有子线程executor.shutdown();Thread.sleep(Integer.MAX_VALUE);}}/*** 工人类,一个工人需要一个机器才能工作*/class Worker implements Runnable {private String name;private Semaphore semaphore;public Worker(String name, Semaphore semaphore) {this.name = name;this.semaphore = semaphore;}@Overridepublic void run() {try {semaphore.acquire(); // 阻塞,等待一个许可System.out.println("工人:" + this.name + " 占用机器生产ing");int sleepTime = new Random().nextInt(10);Thread.sleep(sleepTime * 1000L);System.out.println("工人:" + this.name + " 占用机器生产 " + sleepTime + " 秒后释放机器");semaphore.release();} catch (Exception e) { }}}运行结果:总结,CountDownLatch、CyclicBarrier和Semaphore的区别和使用场景1.CountDownLatch和CyclicBarrier都能够实现线程之间的等待,只不过它们侧重点不同: (1)CountDownLatch一般用于某个线程A等待若干个其他线程执行完任务之后,它才执行; (2)而CyclicBarrier一般用于一组线程互相等待至某个状态,然后这一组线程再同时执行; (3)CountDownLatch是不能够重用的,而CyclicBarrier是可以重用的。2.Semaphore其实和锁有点类似,它一般用于控制对某组资源的访问权限。参考:jdk文档java 多线程 CountDownLatch用法Java并发编程:Callable、Future和FutureTaskjava Future 接口介绍Java并发编程:CountDownLatch、CyclicBarrier和SemaphoreJava并发之CountDownLatch、CyclicBarrier和SemaphoreCyclicBarrier介绍Java之CyclicBarrier使用
相关文章推荐
- java对世界各个时区(TimeZone)的通用转换处理方法(转载)
- java-注解annotation
- java-模拟tomcat服务器
- java-用HttpURLConnection发送Http请求.
- java-WEB中的监听器Lisener
- Android IPC进程间通讯机制
- Android Native 绘图方法
- Android java 与 javascript互访(相互调用)的方法例子
- 介绍一款信息管理系统的开源框架---jeecg
- 聚类算法之kmeans算法java版本
- java实现 PageRank算法
- PropertyChangeListener简单理解
- 插入排序
- 冒泡排序
- 堆排序
- 快速排序
- 二叉查找树
- [原创]java局域网聊天系统