您的位置:首页 > 编程语言 > Java开发

Java线程(CountDownLatch、CyclicBarrier、Semaphore)并发控制工具类

2016-11-05 19:06 621 查看
尽管线程是比较的神秘且具有一定的不可控性,但我们依然可以尽可能地对其进行管理和“控制”

运用CountDownLatch、CyclicBarrier、Semaphore等 在很大程度上可以帮我们对线程的一些执行顺序等进行管理

线程等待的另一种形式:

java线程中除了使用 Object的wait()与notify()等来进行等待与唤醒外,CountDownLatch在一定程度上能够更方便地进行一个或多个线程的等待操作。

适用背景如,将一个任务裁剪分发给多个工作者进行执行,然后再从多个工作者处汇聚得出总结果。

[ join()方法也能适用上面的例子,join的介绍可见博主的“java线程等待/通知机制及中断”文章,join()可理解为将上面提到的裁剪的零散任务按串行执行等待,而CountDownLatch是并行处理遇到wait后等待,直到countDown为0,才放行]

下面开始介绍CountDownLatch

1、CountDownLatch(int count) 构造接受一个int型的参数作为计数器,每当调用countDown()方法后会使count-1,在CountDownLatch以await()方法阻塞时,,若检查到count达到0,则从其await()返回,运行继续执行之后的步骤操作。CountDownLatch的countDown()方法没有限制使用域(即在任何地方均可调用,若需要在多个线程中调用,只要将本类对象的引用传递进线程即可)注意,CountDownLatch的计数器在初始化时传递进去后就无法更改或重置

2、public void await() throws InterruptedException

使当前线程在锁存器计数值至零之前一直等待,除非线程被中断。 

如果当前计数为零,则此方法立即返回。 

如果当前计数大于零,则出于线程调度目的,将禁用当前线程,且在发生以下两种情况之一前,该线程将一直处于休眠状态: 

①由于调用 countDown() 方法,计数到达零;②其他某个线程中断当前线程。 

如果当前线程: 在进入此方法时已经设置了该线程的中断状态;或者在等待时被中断, 

则抛出 InterruptedException,并且清除当前线程的已中断状态。 

另外,可以使用await(long timeout,TimeUnit unit) 设置等待超时,防止一直阻塞

3、①测试例子:

<span style="font-size:14px;">import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class CountdownlatchTest {
public static void fun1() throws InterruptedException{
CountDownLatch aa=new CountDownLatch(2);
System.out.println("0");
aa.countDown();
long time1=System.currentTimeMillis();
aa.await(5, TimeUnit.SECONDS);
long time2=System.currentTimeMillis()-time1;
System.out.println("1");
System.out.println("-----"+time2+"毫秒-----");
}
public static void fun2() throws InterruptedException{
CountDownLatch aa=new CountDownLatch(1);
System.out.println("2");
aa.countDown();
aa.await();
System.out.println("3");
aa.await();
System.out.println("4");
System.out.println("-----------");
}
public static void main(String[] args) throws InterruptedException {
/*fun2()由于设置了计数器为1,
* countDown之后,
* 即便多次await()也不会再阻塞*/
fun2();
/*fun1()由于只调用了一次countDown(),
* 会一直阻塞到5秒超时,
* 然后从await()返回再继续执行*/
fun1();
}
}</span>
运行结果:

<span style="font-size:14px;">2
3
4
-----------
0
1
-----5001毫秒-----</span>

②用法:

第一个对象实例是一个启动信号,在 driver 为继续执行 worker 做好准备之前,它会阻止所有的 worker 继续执行。 

第二个对象实例是一个完成信号,它允许 driver 在完成所有 worker 之前一直等待。

class Driver { // ...
void main() throws InterruptedException {
CountDownLatch startSignal = new CountDownLatch(1);
CountDownLatch doneSignal = new CountDownLatch(N);

for (int i = 0; i < N; ++i) // create and start threads
new Thread(new Worker(startSignal, doneSignal)).start();

doSomethingElse();            // don't let run yet
startSignal.countDown();      // let all threads proceed
doSomethingElse();
doneSignal.await();           // wait for all to finish
}
}

class Worker implements Runnable {
private final CountDownLatch startSignal;
private final CountDownLatch doneSignal;
Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
this.startSignal = startSignal;
this.doneSignal = doneSignal;
}
public void run() {
try {
startSignal.await();
doWork();
doneSignal.countDown();
} catch (InterruptedException ex) {} // return;
}

void doWork() { ... }
}


下面开始介绍CyclicBarrier

它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时 CyclicBarrier 很有用。因为该 barrier 在释放等待线程后可以重用

CyclicBarrier(int parties)


          创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)都执行等待状态时启动(即所有线程都调用await达到屏障)。
CyclicBarrier(int parties,Runnable barrierAction)


          创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)都执行等待状态时启动,并在解除等待前先执行给定的屏障操作(barrierAction的run())
验证:

import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierTest2 {
static CyclicBarrier c = new CyclicBarrier(2, new A());

public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
try {
c.await();
} catch (Exception e) {

}
System.out.println(1);
}
}).start();

try {
c.await();
} catch (Exception e) {

}
System.out.println(2);
}
static class A implements Runnable  {
@Override
public void run() {
try {
Thread.sleep(2000);//睡眠2秒,验证是否A优先执行
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(3);
}
}
}
运行结果:3 1 2
或 3 2 1 (即便使A睡眠再久 也是A结束后再从屏障返回)


Semaphore:

Semaphore用于控制同时访问特定资源的线程数量。比如在频繁地操作时可以有千百个线程进行执行操作,但其中可能涉及到对数据库的后续存入操作等,在数据库线程池可能只提供了部分连接数。则可以使用Semaphore进行控制。

例:

<span style="font-size:14px;">import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

public class SemaphoreTest {

private static final int  THREAD_COUNT = 30;

private static ExecutorService threadPool   = Executors.newFixedThreadPool(THREAD_COUNT);

private static Semaphore s = new Semaphore(10);

public static void main(String[] args) {
for (int i = 0; i < THREAD_COUNT; i++) {
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
s.acquire();
System.out.println("save data");
s.release();//若去掉这行,则没有进行归还许可,导致只输出10行数据
} catch (InterruptedException e) {
}
}
});
}
threadPool.shutdown();
}
}</span>
分析:30个线程进行执行操作,但Semaphore只允许了10个并发量,通过acquire()方法从其中获取通行证,在使用完毕后,调用release()归还,这样第11个线程才有机会从队列中再获取许可进行后续操作。

使用示例如 线程池A进行任务处理后遇到需要执行数据库操作时使用Semaphore控制并发(数量与数据库线程池B的连接数对应)如:

Thread A.run(){
.....
...
Job aJob=new Job(){....}
Semaphore.acquire()
Feture F=DataThreadPool.submit(aJob)
if(F.get())
Semaphore.release()
....
..
}


暂时先谈这么多咯。。

CountDownLatch 与 CyclicBarrier 简要区别:(再啰嗦几句。。)

CountDownLatch(int N)  N表示计数器值,调用countDown()方法来使N-1,其wait()方法会阻塞当前线程,直到N=0

CyclicBarrier (int parties) parties为屏障拦截的线程数 当有parties个线程都调用了CyclicBarrier的await()以后,才均可放行,否则等待。

[CountDownLatch 只需在任何地方调用N次countDown() 而CyclicBarrier要parties个线程调用它的await()]
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐