Java线程(CountDownLatch、CyclicBarrier、Semaphore)并发控制工具类
2016-11-05 19:06
621 查看
尽管线程是比较的神秘且具有一定的不可控性,但我们依然可以尽可能地对其进行管理和“控制”
运用CountDownLatch、CyclicBarrier、Semaphore等 在很大程度上可以帮我们对线程的一些执行顺序等进行管理
线程等待的另一种形式:
java线程中除了使用 Object的wait()与notify()等来进行等待与唤醒外,CountDownLatch在一定程度上能够更方便地进行一个或多个线程的等待操作。
适用背景如,将一个任务裁剪分发给多个工作者进行执行,然后再从多个工作者处汇聚得出总结果。
[ join()方法也能适用上面的例子,join的介绍可见博主的“java线程等待/通知机制及中断”文章,join()可理解为将上面提到的裁剪的零散任务按串行执行等待,而CountDownLatch是并行处理遇到wait后等待,直到countDown为0,才放行]
下面开始介绍CountDownLatch
1、CountDownLatch(int count) 构造接受一个int型的参数作为计数器,每当调用countDown()方法后会使count-1,在CountDownLatch以await()方法阻塞时,,若检查到count达到0,则从其await()返回,运行继续执行之后的步骤操作。CountDownLatch的countDown()方法没有限制使用域(即在任何地方均可调用,若需要在多个线程中调用,只要将本类对象的引用传递进线程即可)注意,CountDownLatch的计数器在初始化时传递进去后就无法更改或重置
2、public void await() throws InterruptedException
使当前线程在锁存器计数值至零之前一直等待,除非线程被中断。
如果当前计数为零,则此方法立即返回。
如果当前计数大于零,则出于线程调度目的,将禁用当前线程,且在发生以下两种情况之一前,该线程将一直处于休眠状态:
①由于调用 countDown() 方法,计数到达零;②其他某个线程中断当前线程。
如果当前线程: 在进入此方法时已经设置了该线程的中断状态;或者在等待时被中断,
则抛出 InterruptedException,并且清除当前线程的已中断状态。
另外,可以使用await(long timeout,TimeUnit unit) 设置等待超时,防止一直阻塞
3、①测试例子:
②用法:
第一个对象实例是一个启动信号,在 driver 为继续执行 worker 做好准备之前,它会阻止所有的 worker 继续执行。
第二个对象实例是一个完成信号,它允许 driver 在完成所有 worker 之前一直等待。
下面开始介绍CyclicBarrier
它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时 CyclicBarrier 很有用。因为该 barrier 在释放等待线程后可以重用
验证:
或 3 2 1 (即便使A睡眠再久 也是A结束后再从屏障返回)
Semaphore:
Semaphore用于控制同时访问特定资源的线程数量。比如在频繁地操作时可以有千百个线程进行执行操作,但其中可能涉及到对数据库的后续存入操作等,在数据库线程池可能只提供了部分连接数。则可以使用Semaphore进行控制。
例:
使用示例如 线程池A进行任务处理后遇到需要执行数据库操作时使用Semaphore控制并发(数量与数据库线程池B的连接数对应)如:
暂时先谈这么多咯。。
CountDownLatch 与 CyclicBarrier 简要区别:(再啰嗦几句。。)
CountDownLatch(int N) N表示计数器值,调用countDown()方法来使N-1,其wait()方法会阻塞当前线程,直到N=0
CyclicBarrier (int parties) parties为屏障拦截的线程数 当有parties个线程都调用了CyclicBarrier的await()以后,才均可放行,否则等待。
[CountDownLatch 只需在任何地方调用N次countDown() 而CyclicBarrier要parties个线程调用它的await()]
运用CountDownLatch、CyclicBarrier、Semaphore等 在很大程度上可以帮我们对线程的一些执行顺序等进行管理
线程等待的另一种形式:
java线程中除了使用 Object的wait()与notify()等来进行等待与唤醒外,CountDownLatch在一定程度上能够更方便地进行一个或多个线程的等待操作。
适用背景如,将一个任务裁剪分发给多个工作者进行执行,然后再从多个工作者处汇聚得出总结果。
[ join()方法也能适用上面的例子,join的介绍可见博主的“java线程等待/通知机制及中断”文章,join()可理解为将上面提到的裁剪的零散任务按串行执行等待,而CountDownLatch是并行处理遇到wait后等待,直到countDown为0,才放行]
下面开始介绍CountDownLatch
1、CountDownLatch(int count) 构造接受一个int型的参数作为计数器,每当调用countDown()方法后会使count-1,在CountDownLatch以await()方法阻塞时,,若检查到count达到0,则从其await()返回,运行继续执行之后的步骤操作。CountDownLatch的countDown()方法没有限制使用域(即在任何地方均可调用,若需要在多个线程中调用,只要将本类对象的引用传递进线程即可)注意,CountDownLatch的计数器在初始化时传递进去后就无法更改或重置
2、public void await() throws InterruptedException
使当前线程在锁存器计数值至零之前一直等待,除非线程被中断。
如果当前计数为零,则此方法立即返回。
如果当前计数大于零,则出于线程调度目的,将禁用当前线程,且在发生以下两种情况之一前,该线程将一直处于休眠状态:
①由于调用 countDown() 方法,计数到达零;②其他某个线程中断当前线程。
如果当前线程: 在进入此方法时已经设置了该线程的中断状态;或者在等待时被中断,
则抛出 InterruptedException,并且清除当前线程的已中断状态。
另外,可以使用await(long timeout,TimeUnit unit) 设置等待超时,防止一直阻塞
3、①测试例子:
<span style="font-size:14px;">import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; public class CountdownlatchTest { public static void fun1() throws InterruptedException{ CountDownLatch aa=new CountDownLatch(2); System.out.println("0"); aa.countDown(); long time1=System.currentTimeMillis(); aa.await(5, TimeUnit.SECONDS); long time2=System.currentTimeMillis()-time1; System.out.println("1"); System.out.println("-----"+time2+"毫秒-----"); } public static void fun2() throws InterruptedException{ CountDownLatch aa=new CountDownLatch(1); System.out.println("2"); aa.countDown(); aa.await(); System.out.println("3"); aa.await(); System.out.println("4"); System.out.println("-----------"); } public static void main(String[] args) throws InterruptedException { /*fun2()由于设置了计数器为1, * countDown之后, * 即便多次await()也不会再阻塞*/ fun2(); /*fun1()由于只调用了一次countDown(), * 会一直阻塞到5秒超时, * 然后从await()返回再继续执行*/ fun1(); } }</span>运行结果:
<span style="font-size:14px;">2 3 4 ----------- 0 1 -----5001毫秒-----</span>
②用法:
第一个对象实例是一个启动信号,在 driver 为继续执行 worker 做好准备之前,它会阻止所有的 worker 继续执行。
第二个对象实例是一个完成信号,它允许 driver 在完成所有 worker 之前一直等待。
class Driver { // ... void main() throws InterruptedException { CountDownLatch startSignal = new CountDownLatch(1); CountDownLatch doneSignal = new CountDownLatch(N); for (int i = 0; i < N; ++i) // create and start threads new Thread(new Worker(startSignal, doneSignal)).start(); doSomethingElse(); // don't let run yet startSignal.countDown(); // let all threads proceed doSomethingElse(); doneSignal.await(); // wait for all to finish } } class Worker implements Runnable { private final CountDownLatch startSignal; private final CountDownLatch doneSignal; Worker(CountDownLatch startSignal, CountDownLatch doneSignal) { this.startSignal = startSignal; this.doneSignal = doneSignal; } public void run() { try { startSignal.await(); doWork(); doneSignal.countDown(); } catch (InterruptedException ex) {} // return; } void doWork() { ... } }
下面开始介绍CyclicBarrier
它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时 CyclicBarrier 很有用。因为该 barrier 在释放等待线程后可以重用
CyclicBarrier(int parties) 创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)都执行等待状态时启动(即所有线程都调用await达到屏障)。 |
CyclicBarrier(int parties,Runnable barrierAction) 创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)都执行等待状态时启动,并在解除等待前先执行给定的屏障操作(barrierAction的run()) |
import java.util.concurrent.CyclicBarrier; public class CyclicBarrierTest2 { static CyclicBarrier c = new CyclicBarrier(2, new A()); public static void main(String[] args) { new Thread(new Runnable() { @Override public void run() { try { c.await(); } catch (Exception e) { } System.out.println(1); } }).start(); try { c.await(); } catch (Exception e) { } System.out.println(2); } static class A implements Runnable { @Override public void run() { try { Thread.sleep(2000);//睡眠2秒,验证是否A优先执行 } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(3); } } }运行结果:3 1 2
或 3 2 1 (即便使A睡眠再久 也是A结束后再从屏障返回)
Semaphore:
Semaphore用于控制同时访问特定资源的线程数量。比如在频繁地操作时可以有千百个线程进行执行操作,但其中可能涉及到对数据库的后续存入操作等,在数据库线程池可能只提供了部分连接数。则可以使用Semaphore进行控制。
例:
<span style="font-size:14px;">import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; public class SemaphoreTest { private static final int THREAD_COUNT = 30; private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT); private static Semaphore s = new Semaphore(10); public static void main(String[] args) { for (int i = 0; i < THREAD_COUNT; i++) { threadPool.execute(new Runnable() { @Override public void run() { try { s.acquire(); System.out.println("save data"); s.release();//若去掉这行,则没有进行归还许可,导致只输出10行数据 } catch (InterruptedException e) { } } }); } threadPool.shutdown(); } }</span>分析:30个线程进行执行操作,但Semaphore只允许了10个并发量,通过acquire()方法从其中获取通行证,在使用完毕后,调用release()归还,这样第11个线程才有机会从队列中再获取许可进行后续操作。
使用示例如 线程池A进行任务处理后遇到需要执行数据库操作时使用Semaphore控制并发(数量与数据库线程池B的连接数对应)如:
Thread A.run(){ ..... ... Job aJob=new Job(){....} Semaphore.acquire() Feture F=DataThreadPool.submit(aJob) if(F.get()) Semaphore.release() .... .. }
暂时先谈这么多咯。。
CountDownLatch 与 CyclicBarrier 简要区别:(再啰嗦几句。。)
CountDownLatch(int N) N表示计数器值,调用countDown()方法来使N-1,其wait()方法会阻塞当前线程,直到N=0
CyclicBarrier (int parties) parties为屏障拦截的线程数 当有parties个线程都调用了CyclicBarrier的await()以后,才均可放行,否则等待。
[CountDownLatch 只需在任何地方调用N次countDown() 而CyclicBarrier要parties个线程调用它的await()]
相关文章推荐
- 四个并发工具类CountDownLatch,CyclicBarrier,Semaphore,Exchanger
- java 1.5 并发流程控制CountDownLatch,CyclicBarrier,Semaphore
- Java中的并发工具类:CountDownLatch、CyclicBarrier和Semaphore
- 并发工具类:CountDownLatch、CyclicBarrier、Semaphore
- Java并发:同步工具类详解(CountDownLatch、CyclicBarrier、Semaphore)
- 第8章 Java中的并发工具类(CountDownLatch CyclicBarrier Semaphore Exchanger)
- Java并发编程之2——同步工具类的使用(CountDownLatch,CyclicBarrier,BlockungQueue,Semaphore)
- JAVA多线程系列--并发工具类(CountDownLatch, CyclicBarrier, Semaphore,Exchanger)
- 并发包下常见的同步工具类详解(CountDownLatch,CyclicBarrier,Semaphore)
- Java并发编程深入学习——CountDownLatch、CyclicBarrier和Semaphore
- Java并发编程:CountDownLatch与CyclicBarrier和Semaphore的实例详解
- Java并发之CountDownLatch、CyclicBarrier和Semaphore
- Java 并发之 CountDownLatch、CyclicBarrier 和 Semaphore
- AbstractQueuedSynchronizer在工具类ReentrantLock、Semaphore、CountDownLatch、CyclicBarrier中的应用
- Java 并发之 CountDownLatch、CyclicBarrier 和 Semaphore
- 14、Java并发编程:CountDownLatch、CyclicBarrier和Semaphore
- Java并发(8):CountDownLatch、CyclicBarrier、Semaphore、Callable、Future
- AbstractQueuedSynchronizer在工具类Semaphore、CountDownLatch、ReentrantLock中的应用和CyclicBarrier
- java高并发程序设计总结五:jdk并发包其他同步控制工具类:ReadWriteLock/CountDownLatch/CyclicBarrier/LockSupport
- 并发工具类CountDownLatch、CyclicBarrier、Semaphore、Exchanger详解